Annotation of src/usr.bin/rsync/sender.c, Revision 1.8
1.8 ! florian 1: /* $Id: sender.c,v 1.7 2019/02/16 05:06:30 deraadt Exp $ */
1.1 benno 2: /*
3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4: *
5: * Permission to use, copy, modify, and distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16: */
1.8 ! florian 17: #include <sys/mman.h>
! 18: #include <sys/queue.h>
1.1 benno 19: #include <sys/stat.h>
20:
21: #include <assert.h>
1.8 ! florian 22: #include <fcntl.h>
1.1 benno 23: #include <inttypes.h>
1.8 ! florian 24: #include <poll.h>
1.1 benno 25: #include <stdlib.h>
26: #include <string.h>
27: #include <unistd.h>
28:
29: #include "extern.h"
30:
31: /*
1.8 ! florian 32: * A request from the receiver to download updated file data.
! 33: */
! 34: struct send_dl {
! 35: int32_t idx; /* index in our file list */
! 36: struct blkset *blks; /* the sender's block information */
! 37: TAILQ_ENTRY(send_dl) entries;
! 38: };
! 39:
! 40: /*
! 41: * The current file being "updated": sent from sender to receiver.
! 42: * If there is no file being uploaded, "cur" is NULL.
! 43: */
! 44: struct send_up {
! 45: struct send_dl *cur; /* file being updated or NULL */
! 46: struct blkstat stat; /* status of file being updated */
! 47: int primed; /* blk_recv_ack() was called */
! 48: };
! 49:
! 50: TAILQ_HEAD(send_dlq, send_dl);
! 51:
! 52: /*
! 53: * We have finished updating the receiver's file with sender data.
! 54: * Deallocate and wipe clean all resources required for that.
! 55: */
! 56: static void
! 57: send_up_reset(struct send_up *p)
! 58: {
! 59:
! 60: assert(NULL != p);
! 61:
! 62: /* Free the download request, if applicable. */
! 63:
! 64: if (p->cur != NULL) {
! 65: free(p->cur->blks);
! 66: free(p->cur);
! 67: p->cur = NULL;
! 68: }
! 69:
! 70: /* If we mapped a file for scanning, unmap it and close. */
! 71:
! 72: if (p->stat.map != MAP_FAILED)
! 73: munmap(p->stat.map, p->stat.mapsz);
! 74:
! 75: p->stat.map = MAP_FAILED;
! 76: p->stat.mapsz = 0;
! 77:
! 78: if (p->stat.fd != -1)
! 79: close(p->stat.fd);
! 80:
! 81: p->stat.fd = -1;
! 82:
! 83: /* Now clear the in-transfer information. */
! 84:
! 85: p->stat.offs = 0;
! 86: p->stat.hint = 0;
! 87: p->primed = 0;
! 88: }
! 89:
! 90: /*
! 91: * Enqueue a download request, getting it off the read channel as
! 92: * quickly a possible.
! 93: * This frees up the read channel for further incoming requests.
! 94: * We'll handle each element in turn, up to and including the last
! 95: * request (phase change), which is always a -1 idx.
! 96: * Returns zero on failure, non-zero on success.
! 97: */
! 98: static int
! 99: send_dl_enqueue(struct sess *sess, struct send_dlq *q,
! 100: int32_t idx, const struct flist *fl, size_t flsz, int fd)
! 101: {
! 102: struct send_dl *s;
! 103:
! 104: /* End-of-phase marker. */
! 105:
! 106: if (idx == -1) {
! 107: if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
! 108: ERR(sess, "calloc");
! 109: return 0;
! 110: }
! 111: s->idx = -1;
! 112: s->blks = NULL;
! 113: TAILQ_INSERT_TAIL(q, s, entries);
! 114: return 1;
! 115: }
! 116:
! 117: /* Validate the index. */
! 118:
! 119: if (idx < 0 || (uint32_t)idx >= flsz) {
! 120: ERRX(sess, "file index out of bounds: invalid %"
! 121: PRId32 " out of %zu", idx, flsz);
! 122: return 0;
! 123: } else if (S_ISDIR(fl[idx].st.mode)) {
! 124: ERRX(sess, "blocks requested for "
! 125: "directory: %s", fl[idx].path);
! 126: return 0;
! 127: } else if (S_ISLNK(fl[idx].st.mode)) {
! 128: ERRX(sess, "blocks requested for "
! 129: "symlink: %s", fl[idx].path);
! 130: return 0;
! 131: } else if (!S_ISREG(fl[idx].st.mode)) {
! 132: ERRX(sess, "blocks requested for "
! 133: "special: %s", fl[idx].path);
! 134: return 0;
! 135: }
! 136:
! 137: if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
! 138: ERR(sess, "callloc");
! 139: return 0;
! 140: }
! 141: s->idx = idx;
! 142: s->blks = NULL;
! 143: TAILQ_INSERT_TAIL(q, s, entries);
! 144:
! 145: /*
! 146: * This blocks til the full blockset has been read.
! 147: * That's ok, because the most important thing is getting data
! 148: * off the wire.
! 149: */
! 150:
! 151: if (!sess->opts->dry_run) {
! 152: s->blks = blk_recv(sess, fd, fl[idx].path);
! 153: if (s->blks == NULL) {
! 154: ERRX1(sess, "blk_recv");
! 155: return 0;
! 156: }
! 157: }
! 158: return 1;
! 159: }
! 160:
! 161: /*
1.1 benno 162: * A client sender manages the read-only source files and sends data to
163: * the receiver as requested.
164: * First it sends its list of files, then it waits for the server to
165: * request updates to individual files.
1.8 ! florian 166: * It queues requests for updates as soon as it receives them.
1.1 benno 167: * Returns zero on failure, non-zero on success.
168: *
169: * Pledges: stdio, rpath, unveil.
170: */
171: int
172: rsync_sender(struct sess *sess, int fdin,
173: int fdout, size_t argc, char **argv)
174: {
175: struct flist *fl = NULL;
1.8 ! florian 176: size_t i, flsz = 0, phase = 0, excl;
1.1 benno 177: int rc = 0, c;
178: int32_t idx;
1.8 ! florian 179: struct pollfd pfd[3];
! 180: struct send_dlq sdlq;
! 181: struct send_up up;
! 182: struct stat st;
1.1 benno 183:
1.6 benno 184: if (pledge("stdio getpw rpath unveil", NULL) == -1) {
1.1 benno 185: ERR(sess, "pledge");
186: return 0;
187: }
188:
1.8 ! florian 189: memset(&up, 0, sizeof(struct send_up));
! 190: TAILQ_INIT(&sdlq);
! 191: up.stat.fd = -1;
! 192: up.stat.map = MAP_FAILED;
! 193:
1.1 benno 194: /*
195: * Generate the list of files we want to send from our
196: * command-line input.
197: * This will also remove all invalid files.
198: */
199:
1.4 deraadt 200: if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
1.1 benno 201: ERRX1(sess, "flist_gen");
202: goto out;
203: }
204:
205: /* Client sends zero-length exclusions if deleting. */
206:
1.4 deraadt 207: if (!sess->opts->server && sess->opts->del &&
1.7 deraadt 208: !io_write_int(sess, fdout, 0)) {
1.1 benno 209: ERRX1(sess, "io_write_int");
210: goto out;
1.2 benno 211: }
1.1 benno 212:
1.2 benno 213: /*
1.1 benno 214: * Then the file list in any mode.
215: * Finally, the IO error (always zero for us).
216: */
1.2 benno 217:
1.4 deraadt 218: if (!flist_send(sess, fdin, fdout, fl, flsz)) {
1.1 benno 219: ERRX1(sess, "flist_send");
220: goto out;
1.4 deraadt 221: } else if (!io_write_int(sess, fdout, 0)) {
1.1 benno 222: ERRX1(sess, "io_write_int");
223: goto out;
1.2 benno 224: }
1.1 benno 225:
226: /* Exit if we're the server with zero files. */
227:
1.5 deraadt 228: if (flsz == 0 && sess->opts->server) {
1.1 benno 229: WARNX(sess, "sender has empty file list: exiting");
230: rc = 1;
231: goto out;
1.4 deraadt 232: } else if (!sess->opts->server)
1.1 benno 233: LOG1(sess, "Transfer starting: %zu files", flsz);
234:
235: /*
236: * If we're the server, read our exclusion list.
237: * This is always 0 for now.
238: */
239:
240: if (sess->opts->server) {
1.4 deraadt 241: if (!io_read_size(sess, fdin, &excl)) {
1.1 benno 242: ERRX1(sess, "io_read_size");
243: goto out;
1.5 deraadt 244: } else if (excl != 0) {
1.1 benno 245: ERRX1(sess, "exclusion list is non-empty");
246: goto out;
247: }
248: }
249:
1.8 ! florian 250: /*
! 251: * Set up our poll events.
! 252: * We start by polling only in receiver requests, enabling other
! 253: * poll events on demand.
1.1 benno 254: */
255:
1.8 ! florian 256: pfd[0].fd = fdin; /* from receiver */
! 257: pfd[0].events = POLLIN;
! 258: pfd[1].fd = -1; /* to receiver */
! 259: pfd[1].events = POLLOUT;
! 260: pfd[2].fd = -1; /* from local file */
! 261: pfd[2].events = POLLIN;
! 262:
! 263: /* The main sender loop runs into phase == 2. */
1.1 benno 264:
265: for (;;) {
1.8 ! florian 266: assert(pfd[0].fd != -1);
! 267: if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
! 268: ERR(sess, "poll");
! 269: goto out;
! 270: } else if (c == 0) {
! 271: ERRX(sess, "poll: timeout");
1.1 benno 272: goto out;
273: }
274:
1.8 ! florian 275: for (i = 0; i < 3; i++)
! 276: if (pfd[i].revents & (POLLERR|POLLNVAL)) {
! 277: ERRX(sess, "poll: bad fd");
! 278: goto out;
! 279: } else if (pfd[i].revents & POLLHUP) {
! 280: ERRX(sess, "poll: hangup");
! 281: goto out;
! 282: }
! 283:
! 284: /*
! 285: * Flush out multiplexed messages.
! 286: * These might otherwise clog the reader.
! 287: */
! 288:
! 289: if (sess->mplex_reads &&
! 290: (POLLIN & pfd[0].revents)) {
! 291: if (!io_read_flush(sess, fdin)) {
! 292: ERRX1(sess, "io_read_flush");
! 293: goto out;
! 294: } else if (sess->mplex_read_remain == 0)
! 295: pfd[0].revents &= ~POLLIN;
! 296: }
! 297:
1.1 benno 298: /*
1.8 ! florian 299: * If we have a request coming down off the wire, pull
! 300: * it in as quickly as possible into our buffer.
! 301: * This unclogs the socket buffers so the data can flow.
1.1 benno 302: */
303:
1.8 ! florian 304: if (pfd[0].revents & POLLIN)
! 305: for (;;) {
! 306: if (!io_read_int(sess, fdin, &idx)) {
! 307: ERRX1(sess, "io_read_int");
! 308: goto out;
! 309: }
! 310: if (!send_dl_enqueue(sess,
! 311: &sdlq, idx, fl, flsz, fdin)) {
! 312: ERRX1(sess, "send_dl_enqueue");
! 313: goto out;
! 314: }
! 315: c = io_read_check(sess, fdin);
! 316: if (c < 0) {
! 317: ERRX1(sess, "io_read_check");
! 318: goto out;
! 319: } else if (c == 0)
! 320: break;
! 321: }
! 322:
! 323: /*
! 324: * One of our local files has been opened (in response
! 325: * to a receiver request) and now we can map it.
! 326: * We'll respond to the event by looking at the map when
! 327: * the writer is available.
! 328: * Here we also enable the poll event for output.
! 329: */
! 330:
! 331: if (pfd[2].revents & POLLIN) {
! 332: assert(up.cur != NULL);
! 333: assert(up.stat.fd != -1);
! 334: assert(up.stat.map == MAP_FAILED);
! 335: assert(up.stat.mapsz == 0);
! 336:
! 337: if (fstat(up.stat.fd, &st) == -1) {
! 338: ERR(sess, "%s: fstat", fl[up.cur->idx].path);
1.1 benno 339: goto out;
340: }
341:
1.8 ! florian 342: /*
! 343: * If the file is zero-length, the map will
! 344: * fail, but either way we want to unset that
! 345: * we're waiting for the file to open.
! 346: * We'll close the descriptor after processing.
! 347: */
! 348:
! 349: if ((up.stat.mapsz = st.st_size) > 0) {
! 350: up.stat.map = mmap(NULL, up.stat.mapsz,
! 351: PROT_READ, MAP_SHARED, up.stat.fd, 0);
! 352: if (up.stat.map == MAP_FAILED) {
! 353: ERR(sess, "%s: mmap", fl[up.cur->idx].path);
! 354: goto out;
! 355: }
! 356: }
! 357: pfd[2].fd = -1;
! 358: pfd[1].fd = fdout;
! 359: }
! 360:
! 361: /*
! 362: * Our outbound is ready to process the current event.
! 363: * This means we've already opened the file and possibly
! 364: * mapped it, and we're ready to send blocks.
! 365: * Do this one block at a time lest we block the channel
! 366: * while read events are coming in.
! 367: */
1.1 benno 368:
1.8 ! florian 369: if (pfd[1].revents & POLLOUT) {
! 370: assert(up.cur != NULL);
! 371: assert(pfd[2].fd == -1);
! 372:
! 373: /*
! 374: * If we receive an invalid index (-1), then we're
! 375: * either promoted to the second phase or it's time to
! 376: * exit, depending upon which phase we're in.
! 377: * Otherwise, we either start a transfer
! 378: * sequence (if not primed) or continue one.
! 379: */
! 380:
! 381: if (up.cur->idx < 0) {
! 382: pfd[1].fd = -1;
! 383: send_up_reset(&up);
! 384: if (!io_write_int(sess, fdout, -1)) {
1.1 benno 385: ERRX1(sess, "io_write_int");
386: goto out;
387: }
388:
1.8 ! florian 389: /* Send superfluous ack. */
! 390:
! 391: if (sess->opts->server && sess->rver > 27 &&
! 392: !io_write_int(sess, fdout, -1)) {
! 393: ERRX1(sess, "io_write_int");
! 394: goto out;
! 395: }
! 396:
! 397: if (phase++)
! 398: break;
! 399: } else if (0 == up.primed) {
! 400: if (!sess->opts->server)
! 401: LOG1(sess, "%s", fl[up.cur->idx].wpath);
1.1 benno 402:
1.8 ! florian 403: /* Dry-running does nothing but a response. */
1.1 benno 404:
1.8 ! florian 405: if (sess->opts->dry_run &&
! 406: !io_write_int(sess, fdout, up.cur->idx)) {
! 407: ERRX1(sess, "io_write_int");
! 408: goto out;
! 409: }
1.1 benno 410:
1.8 ! florian 411: /* Actually perform the block send. */
1.1 benno 412:
1.8 ! florian 413: assert(up.stat.fd != -1);
! 414: if (!blk_recv_ack(sess, fdout,
! 415: up.cur->blks, up.cur->idx)) {
! 416: ERRX1(sess, "blk_recv_ack");
! 417: goto out;
! 418: }
! 419: up.primed = 1;
! 420: } else {
! 421: assert(up.stat.fd != -1);
! 422: c = blk_match(sess, fdout, up.cur->blks,
! 423: fl[up.cur->idx].path, &up.stat);
! 424: if (c < 0) {
! 425: ERRX1(sess, "blk_match");
! 426: goto out;
! 427: } else if (c > 0) {
! 428: send_up_reset(&up);
! 429: pfd[1].fd = -1;
! 430: }
1.1 benno 431: }
432: }
433:
434: /*
1.8 ! florian 435: * Incoming queue management.
! 436: * If we have no queue component that we're waiting on,
! 437: * then pull off the receiver-request queue and start
! 438: * processing the request.
1.1 benno 439: */
440:
1.8 ! florian 441: if (up.cur == NULL) {
! 442: assert(pfd[2].fd == -1);
! 443: assert(up.stat.fd == -1);
! 444: assert(up.stat.map == MAP_FAILED);
! 445: assert(up.stat.mapsz == 0);
! 446:
! 447: if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
! 448: continue;
! 449: TAILQ_REMOVE(&sdlq, up.cur, entries);
! 450:
! 451: /* End of phase: enable channel to receiver. */
! 452:
! 453: if (up.cur->idx == -1) {
! 454: pfd[1].fd = fdout;
! 455: continue;
! 456: }
! 457:
! 458: /* Non-blocking open of file. */
1.1 benno 459:
1.8 ! florian 460: up.stat.fd = open(fl[up.cur->idx].path,
! 461: O_RDONLY|O_NONBLOCK, 0);
! 462: if (up.stat.fd == -1) {
! 463: ERR(sess, "%s: open", fl[up.cur->idx].path);
! 464: goto out;
! 465: }
! 466: pfd[2].fd = up.stat.fd;
1.1 benno 467: }
468: }
469:
1.4 deraadt 470: if (!sess_stats_send(sess, fdout)) {
1.1 benno 471: ERRX1(sess, "sess_stats_end");
472: goto out;
473: }
474:
475: /* Final "goodbye" message. */
476:
1.4 deraadt 477: if (!io_read_int(sess, fdin, &idx)) {
1.1 benno 478: ERRX1(sess, "io_read_int");
479: goto out;
1.5 deraadt 480: } else if (idx != -1) {
1.1 benno 481: ERRX(sess, "read incorrect update complete ack");
482: goto out;
483: }
484:
485: LOG2(sess, "sender finished updating");
486: rc = 1;
487: out:
488: flist_free(fl, flsz);
489: return rc;
490: }