Annotation of src/usr.bin/rsync/downloader.c, Revision 1.21
1.21 ! benno 1: /* $Id: downloader.c,v 1.20 2019/05/08 20:00:25 benno Exp $ */
1.1 benno 2: /*
3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4: *
5: * Permission to use, copy, modify, and distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16: */
17: #include <sys/mman.h>
18: #include <sys/stat.h>
19:
20: #include <assert.h>
21: #include <errno.h>
22: #include <fcntl.h>
23: #include <inttypes.h>
24: #include <math.h>
25: #include <stdio.h>
26: #include <stdlib.h>
27: #include <string.h>
28: #include <time.h>
29: #include <unistd.h>
30:
1.8 tb 31: #include <openssl/md4.h>
32:
1.1 benno 33: #include "extern.h"
34:
/*
 * A small optimisation: have a 1 MB pre-write buffer.
 * Disable the pre-write buffer by having this be zero.
 * (It doesn't affect performance much.)
 */
#define OBUF_SIZE (1024 * 1024)

/*
 * Per-file state machine driven by rsync_downloader().
 */
enum downloadst {
	DOWNLOAD_READ_NEXT = 0,	/* idle: next read is a file index (or phase end) */
	DOWNLOAD_READ_LOCAL,	/* origin file opened (or absent): map it, make temp file */
	DOWNLOAD_READ_REMOTE	/* temp file open: read data/tokens from the sender */
};
47:
/*
 * Like struct upload, but used to keep track of what we're downloading.
 * This also is managed by the receiver process.
 * The fl, flsz, rootfd, fdin and obuf* members persist for the whole
 * session; the remaining per-file members are reset by download_reinit().
 */
