Annotation of src/usr.bin/rsync/sender.c, Revision 1.12
1.12 ! deraadt 1: /* $Id: sender.c,v 1.11 2019/02/16 17:59:33 deraadt Exp $ */
1.1 benno 2: /*
3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4: *
5: * Permission to use, copy, modify, and distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16: */
1.8 florian 17: #include <sys/mman.h>
18: #include <sys/queue.h>
1.1 benno 19: #include <sys/stat.h>
20:
21: #include <assert.h>
1.8 florian 22: #include <fcntl.h>
1.1 benno 23: #include <inttypes.h>
1.8 florian 24: #include <poll.h>
1.1 benno 25: #include <stdlib.h>
26: #include <string.h>
27: #include <unistd.h>
28:
1.9 florian 29: #include <openssl/md4.h>
30:
1.1 benno 31: #include "extern.h"
32:
/*
 * One pending request from the receiver for updated file data.
 * Requests are linked together on a send_dlq tail queue.
 */
struct send_dl {
	int32_t			 idx;	/* file list index; -1 marks end of phase */
	struct blkset		*blks;	/* block set read for this request, or NULL */
	TAILQ_ENTRY(send_dl)	 entries; /* tail queue linkage */
};
41:
42: /*
43: * The current file being "updated": sent from sender to receiver.
44: * If there is no file being uploaded, "cur" is NULL.
45: */
46: struct send_up {
47: struct send_dl *cur; /* file being updated or NULL */
48: struct blkstat stat; /* status of file being updated */
49: int primed; /* blk_recv_ack() was called */
50: };
51:
52: TAILQ_HEAD(send_dlq, send_dl);
53:
54: /*
55: * We have finished updating the receiver's file with sender data.
56: * Deallocate and wipe clean all resources required for that.
57: */
58: static void
59: send_up_reset(struct send_up *p)
60: {
61:
62: assert(NULL != p);
63:
64: /* Free the download request, if applicable. */
65:
66: if (p->cur != NULL) {
67: free(p->cur->blks);
68: free(p->cur);
69: p->cur = NULL;
70: }
71:
72: /* If we mapped a file for scanning, unmap it and close. */
73:
74: if (p->stat.map != MAP_FAILED)
75: munmap(p->stat.map, p->stat.mapsz);
76:
77: p->stat.map = MAP_FAILED;
78: p->stat.mapsz = 0;
79:
80: if (p->stat.fd != -1)
81: close(p->stat.fd);
82:
83: p->stat.fd = -1;
84:
85: /* Now clear the in-transfer information. */
86:
87: p->stat.offs = 0;
88: p->stat.hint = 0;
1.9 florian 89: p->stat.curst = BLKSTAT_NONE;
1.8 florian 90: p->primed = 0;
91: }
92:
93: /*
94: * Enqueue a download request, getting it off the read channel as
95: * quickly a possible.
96: * This frees up the read channel for further incoming requests.
97: * We'll handle each element in turn, up to and including the last
98: * request (phase change), which is always a -1 idx.
99: * Returns zero on failure, non-zero on success.
100: */
101: static int
102: send_dl_enqueue(struct sess *sess, struct send_dlq *q,
103: int32_t idx, const struct flist *fl, size_t flsz, int fd)
104: {
105: struct send_dl *s;
1.11 deraadt 106:
1.8 florian 107: /* End-of-phase marker. */
108:
109: if (idx == -1) {
110: if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
111: ERR(sess, "calloc");
112: return 0;
113: }
114: s->idx = -1;
115: s->blks = NULL;
116: TAILQ_INSERT_TAIL(q, s, entries);
117: return 1;
118: }
119:
120: /* Validate the index. */
1.11 deraadt 121:
1.8 florian 122: if (idx < 0 || (uint32_t)idx >= flsz) {
1.10 florian 123: ERRX(sess, "file index out of bounds: invalid %"
1.8 florian 124: PRId32 " out of %zu", idx, flsz);
125: return 0;
126: } else if (S_ISDIR(fl[idx].st.mode)) {
127: ERRX(sess, "blocks requested for "
128: "directory: %s", fl[idx].path);
129: return 0;
130: } else if (S_ISLNK(fl[idx].st.mode)) {
131: ERRX(sess, "blocks requested for "
132: "symlink: %s", fl[idx].path);
133: return 0;
134: } else if (!S_ISREG(fl[idx].st.mode)) {
135: ERRX(sess, "blocks requested for "
136: "special: %s", fl[idx].path);
137: return 0;
138: }
139:
140: if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
141: ERR(sess, "callloc");
142: return 0;
143: }
144: s->idx = idx;
145: s->blks = NULL;
146: TAILQ_INSERT_TAIL(q, s, entries);
147:
1.10 florian 148: /*
1.8 florian 149: * This blocks til the full blockset has been read.
150: * That's ok, because the most important thing is getting data
151: * off the wire.
152: */
153:
154: if (!sess->opts->dry_run) {
155: s->blks = blk_recv(sess, fd, fl[idx].path);
156: if (s->blks == NULL) {
157: ERRX1(sess, "blk_recv");
158: return 0;
159: }
160: }
161: return 1;
162: }
163:
164: /*
1.1 benno 165: * A client sender manages the read-only source files and sends data to
166: * the receiver as requested.
167: * First it sends its list of files, then it waits for the server to
168: * request updates to individual files.
1.8 florian 169: * It queues requests for updates as soon as it receives them.
1.1 benno 170: * Returns zero on failure, non-zero on success.
171: *
172: * Pledges: stdio, rpath, unveil.
173: */
174: int
175: rsync_sender(struct sess *sess, int fdin,
176: int fdout, size_t argc, char **argv)
177: {
1.9 florian 178: struct flist *fl = NULL;
179: const struct flist *f;
180: size_t i, flsz = 0, phase = 0, excl;
181: off_t sz;
182: int rc = 0, c;
183: int32_t idx;
184: struct pollfd pfd[3];
185: struct send_dlq sdlq;
186: struct send_dl *dl;
187: struct send_up up;
188: struct stat st;
189: unsigned char filemd[MD4_DIGEST_LENGTH];
190: void *wbuf = NULL;
191: size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;
192: ssize_t ssz;
1.1 benno 193:
1.6 benno 194: if (pledge("stdio getpw rpath unveil", NULL) == -1) {
1.1 benno 195: ERR(sess, "pledge");
196: return 0;
197: }
198:
1.8 florian 199: memset(&up, 0, sizeof(struct send_up));
200: TAILQ_INIT(&sdlq);
201: up.stat.fd = -1;
202: up.stat.map = MAP_FAILED;
203:
1.1 benno 204: /*
205: * Generate the list of files we want to send from our
206: * command-line input.
207: * This will also remove all invalid files.
208: */
209:
1.4 deraadt 210: if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
1.1 benno 211: ERRX1(sess, "flist_gen");
212: goto out;
213: }
214:
215: /* Client sends zero-length exclusions if deleting. */
216:
1.4 deraadt 217: if (!sess->opts->server && sess->opts->del &&
1.7 deraadt 218: !io_write_int(sess, fdout, 0)) {
1.1 benno 219: ERRX1(sess, "io_write_int");
220: goto out;
1.2 benno 221: }
1.1 benno 222:
1.2 benno 223: /*
1.1 benno 224: * Then the file list in any mode.
225: * Finally, the IO error (always zero for us).
226: */
1.2 benno 227:
1.4 deraadt 228: if (!flist_send(sess, fdin, fdout, fl, flsz)) {
1.1 benno 229: ERRX1(sess, "flist_send");
230: goto out;
1.4 deraadt 231: } else if (!io_write_int(sess, fdout, 0)) {
1.1 benno 232: ERRX1(sess, "io_write_int");
233: goto out;
1.2 benno 234: }
1.1 benno 235:
236: /* Exit if we're the server with zero files. */
237:
1.5 deraadt 238: if (flsz == 0 && sess->opts->server) {
1.1 benno 239: WARNX(sess, "sender has empty file list: exiting");
240: rc = 1;
241: goto out;
1.4 deraadt 242: } else if (!sess->opts->server)
1.1 benno 243: LOG1(sess, "Transfer starting: %zu files", flsz);
244:
245: /*
246: * If we're the server, read our exclusion list.
247: * This is always 0 for now.
248: */
249:
250: if (sess->opts->server) {
1.4 deraadt 251: if (!io_read_size(sess, fdin, &excl)) {
1.1 benno 252: ERRX1(sess, "io_read_size");
253: goto out;
1.5 deraadt 254: } else if (excl != 0) {
1.1 benno 255: ERRX1(sess, "exclusion list is non-empty");
256: goto out;
257: }
258: }
259:
1.10 florian 260: /*
1.8 florian 261: * Set up our poll events.
262: * We start by polling only in receiver requests, enabling other
263: * poll events on demand.
1.1 benno 264: */
265:
1.8 florian 266: pfd[0].fd = fdin; /* from receiver */
267: pfd[0].events = POLLIN;
268: pfd[1].fd = -1; /* to receiver */
269: pfd[1].events = POLLOUT;
270: pfd[2].fd = -1; /* from local file */
271: pfd[2].events = POLLIN;
272:
1.1 benno 273: for (;;) {
1.8 florian 274: assert(pfd[0].fd != -1);
275: if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
276: ERR(sess, "poll");
277: goto out;
278: } else if (c == 0) {
279: ERRX(sess, "poll: timeout");
1.1 benno 280: goto out;
281: }
1.8 florian 282: for (i = 0; i < 3; i++)
283: if (pfd[i].revents & (POLLERR|POLLNVAL)) {
284: ERRX(sess, "poll: bad fd");
285: goto out;
286: } else if (pfd[i].revents & POLLHUP) {
287: ERRX(sess, "poll: hangup");
288: goto out;
289: }
290:
1.1 benno 291: /*
1.8 florian 292: * If we have a request coming down off the wire, pull
293: * it in as quickly as possible into our buffer.
294: * This unclogs the socket buffers so the data can flow.
1.9 florian 295: * FIXME: if we're multiplexing, we might stall here if
296: * there's only a log message and no actual data.
297: * This can be fixed by doing a conditional test.
1.1 benno 298: */
299:
1.8 florian 300: if (pfd[0].revents & POLLIN)
301: for (;;) {
302: if (!io_read_int(sess, fdin, &idx)) {
303: ERRX1(sess, "io_read_int");
304: goto out;
1.10 florian 305: }
1.8 florian 306: if (!send_dl_enqueue(sess,
307: &sdlq, idx, fl, flsz, fdin)) {
308: ERRX1(sess, "send_dl_enqueue");
309: goto out;
310: }
311: c = io_read_check(sess, fdin);
312: if (c < 0) {
313: ERRX1(sess, "io_read_check");
314: goto out;
315: } else if (c == 0)
316: break;
317: }
318:
319: /*
1.9 florian 320: * One of our local files has been opened in response
321: * to a receiver request and now we can map it.
1.8 florian 322: * We'll respond to the event by looking at the map when
323: * the writer is available.
324: * Here we also enable the poll event for output.
325: */
326:
327: if (pfd[2].revents & POLLIN) {
328: assert(up.cur != NULL);
329: assert(up.stat.fd != -1);
330: assert(up.stat.map == MAP_FAILED);
331: assert(up.stat.mapsz == 0);
1.9 florian 332: f = &fl[up.cur->idx];
1.8 florian 333:
334: if (fstat(up.stat.fd, &st) == -1) {
1.9 florian 335: ERR(sess, "%s: fstat", f->path);
1.1 benno 336: goto out;
337: }
338:
1.8 florian 339: /*
340: * If the file is zero-length, the map will
341: * fail, but either way we want to unset that
1.9 florian 342: * we're waiting for the file to open and set
343: * that we're ready for the output channel.
1.8 florian 344: */
345:
346: if ((up.stat.mapsz = st.st_size) > 0) {
1.9 florian 347: up.stat.map = mmap(NULL,
348: up.stat.mapsz, PROT_READ,
349: MAP_SHARED, up.stat.fd, 0);
1.8 florian 350: if (up.stat.map == MAP_FAILED) {
1.9 florian 351: ERR(sess, "%s: mmap", f->path);
1.8 florian 352: goto out;
353: }
1.9 florian 354: }
355:
1.8 florian 356: pfd[2].fd = -1;
357: pfd[1].fd = fdout;
358: }
359:
360: /*
1.9 florian 361: * If we have buffers waiting to write, write them out
362: * as soon as we can in a non-blocking fashion.
363: * We must not be waiting for any local files.
364: * ALL WRITES MUST HAPPEN HERE.
365: * This keeps the sender deadlock-free.
1.8 florian 366: */
1.1 benno 367:
1.9 florian 368: if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
369: assert(pfd[2].fd == -1);
370: assert(wbufsz - wbufpos);
371: ssz = write(fdout,
372: wbuf + wbufpos, wbufsz - wbufpos);
373: if (ssz < 0) {
374: ERR(sess, "write");
375: goto out;
376: }
377: wbufpos += ssz;
378: if (wbufpos == wbufsz)
379: wbufpos = wbufsz = 0;
380: pfd[1].revents &= ~POLLOUT;
381:
382: /* This is usually in io.c... */
383:
384: sess->total_write += ssz;
385: }
386:
1.8 florian 387: if (pfd[1].revents & POLLOUT) {
388: assert(pfd[2].fd == -1);
1.9 florian 389: assert(0 == wbufpos && 0 == wbufsz);
1.8 florian 390:
391: /*
1.9 florian 392: * If we have data to write, do it now according
393: * to the data finite state machine.
394: * If we receive an invalid index (-1), then
395: * we're either promoted to the second phase or
396: * it's time to exit, depending upon which phase
397: * we're in.
1.8 florian 398: * Otherwise, we either start a transfer
399: * sequence (if not primed) or continue one.
400: */
401:
1.9 florian 402: pos = 0;
403: if (BLKSTAT_DATA == up.stat.curst) {
404: /*
405: * A data segment to be written: buffer
406: * both the length and the data, then
407: * put is in the token phase.
408: */
409:
1.12 ! deraadt 410: sz = MINIMUM(MAX_CHUNK,
! 411: up.stat.curlen - up.stat.curpos);
1.10 florian 412: if (!io_lowbuffer_alloc(sess, &wbuf,
1.9 florian 413: &wbufsz, &wbufmax, sizeof(int32_t))) {
414: ERRX1(sess, "io_lowbuffer_alloc");
415: goto out;
416: }
417: io_lowbuffer_int(sess,
418: wbuf, &pos, wbufsz, sz);
1.10 florian 419: if (!io_lowbuffer_alloc(sess, &wbuf,
1.9 florian 420: &wbufsz, &wbufmax, sz)) {
421: ERRX1(sess, "io_lowbuffer_alloc");
422: goto out;
423: }
1.10 florian 424: io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,
1.9 florian 425: up.stat.map + up.stat.curpos, sz);
426: up.stat.curpos += sz;
427: if (up.stat.curpos == up.stat.curlen)
428: up.stat.curst = BLKSTAT_TOK;
429: } else if (BLKSTAT_TOK == up.stat.curst) {
430: /*
431: * The data token following (maybe) a
432: * data segment.
433: * These can also come standalone if,
434: * say, the file's being fully written.
435: * It's followed by a hash or another
436: * data segment, depending on the token.
437: */
438:
439: if (!io_lowbuffer_alloc(sess, &wbuf,
440: &wbufsz, &wbufmax, sizeof(int32_t))) {
441: ERRX1(sess, "io_lowbuffer_alloc");
442: goto out;
443: }
444: io_lowbuffer_int(sess, wbuf,
445: &pos, wbufsz, up.stat.curtok);
446: up.stat.curst = up.stat.curtok ?
447: BLKSTAT_NONE : BLKSTAT_HASH;
448: } else if (BLKSTAT_HASH == up.stat.curst) {
449: /*
450: * The hash following transmission of
451: * all file contents.
452: * This is always followed by the state
453: * that we're finished with the file.
454: */
455:
456: hash_file(up.stat.map,
457: up.stat.mapsz, filemd, sess);
1.10 florian 458: if (!io_lowbuffer_alloc(sess, &wbuf,
1.9 florian 459: &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {
460: ERRX1(sess, "io_lowbuffer_alloc");
461: goto out;
462: }
463: io_lowbuffer_buf(sess, wbuf, &pos,
464: wbufsz, filemd, MD4_DIGEST_LENGTH);
465: up.stat.curst = BLKSTAT_DONE;
466: } else if (BLKSTAT_DONE == up.stat.curst) {
467: /*
468: * The data has been written.
469: * Clear our current send file and allow
470: * the block below to find another.
471: */
472:
473: LOG3(sess, "%s: flushed %jd KB total, "
474: "%.2f%% uploaded",
475: fl[up.cur->idx].path,
1.10 florian 476: (intmax_t)up.stat.total / 1024,
1.9 florian 477: 100.0 * up.stat.dirty / up.stat.total);
1.8 florian 478: send_up_reset(&up);
1.9 florian 479: } else if (NULL != up.cur && up.cur->idx < 0) {
480: /*
481: * We've hit the phase change following
482: * the last file (or start, or prior
483: * phase change).
484: * Simply acknowledge it.
485: * FIXME: use buffering.
486: */
487:
1.8 florian 488: if (!io_write_int(sess, fdout, -1)) {
1.1 benno 489: ERRX1(sess, "io_write_int");
490: goto out;
491: }
1.8 florian 492: if (sess->opts->server && sess->rver > 27 &&
493: !io_write_int(sess, fdout, -1)) {
494: ERRX1(sess, "io_write_int");
495: goto out;
496: }
1.9 florian 497: send_up_reset(&up);
498:
1.10 florian 499: /*
1.9 florian 500: * This is where we actually stop the
501: * algorithm: we're already at the
502: * second phase.
503: */
1.8 florian 504:
505: if (phase++)
506: break;
1.9 florian 507: } else if (NULL != up.cur && 0 == up.primed) {
508: /*
509: * We're getting ready to send the file
510: * contents to the receiver.
511: * FIXME: use buffering.
512: */
513:
1.8 florian 514: if (!sess->opts->server)
515: LOG1(sess, "%s", fl[up.cur->idx].wpath);
1.1 benno 516:
1.8 florian 517: /* Dry-running does nothing but a response. */
1.1 benno 518:
1.8 florian 519: if (sess->opts->dry_run &&
520: !io_write_int(sess, fdout, up.cur->idx)) {
521: ERRX1(sess, "io_write_int");
522: goto out;
523: }
1.1 benno 524:
1.8 florian 525: /* Actually perform the block send. */
1.1 benno 526:
1.8 florian 527: assert(up.stat.fd != -1);
528: if (!blk_recv_ack(sess, fdout,
529: up.cur->blks, up.cur->idx)) {
530: ERRX1(sess, "blk_recv_ack");
531: goto out;
1.10 florian 532: }
1.9 florian 533: LOG3(sess, "%s: primed for %jd B total",
534: fl[up.cur->idx].path,
535: (intmax_t)up.cur->blks->size);
1.8 florian 536: up.primed = 1;
1.9 florian 537: } else if (NULL != up.cur) {
538: /*
539: * Our last case: we need to find the
540: * next block (and token) to transmit to
541: * the receiver.
542: * These will drive the finite state
543: * machine in the first few conditional
544: * blocks of this set.
545: */
546:
1.8 florian 547: assert(up.stat.fd != -1);
1.9 florian 548: blk_match(sess, up.cur->blks,
1.8 florian 549: fl[up.cur->idx].path, &up.stat);
1.1 benno 550: }
551: }
552:
553: /*
1.8 florian 554: * Incoming queue management.
555: * If we have no queue component that we're waiting on,
556: * then pull off the receiver-request queue and start
557: * processing the request.
1.1 benno 558: */
559:
1.8 florian 560: if (up.cur == NULL) {
561: assert(pfd[2].fd == -1);
562: assert(up.stat.fd == -1);
563: assert(up.stat.map == MAP_FAILED);
564: assert(up.stat.mapsz == 0);
1.9 florian 565: assert(wbufsz == 0 && wbufpos == 0);
566: pfd[1].fd = -1;
567:
568: /*
569: * If there's nothing in the queue, then keep
570: * the output channel disabled and wait for
571: * whatever comes next from the reader.
572: */
1.8 florian 573:
574: if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
575: continue;
1.9 florian 576:
1.8 florian 577: TAILQ_REMOVE(&sdlq, up.cur, entries);
578:
1.10 florian 579: /*
1.9 florian 580: * End of phase: enable channel to receiver.
581: * We'll need our output buffer enabled in order
582: * to process this event.
583: */
1.8 florian 584:
585: if (up.cur->idx == -1) {
586: pfd[1].fd = fdout;
587: continue;
588: }
1.11 deraadt 589:
1.10 florian 590: /*
1.9 florian 591: * Non-blocking open of file.
592: * This will be picked up in the state machine
593: * block of not being primed.
594: */
1.1 benno 595:
1.9 florian 596: up.stat.fd = open(fl[up.cur->idx].path,
1.8 florian 597: O_RDONLY|O_NONBLOCK, 0);
598: if (up.stat.fd == -1) {
599: ERR(sess, "%s: open", fl[up.cur->idx].path);
600: goto out;
601: }
602: pfd[2].fd = up.stat.fd;
1.1 benno 603: }
604: }
605:
1.9 florian 606: if (!TAILQ_EMPTY(&sdlq)) {
607: ERRX(sess, "phases complete with files still queued");
608: goto out;
609: }
610:
1.4 deraadt 611: if (!sess_stats_send(sess, fdout)) {
1.1 benno 612: ERRX1(sess, "sess_stats_end");
613: goto out;
614: }
615:
616: /* Final "goodbye" message. */
617:
1.4 deraadt 618: if (!io_read_int(sess, fdin, &idx)) {
1.1 benno 619: ERRX1(sess, "io_read_int");
620: goto out;
1.5 deraadt 621: } else if (idx != -1) {
1.1 benno 622: ERRX(sess, "read incorrect update complete ack");
623: goto out;
624: }
625:
626: LOG2(sess, "sender finished updating");
627: rc = 1;
628: out:
1.9 florian 629: send_up_reset(&up);
630: while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
631: free(dl->blks);
632: free(dl);
633: }
1.1 benno 634: flist_free(fl, flsz);
1.9 florian 635: free(wbuf);
1.1 benno 636: return rc;
637: }