Annotation of src/usr.bin/rsync/downloader.c, Revision 1.2
1.1 benno 1: /* $Id$ */
2: /*
3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4: *
5: * Permission to use, copy, modify, and distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16: */
17: #include <sys/mman.h>
18: #include <sys/stat.h>
19:
20: #include <assert.h>
21: #include <errno.h>
22: #include <fcntl.h>
23: #include <inttypes.h>
24: #include <math.h>
25: #include <poll.h>
26: #include <stdio.h>
27: #include <stdlib.h>
28: #include <string.h>
29: #include <time.h>
30: #include <unistd.h>
31:
32: #include "extern.h"
33: #include "md4.h"
34:
/*
 * Size in bytes (1 MB) of the pre-write buffer used to batch writes to
 * the output file.  Defining this as zero disables buffering; it is a
 * small optimisation that does not affect performance much.
 */
#define OBUF_SIZE	(1 << 20)

/*
 * Phases of a single file download, tracked in struct download and
 * visited in this order.
 */
enum	downloadst {
	DOWNLOAD_READ_NEXT = 0,		/* wait for the next file index */
	DOWNLOAD_READ_LOCAL = 1,	/* open/map the origin file */
	DOWNLOAD_READ_REMOTE = 2	/* read block data from sender */
};
47:
48: /*
49: * Like struct upload, but used to keep track of what we're downloading.
50: * This also is managed by the receiver process.
51: */
52: struct download {
53: enum downloadst state; /* state of affairs */
1.2 ! benno 54: size_t idx; /* index of current file */
1.1 benno 55: struct blkset blk; /* its blocks */
56: void *map; /* mmap of current file */
57: size_t mapsz; /* length of mapsz */
58: int ofd; /* open origin file */
59: int fd; /* open output file */
60: char *fname; /* output filename */
1.2 ! benno 61: MD4_CTX ctx; /* current hashing context */
1.1 benno 62: off_t downloaded; /* total downloaded */
63: off_t total; /* total in file */
64: const struct flist *fl; /* file list */
65: size_t flsz; /* size of file list */
66: int rootfd; /* destination directory */
67: int fdin; /* read descriptor from sender */
68: char *obuf; /* pre-write buffer */
69: size_t obufsz; /* current size of obuf */
70: size_t obufmax; /* max size we'll wbuffer */
71: };
72:
73:
74: /*
75: * Simply log the filename.
76: */
77: static void
1.2 ! benno 78: log_file(struct sess *sess,
1.1 benno 79: const struct download *dl, const struct flist *f)
80: {
81: float frac, tot = dl->total;
82: int prec = 0;
83: const char *unit = "B";
84:
85: if (sess->opts->server)
86: return;
87:
1.2 ! benno 88: frac = 0 == dl->total ? 100.0 :
1.1 benno 89: 100.0 * dl->downloaded / dl->total;
90:
91: if (dl->total > 1024 * 1024 * 1024) {
92: tot = dl->total / (1024. * 1024. * 1024.);
93: prec = 3;
94: unit = "GB";
95: } else if (dl->total > 1024 * 1024) {
96: tot = dl->total / (1024. * 1024.);
97: prec = 2;
98: unit = "MB";
99: } else if (dl->total > 1024) {
100: tot = dl->total / 1024.;
101: prec = 1;
102: unit = "KB";
103: }
104:
1.2 ! benno 105: LOG1(sess, "%s (%.*f %s, %.1f%% downloaded)",
1.1 benno 106: f->path, prec, tot, unit, frac);
107: }
108:
109: /*
110: * Reinitialise a download context w/o overwriting the persistent parts
111: * of the structure (like p->fl or p->flsz) for index "idx".
112: * The MD4 context is pre-seeded.
113: */
114: static void
115: download_reinit(struct sess *sess, struct download *p, size_t idx)
116: {
117: int32_t seed = htole32(sess->seed);
118:
119: assert(DOWNLOAD_READ_NEXT == p->state);
120:
121: p->idx = idx;
122: memset(&p->blk, 0, sizeof(struct blkset));
123: p->map = MAP_FAILED;
124: p->mapsz = 0;
125: p->ofd = -1;
126: p->fd = -1;
127: p->fname = NULL;
128: MD4_Init(&p->ctx);
129: p->downloaded = p->total = 0;
130: /* Don't touch p->fl. */
131: /* Don't touch p->flsz. */
132: /* Don't touch p->rootfd. */
133: /* Don't touch p->fdin. */
134: MD4_Update(&p->ctx, &seed, sizeof(int32_t));
135: }
136:
137: /*
138: * Free a download context.
139: * If "cleanup" is non-zero, we also try to clean up the temporary file,
140: * assuming that it has been opened in p->fd.
141: */
142: static void
143: download_cleanup(struct download *p, int cleanup)
144: {
145:
146: if (MAP_FAILED != p->map) {
147: assert(p->mapsz);
148: munmap(p->map, p->mapsz);
149: p->map = MAP_FAILED;
150: p->mapsz = 0;
151: }
152: if (-1 != p->ofd) {
153: close(p->ofd);
154: p->ofd = -1;
155: }
156: if (-1 != p->fd) {
157: close(p->fd);
1.2 ! benno 158: if (cleanup && NULL != p->fname)
1.1 benno 159: unlinkat(p->rootfd, p->fname, 0);
160: p->fd = -1;
161: }
162: free(p->fname);
163: p->fname = NULL;
164: p->state = DOWNLOAD_READ_NEXT;
165: }
166:
167: /*
168: * Initial allocation of the download object using the file list "fl" of
169: * size "flsz", the destination "rootfd", and the sender read "fdin".
170: * Returns NULL on allocation failure.
171: * On success, download_free() must be called with the pointer.
172: */
173: struct download *
1.2 ! benno 174: download_alloc(struct sess *sess, int fdin,
1.1 benno 175: const struct flist *fl, size_t flsz, int rootfd)
176: {
177: struct download *p;
178:
179: if (NULL == (p = malloc(sizeof(struct download)))) {
180: ERR(sess, "malloc");
181: return NULL;
182: }
183:
184: p->state = DOWNLOAD_READ_NEXT;
185: p->fl = fl;
186: p->flsz = flsz;
187: p->rootfd = rootfd;
188: p->fdin = fdin;
189: download_reinit(sess, p, 0);
190: p->obufsz = 0;
191: p->obuf = NULL;
192: p->obufmax = OBUF_SIZE;
193: if (p->obufmax &&
1.2 ! benno 194: NULL == (p->obuf = malloc(p->obufmax))) {
1.1 benno 195: ERR(sess, "malloc");
196: free(p);
197: return NULL;
198: }
199: return p;
200: }
201:
202: /*
203: * Perform all cleanups (including removing stray files) and free.
204: * Passing a NULL to this function is ok.
205: */
206: void
207: download_free(struct download *p)
208: {
209:
210: if (NULL == p)
211: return;
212: download_cleanup(p, 1);
213: free(p->obuf);
214: free(p);
215: }
216:
217: /*
218: * Optimisation: instead of dumping directly into the output file, keep
219: * a buffer and write as much as we can into the buffer.
220: * That way, we can avoid calling write() too much, and instead call it
221: * with big buffers.
222: * To flush the buffer w/o changing it, pass 0 as "sz".
223: * Returns zero on failure, non-zero on success.
224: */
225: static int
1.2 ! benno 226: buf_copy(struct sess *sess,
1.1 benno 227: const char *buf, size_t sz, struct download *p)
228: {
229: size_t rem, tocopy;
230: ssize_t ssz;
231:
232: assert(p->obufsz <= p->obufmax);
233:
1.2 ! benno 234: /*
1.1 benno 235: * Copy as much as we can.
236: * If we've copied everything, exit.
237: * If we have no pre-write buffer (obufmax of zero), this never
238: * gets called, so we never buffer anything.
239: */
240:
241: if (sz && p->obufsz < p->obufmax) {
242: assert(NULL != p->obuf);
243: rem = p->obufmax - p->obufsz;
244: assert(rem > 0);
245: tocopy = rem < sz ? rem : sz;
246: memcpy(p->obuf + p->obufsz, buf, tocopy);
247: sz -= tocopy;
248: buf += tocopy;
249: p->obufsz += tocopy;
250: assert(p->obufsz <= p->obufmax);
251: if (0 == sz)
252: return 1;
253: }
254:
255: /* Drain the main buffer. */
256:
257: if (p->obufsz) {
258: assert(p->obufmax);
259: assert(p->obufsz <= p->obufmax);
260: assert(NULL != p->obuf);
261: if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) {
262: ERR(sess, "%s: write", p->fname);
263: return 0;
264: } else if ((size_t)ssz != p->obufsz) {
265: ERRX(sess, "%s: short write", p->fname);
266: return 0;
267: }
268: p->obufsz = 0;
269: }
270:
1.2 ! benno 271: /*
1.1 benno 272: * Now drain anything left.
273: * If we have no pre-write buffer, this is it.
274: */
275:
276: if (sz) {
277: if ((ssz = write(p->fd, buf, sz)) < 0) {
278: ERR(sess, "%s: write", p->fname);
279: return 0;
280: } else if ((size_t)ssz != sz) {
281: ERRX(sess, "%s: short write", p->fname);
282: return 0;
283: }
284: }
285: return 1;
286: }
287:
288: /*
289: * The downloader waits on a file the sender is going to give us, opens
290: * and mmaps the existing file, opens a temporary file, dumps the file
291: * (or metadata) into the temporary file, then renames.
292: * This happens in several possible phases to avoid blocking.
293: * Returns <0 on failure, 0 on no more data (end of phase), >0 on
294: * success (more data to be read from the sender).
295: */
296: int
297: rsync_downloader(struct download *p, struct sess *sess, int *ofd)
298: {
299: int32_t idx, rawtok;
300: uint32_t hash;
301: const struct flist *f;
302: size_t sz, dirlen, tok;
303: const char *cp;
304: mode_t perm;
1.2 ! benno 305: struct stat st;
1.1 benno 306: char *buf = NULL;
1.2 ! benno 307: unsigned char ourmd[MD4_DIGEST_LENGTH],
1.1 benno 308: md[MD4_DIGEST_LENGTH];
309: struct timespec tv[2];
310:
311: /*
312: * If we don't have a download already in session, then the next
313: * one is coming in.
314: * Read either the stop (phase) signal from the sender or block
315: * metadata, in which case we open our file and wait for data.
316: */
317:
318: if (DOWNLOAD_READ_NEXT == p->state) {
319: if ( ! io_read_int(sess, p->fdin, &idx)) {
320: ERRX1(sess, "io_read_int");
321: return -1;
322: } else if (idx >= 0 && (size_t)idx >= p->flsz) {
323: ERRX(sess, "index out of bounds");
324: return -1;
325: } else if (idx < 0) {
326: LOG3(sess, "downloader: phase complete");
327: return 0;
328: }
329:
330: /* Short-circuit: dry_run mode does nothing. */
331:
332: if (sess->opts->dry_run)
333: return 1;
334:
1.2 ! benno 335: /*
1.1 benno 336: * Now get our block information.
337: * This is all we'll need to reconstruct the file from
338: * the map, as block sizes are regular.
339: */
340:
341: download_reinit(sess, p, idx);
342: if ( ! blk_send_ack(sess, p->fdin, &p->blk)) {
343: ERRX1(sess, "blk_send_ack");
344: goto out;
345: }
346:
1.2 ! benno 347: /*
1.1 benno 348: * Next, we want to open the existing file for using as
349: * block input.
350: * We do this in a non-blocking way, so if the open
351: * succeeds, then we'll go reentrant til the file is
352: * readable and we can mmap() it.
353: * Set the file descriptor that we want to wait for.
354: */
355:
356: p->state = DOWNLOAD_READ_LOCAL;
357: f = &p->fl[idx];
1.2 ! benno 358: p->ofd = openat(p->rootfd, f->path,
1.1 benno 359: O_RDONLY | O_NONBLOCK, 0);
360:
361: if (-1 == p->ofd && ENOENT != errno) {
362: ERR(sess, "%s: openat", f->path);
363: goto out;
364: } else if (-1 != p->ofd) {
365: *ofd = p->ofd;
366: return 1;
367: }
368:
369: /* Fall-through: there's no file. */
370: }
371:
372: /*
373: * At this point, the server is sending us data and we want to
374: * hoover it up as quickly as possible or we'll deadlock.
375: * We want to be pulling off of f->fdin as quickly as possible,
376: * so perform as much buffering as we can.
377: */
378:
379: f = &p->fl[p->idx];
380:
381: /*
382: * Next in sequence: we have an open download session but
383: * haven't created our temporary file.
384: * This means that we've already opened (or tried to open) the
385: * original file in a nonblocking way, and we can map it.
386: */
387:
388: if (DOWNLOAD_READ_LOCAL == p->state) {
389: assert(NULL == p->fname);
390:
1.2 ! benno 391: /*
1.1 benno 392: * Try to fstat() the file descriptor if valid and make
393: * sure that we're still a regular file.
394: * Then, if it has non-zero size, mmap() it for hashing.
395: */
396:
397: if (-1 != p->ofd &&
398: -1 == fstat(p->ofd, &st)) {
399: ERR(sess, "%s: fstat", f->path);
400: goto out;
401: } else if (-1 != p->ofd && ! S_ISREG(st.st_mode)) {
402: WARNX(sess, "%s: not regular", f->path);
403: goto out;
404: }
405:
406: if (-1 != p->ofd && st.st_size > 0) {
407: p->mapsz = st.st_size;
1.2 ! benno 408: p->map = mmap(NULL, p->mapsz,
1.1 benno 409: PROT_READ, MAP_SHARED, p->ofd, 0);
410: if (MAP_FAILED == p->map) {
411: ERR(sess, "%s: mmap", f->path);
412: goto out;
413: }
414: }
415:
416: /* Success either way: we don't need this. */
417:
418: *ofd = -1;
419:
1.2 ! benno 420: /*
1.1 benno 421: * Create the temporary file.
422: * Use a simple scheme of path/.FILE.RANDOM, where we
423: * fill in RANDOM with an arc4random number.
424: * The tricky part is getting into the directory if
425: * we're in recursive mode.
426: */
427:
428: hash = arc4random();
429: if (sess->opts->recursive &&
430: NULL != (cp = strrchr(f->path, '/'))) {
431: dirlen = cp - f->path;
432: if (asprintf(&p->fname, "%.*s/.%s.%" PRIu32,
433: (int)dirlen, f->path,
434: f->path + dirlen + 1, hash) < 0)
435: p->fname = NULL;
436: } else {
1.2 ! benno 437: if (asprintf(&p->fname, ".%s.%" PRIu32,
1.1 benno 438: f->path, hash) < 0)
439: p->fname = NULL;
440: }
441: if (NULL == p->fname) {
442: ERR(sess, "asprintf");
443: goto out;
444: }
445:
1.2 ! benno 446: /*
1.1 benno 447: * Inherit permissions from the source file if we're new
448: * or specifically told with -p.
449: */
450:
451: if ( ! sess->opts->preserve_perms)
452: perm = -1 == p->ofd ? f->st.mode : st.st_mode;
453: else
454: perm = f->st.mode;
455:
1.2 ! benno 456: p->fd = openat(p->rootfd, p->fname,
1.1 benno 457: O_APPEND|O_WRONLY|O_CREAT|O_EXCL, perm);
458:
459: if (-1 == p->fd) {
460: ERR(sess, "%s: openat", p->fname);
461: goto out;
462: }
463:
1.2 ! benno 464: /*
1.1 benno 465: * FIXME: we can technically wait until the temporary
466: * file is writable, but since it's guaranteed to be
467: * empty, I don't think this is a terribly expensive
468: * operation as it doesn't involve reading the file into
469: * memory beforehand.
470: */
471:
472: LOG3(sess, "%s: temporary: %s", f->path, p->fname);
473: p->state = DOWNLOAD_READ_REMOTE;
474: return 1;
475: }
476:
477: /*
478: * This matches the sequence in blk_flush().
479: * If we've gotten here, then we have a possibly-open map file
480: * (not for new files) and our temporary file is writable.
481: * We read the size/token, then optionally the data.
482: * The size >0 for reading data, 0 for no more data, and <0 for
483: * a token indicator.
484: */
485:
486: assert(DOWNLOAD_READ_REMOTE == p->state);
487: assert(NULL != p->fname);
488: assert(-1 != p->fd);
489: assert(-1 != p->fdin);
490:
491: if ( ! io_read_int(sess, p->fdin, &rawtok)) {
492: ERRX1(sess, "io_read_int");
493: goto out;
1.2 ! benno 494: }
1.1 benno 495:
496: if (rawtok > 0) {
497: sz = rawtok;
498: if (NULL == (buf = malloc(sz))) {
499: ERR(sess, "realloc");
500: goto out;
501: }
502: if ( ! io_read_buf(sess, p->fdin, buf, sz)) {
503: ERRX1(sess, "io_read_int");
504: goto out;
505: } else if ( ! buf_copy(sess, buf, sz, p)) {
506: ERRX1(sess, "buf_copy");
507: goto out;
508: }
509: p->total += sz;
510: p->downloaded += sz;
511: LOG4(sess, "%s: received %zu B block", p->fname, sz);
512: MD4_Update(&p->ctx, buf, sz);
513: free(buf);
514: return 1;
515: } else if (rawtok < 0) {
516: tok = -rawtok - 1;
517: if (tok >= p->blk.blksz) {
518: ERRX(sess, "%s: token not in block "
1.2 ! benno 519: "set: %zu (have %zu blocks)",
1.1 benno 520: p->fname, tok, p->blk.blksz);
521: goto out;
522: }
523: sz = tok == p->blk.blksz - 1 ? p->blk.rem : p->blk.len;
524: assert(sz);
525: assert(MAP_FAILED != p->map);
526: buf = p->map + (tok * p->blk.len);
527:
528: /*
529: * Now we read from our block.
530: * We should only be at this point if we have a
531: * block to read from, i.e., if we were able to
532: * map our origin file and create a block
533: * profile from it.
534: */
535:
536: assert(MAP_FAILED != p->map);
537: if ( ! buf_copy(sess, buf, sz, p)) {
538: ERRX1(sess, "buf_copy");
539: goto out;
540: }
541: p->total += sz;
542: LOG4(sess, "%s: copied %zu B", p->fname, sz);
543: MD4_Update(&p->ctx, buf, sz);
544: return 1;
545: }
546:
547: if ( ! buf_copy(sess, NULL, 0, p)) {
548: ERRX1(sess, "buf_copy");
549: goto out;
550: }
551:
552: assert(0 == rawtok);
553: assert(0 == p->obufsz);
554:
1.2 ! benno 555: /*
1.1 benno 556: * Make sure our resulting MD4 hashes match.
557: * FIXME: if the MD4 hashes don't match, then our file has
558: * changed out from under us.
559: * This should require us to re-run the sequence in another
560: * phase.
561: */
562:
563: MD4_Final(ourmd, &p->ctx);
564:
565: if ( ! io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) {
566: ERRX1(sess, "io_read_buf");
567: goto out;
568: } else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) {
569: ERRX(sess, "%s: hash does not match", p->fname);
570: goto out;
571: }
572:
573: /* Conditionally adjust file modification time. */
574:
575: if (sess->opts->preserve_times) {
576: tv[0].tv_sec = time(NULL);
577: tv[0].tv_nsec = 0;
578: tv[1].tv_sec = f->st.mtime;
579: tv[1].tv_nsec = 0;
580: if (-1 == futimens(p->fd, tv)) {
581: ERR(sess, "%s: futimens", p->fname);
582: goto out;
583: }
584: LOG4(sess, "%s: updated date", f->path);
585: }
586:
587: /* Finally, rename the temporary to the real file. */
588:
589: if (-1 == renameat(p->rootfd, p->fname, p->rootfd, f->path)) {
590: ERR(sess, "%s: renameat: %s", p->fname, f->path);
591: goto out;
592: }
593:
594: log_file(sess, p, f);
595: download_cleanup(p, 0);
596: return 1;
597: out:
598: download_cleanup(p, 1);
599: return -1;
600: }