struct download {
	enum downloadst	 state; /* state of affairs */
	size_t		 idx; /* index of current file in fl */
	struct blkset	 blk; /* block set for the current file (see blk_send_ack()) */
	void		*map; /* mmap of origin file, or MAP_FAILED */
	size_t		 mapsz; /* length of map */
	int		 ofd; /* open origin file, or -1 */
	int		 fd; /* open temporary output file, or -1 */
	char		*fname; /* temporary output filename, relative to rootfd */
	MD4_CTX		 ctx; /* current hashing context */
	off_t		 downloaded; /* bytes received as literal data from sender */
	off_t		 total; /* bytes written to the output so far (literal + copied) */
	const struct flist *fl; /* file list */
	size_t		 flsz; /* size of file list */
	int		 rootfd; /* destination directory */
	int		 fdin; /* read descriptor from sender */
	char		*obuf; /* pre-write buffer */
	size_t		 obufsz; /* bytes currently held in obuf */
	size_t		 obufmax; /* capacity of obuf (zero disables buffering) */
};
72:
73:
74: /*
75: * Simply log the filename.
76: */
77: static void
1.2 benno 78: log_file(struct sess *sess,
1.1 benno 79: const struct download *dl, const struct flist *f)
80: {
81: float frac, tot = dl->total;
82: int prec = 0;
83: const char *unit = "B";
84:
85: if (sess->opts->server)
86: return;
87:
1.14 deraadt 88: frac = (dl->total == 0) ? 100.0 :
1.1 benno 89: 100.0 * dl->downloaded / dl->total;
90:
91: if (dl->total > 1024 * 1024 * 1024) {
92: tot = dl->total / (1024. * 1024. * 1024.);
93: prec = 3;
94: unit = "GB";
95: } else if (dl->total > 1024 * 1024) {
96: tot = dl->total / (1024. * 1024.);
97: prec = 2;
98: unit = "MB";
99: } else if (dl->total > 1024) {
100: tot = dl->total / 1024.;
101: prec = 1;
102: unit = "KB";
103: }
104:
1.20 benno 105: LOG1("%s (%.*f %s, %.1f%% downloaded)",
1.18 deraadt 106: f->path, prec, tot, unit, frac);
1.1 benno 107: }
108:
109: /*
110: * Reinitialise a download context w/o overwriting the persistent parts
111: * of the structure (like p->fl or p->flsz) for index "idx".
112: * The MD4 context is pre-seeded.
113: */
114: static void
115: download_reinit(struct sess *sess, struct download *p, size_t idx)
116: {
117: int32_t seed = htole32(sess->seed);
118:
1.4 deraadt 119: assert(p->state == DOWNLOAD_READ_NEXT);
1.1 benno 120:
121: p->idx = idx;
122: memset(&p->blk, 0, sizeof(struct blkset));
123: p->map = MAP_FAILED;
124: p->mapsz = 0;
125: p->ofd = -1;
126: p->fd = -1;
127: p->fname = NULL;
128: MD4_Init(&p->ctx);
129: p->downloaded = p->total = 0;
130: /* Don't touch p->fl. */
131: /* Don't touch p->flsz. */
132: /* Don't touch p->rootfd. */
133: /* Don't touch p->fdin. */
134: MD4_Update(&p->ctx, &seed, sizeof(int32_t));
135: }
136:
137: /*
138: * Free a download context.
139: * If "cleanup" is non-zero, we also try to clean up the temporary file,
140: * assuming that it has been opened in p->fd.
141: */
142: static void
143: download_cleanup(struct download *p, int cleanup)
144: {
145:
1.4 deraadt 146: if (p->map != MAP_FAILED) {
1.1 benno 147: assert(p->mapsz);
148: munmap(p->map, p->mapsz);
149: p->map = MAP_FAILED;
150: p->mapsz = 0;
151: }
1.4 deraadt 152: if (p->ofd != -1) {
1.1 benno 153: close(p->ofd);
154: p->ofd = -1;
155: }
1.4 deraadt 156: if (p->fd != -1) {
1.1 benno 157: close(p->fd);
1.4 deraadt 158: if (cleanup && p->fname != NULL)
1.1 benno 159: unlinkat(p->rootfd, p->fname, 0);
160: p->fd = -1;
161: }
162: free(p->fname);
163: p->fname = NULL;
164: p->state = DOWNLOAD_READ_NEXT;
165: }
166:
167: /*
168: * Initial allocation of the download object using the file list "fl" of
169: * size "flsz", the destination "rootfd", and the sender read "fdin".
170: * Returns NULL on allocation failure.
171: * On success, download_free() must be called with the pointer.
172: */
173: struct download *
1.2 benno 174: download_alloc(struct sess *sess, int fdin,
1.1 benno 175: const struct flist *fl, size_t flsz, int rootfd)
176: {
177: struct download *p;
178:
1.4 deraadt 179: if ((p = malloc(sizeof(struct download))) == NULL) {
1.20 benno 180: ERR("malloc");
1.1 benno 181: return NULL;
182: }
183:
184: p->state = DOWNLOAD_READ_NEXT;
185: p->fl = fl;
186: p->flsz = flsz;
187: p->rootfd = rootfd;
188: p->fdin = fdin;
189: download_reinit(sess, p, 0);
190: p->obufsz = 0;
191: p->obuf = NULL;
192: p->obufmax = OBUF_SIZE;
1.4 deraadt 193: if (p->obufmax && (p->obuf = malloc(p->obufmax)) == NULL) {
1.20 benno 194: ERR("malloc");
1.1 benno 195: free(p);
196: return NULL;
197: }
198: return p;
199: }
200:
201: /*
202: * Perform all cleanups (including removing stray files) and free.
203: * Passing a NULL to this function is ok.
204: */
205: void
206: download_free(struct download *p)
207: {
208:
1.4 deraadt 209: if (p == NULL)
1.1 benno 210: return;
211: download_cleanup(p, 1);
212: free(p->obuf);
213: free(p);
214: }
215:
216: /*
217: * Optimisation: instead of dumping directly into the output file, keep
218: * a buffer and write as much as we can into the buffer.
219: * That way, we can avoid calling write() too much, and instead call it
220: * with big buffers.
221: * To flush the buffer w/o changing it, pass 0 as "sz".
222: * Returns zero on failure, non-zero on success.
223: */
224: static int
1.21 ! benno 225: buf_copy(const char *buf, size_t sz, struct download *p)
1.1 benno 226: {
227: size_t rem, tocopy;
228: ssize_t ssz;
229:
230: assert(p->obufsz <= p->obufmax);
231:
1.2 benno 232: /*
1.1 benno 233: * Copy as much as we can.
234: * If we've copied everything, exit.
235: * If we have no pre-write buffer (obufmax of zero), this never
236: * gets called, so we never buffer anything.
237: */
238:
239: if (sz && p->obufsz < p->obufmax) {
1.4 deraadt 240: assert(p->obuf != NULL);
1.1 benno 241: rem = p->obufmax - p->obufsz;
242: assert(rem > 0);
243: tocopy = rem < sz ? rem : sz;
244: memcpy(p->obuf + p->obufsz, buf, tocopy);
245: sz -= tocopy;
246: buf += tocopy;
247: p->obufsz += tocopy;
248: assert(p->obufsz <= p->obufmax);
1.4 deraadt 249: if (sz == 0)
1.1 benno 250: return 1;
251: }
252:
253: /* Drain the main buffer. */
254:
255: if (p->obufsz) {
256: assert(p->obufmax);
257: assert(p->obufsz <= p->obufmax);
1.4 deraadt 258: assert(p->obuf != NULL);
1.1 benno 259: if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) {
1.20 benno 260: ERR("%s: write", p->fname);
1.1 benno 261: return 0;
262: } else if ((size_t)ssz != p->obufsz) {
1.20 benno 263: ERRX("%s: short write", p->fname);
1.1 benno 264: return 0;
265: }
266: p->obufsz = 0;
267: }
268:
1.2 benno 269: /*
1.1 benno 270: * Now drain anything left.
271: * If we have no pre-write buffer, this is it.
272: */
273:
274: if (sz) {
275: if ((ssz = write(p->fd, buf, sz)) < 0) {
1.20 benno 276: ERR("%s: write", p->fname);
1.1 benno 277: return 0;
278: } else if ((size_t)ssz != sz) {
1.20 benno 279: ERRX("%s: short write", p->fname);
1.1 benno 280: return 0;
281: }
282: }
283: return 1;
284: }
285:
286: /*
287: * The downloader waits on a file the sender is going to give us, opens
288: * and mmaps the existing file, opens a temporary file, dumps the file
289: * (or metadata) into the temporary file, then renames.
290: * This happens in several possible phases to avoid blocking.
291: * Returns <0 on failure, 0 on no more data (end of phase), >0 on
292: * success (more data to be read from the sender).
293: */
294: int
295: rsync_downloader(struct download *p, struct sess *sess, int *ofd)
296: {
1.12 florian 297: int c;
1.1 benno 298: int32_t idx, rawtok;
299: const struct flist *f;
1.10 florian 300: size_t sz, tok;
1.2 benno 301: struct stat st;
1.1 benno 302: char *buf = NULL;
1.2 benno 303: unsigned char ourmd[MD4_DIGEST_LENGTH],
1.1 benno 304: md[MD4_DIGEST_LENGTH];
305:
306: /*
307: * If we don't have a download already in session, then the next
308: * one is coming in.
309: * Read either the stop (phase) signal from the sender or block
310: * metadata, in which case we open our file and wait for data.
311: */
312:
1.4 deraadt 313: if (p->state == DOWNLOAD_READ_NEXT) {
1.3 deraadt 314: if (!io_read_int(sess, p->fdin, &idx)) {
1.20 benno 315: ERRX1("io_read_int");
1.1 benno 316: return -1;
317: } else if (idx >= 0 && (size_t)idx >= p->flsz) {
1.20 benno 318: ERRX("index out of bounds");
1.1 benno 319: return -1;
320: } else if (idx < 0) {
1.20 benno 321: LOG3("downloader: phase complete");
1.1 benno 322: return 0;
323: }
324:
325: /* Short-circuit: dry_run mode does nothing. */
326:
327: if (sess->opts->dry_run)
328: return 1;
329:
1.2 benno 330: /*
1.1 benno 331: * Now get our block information.
332: * This is all we'll need to reconstruct the file from
333: * the map, as block sizes are regular.
334: */
335:
336: download_reinit(sess, p, idx);
1.3 deraadt 337: if (!blk_send_ack(sess, p->fdin, &p->blk)) {
1.20 benno 338: ERRX1("blk_send_ack");
1.1 benno 339: goto out;
340: }
341:
1.2 benno 342: /*
1.1 benno 343: * Next, we want to open the existing file for using as
344: * block input.
345: * We do this in a non-blocking way, so if the open
346: * succeeds, then we'll go reentrant til the file is
347: * readable and we can mmap() it.
348: * Set the file descriptor that we want to wait for.
349: */
350:
351: p->state = DOWNLOAD_READ_LOCAL;
352: f = &p->fl[idx];
1.4 deraadt 353: p->ofd = openat(p->rootfd, f->path, O_RDONLY | O_NONBLOCK, 0);
1.1 benno 354:
1.4 deraadt 355: if (p->ofd == -1 && errno != ENOENT) {
1.20 benno 356: ERR("%s: openat", f->path);
1.1 benno 357: goto out;
1.4 deraadt 358: } else if (p->ofd != -1) {
1.1 benno 359: *ofd = p->ofd;
360: return 1;
361: }
362:
363: /* Fall-through: there's no file. */
364: }
365:
366: /*
367: * At this point, the server is sending us data and we want to
368: * hoover it up as quickly as possible or we'll deadlock.
369: * We want to be pulling off of f->fdin as quickly as possible,
370: * so perform as much buffering as we can.
371: */
372:
373: f = &p->fl[p->idx];
374:
375: /*
376: * Next in sequence: we have an open download session but
377: * haven't created our temporary file.
378: * This means that we've already opened (or tried to open) the
379: * original file in a nonblocking way, and we can map it.
380: */
381:
1.4 deraadt 382: if (p->state == DOWNLOAD_READ_LOCAL) {
383: assert(p->fname == NULL);
1.1 benno 384:
1.2 benno 385: /*
1.1 benno 386: * Try to fstat() the file descriptor if valid and make
387: * sure that we're still a regular file.
388: * Then, if it has non-zero size, mmap() it for hashing.
389: */
390:
1.4 deraadt 391: if (p->ofd != -1 &&
392: fstat(p->ofd, &st) == -1) {
1.20 benno 393: ERR("%s: fstat", f->path);
1.1 benno 394: goto out;
1.4 deraadt 395: } else if (p->ofd != -1 && !S_ISREG(st.st_mode)) {
1.20 benno 396: WARNX("%s: not regular", f->path);
1.1 benno 397: goto out;
398: }
399:
1.4 deraadt 400: if (p->ofd != -1 && st.st_size > 0) {
1.1 benno 401: p->mapsz = st.st_size;
1.2 benno 402: p->map = mmap(NULL, p->mapsz,
1.1 benno 403: PROT_READ, MAP_SHARED, p->ofd, 0);
1.4 deraadt 404: if (p->map == MAP_FAILED) {
1.20 benno 405: ERR("%s: mmap", f->path);
1.1 benno 406: goto out;
407: }
408: }
409:
410: /* Success either way: we don't need this. */
411:
412: *ofd = -1;
413:
1.10 florian 414: /* Create the temporary file. */
1.1 benno 415:
1.21 ! benno 416: if (mktemplate(&p->fname, f->path, sess->opts->recursive) ==
! 417: -1) {
1.20 benno 418: ERRX1("mktemplate");
1.10 florian 419: goto out;
1.1 benno 420: }
1.10 florian 421:
422: if ((p->fd = mkstempat(p->rootfd, p->fname)) == -1) {
1.20 benno 423: ERR("mkstempat");
1.1 benno 424: goto out;
425: }
426:
1.2 benno 427: /*
1.1 benno 428: * FIXME: we can technically wait until the temporary
429: * file is writable, but since it's guaranteed to be
430: * empty, I don't think this is a terribly expensive
431: * operation as it doesn't involve reading the file into
432: * memory beforehand.
433: */
434:
1.20 benno 435: LOG3("%s: temporary: %s", f->path, p->fname);
1.1 benno 436: p->state = DOWNLOAD_READ_REMOTE;
437: return 1;
438: }
439:
440: /*
441: * This matches the sequence in blk_flush().
442: * If we've gotten here, then we have a possibly-open map file
443: * (not for new files) and our temporary file is writable.
444: * We read the size/token, then optionally the data.
445: * The size >0 for reading data, 0 for no more data, and <0 for
446: * a token indicator.
447: */
448:
1.12 florian 449: again:
1.4 deraadt 450: assert(p->state == DOWNLOAD_READ_REMOTE);
451: assert(p->fname != NULL);
452: assert(p->fd != -1);
453: assert(p->fdin != -1);
1.1 benno 454:
1.3 deraadt 455: if (!io_read_int(sess, p->fdin, &rawtok)) {
1.20 benno 456: ERRX1("io_read_int");
1.1 benno 457: goto out;
1.2 benno 458: }
1.1 benno 459:
460: if (rawtok > 0) {
461: sz = rawtok;
1.4 deraadt 462: if ((buf = malloc(sz)) == NULL) {
1.20 benno 463: ERR("realloc");
1.1 benno 464: goto out;
465: }
1.3 deraadt 466: if (!io_read_buf(sess, p->fdin, buf, sz)) {
1.20 benno 467: ERRX1("io_read_int");
1.1 benno 468: goto out;
1.21 ! benno 469: } else if (!buf_copy(buf, sz, p)) {
1.20 benno 470: ERRX1("buf_copy");
1.1 benno 471: goto out;
472: }
473: p->total += sz;
474: p->downloaded += sz;
1.20 benno 475: LOG4("%s: received %zu B block", p->fname, sz);
1.1 benno 476: MD4_Update(&p->ctx, buf, sz);
477: free(buf);
1.12 florian 478:
479: /* Fast-track more reads as they arrive. */
480:
1.21 ! benno 481: if ((c = io_read_check(p->fdin)) < 0) {
1.20 benno 482: ERRX1("io_read_check");
1.12 florian 483: goto out;
484: } else if (c > 0)
485: goto again;
486:
1.1 benno 487: return 1;
488: } else if (rawtok < 0) {
489: tok = -rawtok - 1;
490: if (tok >= p->blk.blksz) {
1.20 benno 491: ERRX("%s: token not in block set: %zu (have %zu blocks)",
1.18 deraadt 492: p->fname, tok, p->blk.blksz);
1.1 benno 493: goto out;
494: }
495: sz = tok == p->blk.blksz - 1 ? p->blk.rem : p->blk.len;
496: assert(sz);
1.4 deraadt 497: assert(p->map != MAP_FAILED);
1.1 benno 498: buf = p->map + (tok * p->blk.len);
499:
500: /*
501: * Now we read from our block.
502: * We should only be at this point if we have a
503: * block to read from, i.e., if we were able to
504: * map our origin file and create a block
505: * profile from it.
506: */
507:
1.4 deraadt 508: assert(p->map != MAP_FAILED);
1.21 ! benno 509: if (!buf_copy(buf, sz, p)) {
1.20 benno 510: ERRX1("buf_copy");
1.1 benno 511: goto out;
512: }
513: p->total += sz;
1.20 benno 514: LOG4("%s: copied %zu B", p->fname, sz);
1.1 benno 515: MD4_Update(&p->ctx, buf, sz);
1.12 florian 516:
517: /* Fast-track more reads as they arrive. */
518:
1.21 ! benno 519: if ((c = io_read_check(p->fdin)) < 0) {
1.20 benno 520: ERRX1("io_read_check");
1.12 florian 521: goto out;
522: } else if (c > 0)
523: goto again;
524:
1.1 benno 525: return 1;
526: }
527:
1.21 ! benno 528: if (!buf_copy(NULL, 0, p)) {
1.20 benno 529: ERRX1("buf_copy");
1.1 benno 530: goto out;
531: }
532:
1.4 deraadt 533: assert(rawtok == 0);
534: assert(p->obufsz == 0);
1.1 benno 535:
1.2 benno 536: /*
1.1 benno 537: * Make sure our resulting MD4 hashes match.
538: * FIXME: if the MD4 hashes don't match, then our file has
539: * changed out from under us.
540: * This should require us to re-run the sequence in another
541: * phase.
542: */
543:
544: MD4_Final(ourmd, &p->ctx);
545:
1.3 deraadt 546: if (!io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) {
1.20 benno 547: ERRX1("io_read_buf");
1.1 benno 548: goto out;
549: } else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) {
1.20 benno 550: ERRX("%s: hash does not match", p->fname);
1.1 benno 551: goto out;
1.5 benno 552: }
553:
1.9 florian 554: /* Adjust our file metadata (uid, mode, etc.). */
1.6 benno 555:
1.9 florian 556: if (!rsync_set_metadata(sess, 1, p->fd, f, p->fname)) {
1.20 benno 557: ERRX1("rsync_set_metadata");
1.9 florian 558: goto out;
1.1 benno 559: }
560:
561: /* Finally, rename the temporary to the real file. */
562:
1.4 deraadt 563: if (renameat(p->rootfd, p->fname, p->rootfd, f->path) == -1) {
1.20 benno 564: ERR("%s: renameat: %s", p->fname, f->path);
1.1 benno 565: goto out;
566: }
567:
568: log_file(sess, p, f);
569: download_cleanup(p, 0);
570: return 1;
571: out:
572: download_cleanup(p, 1);
573: return -1;
574: }