Annotation of src/usr.bin/rsync/downloader.c, Revision 1.1
1.1 ! benno 1: /* $Id$ */
! 2: /*
! 3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
! 4: *
! 5: * Permission to use, copy, modify, and distribute this software for any
! 6: * purpose with or without fee is hereby granted, provided that the above
! 7: * copyright notice and this permission notice appear in all copies.
! 8: *
! 9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
! 10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
! 11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
! 12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
! 13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
! 14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
! 15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
! 16: */
! 17: #include <sys/mman.h>
! 18: #include <sys/stat.h>
! 19:
! 20: #include <assert.h>
! 21: #include <errno.h>
! 22: #include <fcntl.h>
! 23: #include <inttypes.h>
! 24: #include <math.h>
! 25: #include <poll.h>
! 26: #include <stdio.h>
! 27: #include <stdlib.h>
! 28: #include <string.h>
! 29: #include <time.h>
! 30: #include <unistd.h>
! 31:
! 32: #include "extern.h"
! 33: #include "md4.h"
! 34:
/*
 * Capacity, in bytes, of the pre-write buffer (1 MB) used to batch
 * writes to the temporary output file into fewer, larger write(2)s.
 * Defining this as zero disables the buffer; data is then written
 * straight through.
 * (According to the original author it doesn't affect performance
 * much either way.)
 */
#define OBUF_SIZE	(1 << 20)
! 41:
/*
 * Where the downloader is in receiving the current file.
 */
enum downloadst {
	DOWNLOAD_READ_NEXT = 0,		/* waiting for the next file index */
	DOWNLOAD_READ_LOCAL = 1,	/* origin file opened (or absent); map it, create temporary */
	DOWNLOAD_READ_REMOTE = 2	/* temporary created; reading data/tokens from sender */
};
! 47:
! 48: /*
! 49: * Like struct upload, but used to keep track of what we're downloading.
! 50: * This also is managed by the receiver process.
! 51: */
! 52: struct download {
! 53: enum downloadst state; /* state of affairs */
! 54: size_t idx; /* index of current file */
! 55: struct blkset blk; /* its blocks */
! 56: void *map; /* mmap of current file */
! 57: size_t mapsz; /* length of mapsz */
! 58: int ofd; /* open origin file */
! 59: int fd; /* open output file */
! 60: char *fname; /* output filename */
! 61: MD4_CTX ctx; /* current hashing context */
! 62: off_t downloaded; /* total downloaded */
! 63: off_t total; /* total in file */
! 64: const struct flist *fl; /* file list */
! 65: size_t flsz; /* size of file list */
! 66: int rootfd; /* destination directory */
! 67: int fdin; /* read descriptor from sender */
! 68: char *obuf; /* pre-write buffer */
! 69: size_t obufsz; /* current size of obuf */
! 70: size_t obufmax; /* max size we'll wbuffer */
! 71: };
! 72:
! 73:
! 74: /*
! 75: * Simply log the filename.
! 76: */
! 77: static void
! 78: log_file(struct sess *sess,
! 79: const struct download *dl, const struct flist *f)
! 80: {
! 81: float frac, tot = dl->total;
! 82: int prec = 0;
! 83: const char *unit = "B";
! 84:
! 85: if (sess->opts->server)
! 86: return;
! 87:
! 88: frac = 0 == dl->total ? 100.0 :
! 89: 100.0 * dl->downloaded / dl->total;
! 90:
! 91: if (dl->total > 1024 * 1024 * 1024) {
! 92: tot = dl->total / (1024. * 1024. * 1024.);
! 93: prec = 3;
! 94: unit = "GB";
! 95: } else if (dl->total > 1024 * 1024) {
! 96: tot = dl->total / (1024. * 1024.);
! 97: prec = 2;
! 98: unit = "MB";
! 99: } else if (dl->total > 1024) {
! 100: tot = dl->total / 1024.;
! 101: prec = 1;
! 102: unit = "KB";
! 103: }
! 104:
! 105: LOG1(sess, "%s (%.*f %s, %.1f%% downloaded)",
! 106: f->path, prec, tot, unit, frac);
! 107: }
! 108:
! 109: /*
! 110: * Reinitialise a download context w/o overwriting the persistent parts
! 111: * of the structure (like p->fl or p->flsz) for index "idx".
! 112: * The MD4 context is pre-seeded.
! 113: */
! 114: static void
! 115: download_reinit(struct sess *sess, struct download *p, size_t idx)
! 116: {
! 117: int32_t seed = htole32(sess->seed);
! 118:
! 119: assert(DOWNLOAD_READ_NEXT == p->state);
! 120:
! 121: p->idx = idx;
! 122: memset(&p->blk, 0, sizeof(struct blkset));
! 123: p->map = MAP_FAILED;
! 124: p->mapsz = 0;
! 125: p->ofd = -1;
! 126: p->fd = -1;
! 127: p->fname = NULL;
! 128: MD4_Init(&p->ctx);
! 129: p->downloaded = p->total = 0;
! 130: /* Don't touch p->fl. */
! 131: /* Don't touch p->flsz. */
! 132: /* Don't touch p->rootfd. */
! 133: /* Don't touch p->fdin. */
! 134: MD4_Update(&p->ctx, &seed, sizeof(int32_t));
! 135: }
! 136:
! 137: /*
! 138: * Free a download context.
! 139: * If "cleanup" is non-zero, we also try to clean up the temporary file,
! 140: * assuming that it has been opened in p->fd.
! 141: */
! 142: static void
! 143: download_cleanup(struct download *p, int cleanup)
! 144: {
! 145:
! 146: if (MAP_FAILED != p->map) {
! 147: assert(p->mapsz);
! 148: munmap(p->map, p->mapsz);
! 149: p->map = MAP_FAILED;
! 150: p->mapsz = 0;
! 151: }
! 152: if (-1 != p->ofd) {
! 153: close(p->ofd);
! 154: p->ofd = -1;
! 155: }
! 156: if (-1 != p->fd) {
! 157: close(p->fd);
! 158: if (cleanup && NULL != p->fname)
! 159: unlinkat(p->rootfd, p->fname, 0);
! 160: p->fd = -1;
! 161: }
! 162: free(p->fname);
! 163: p->fname = NULL;
! 164: p->state = DOWNLOAD_READ_NEXT;
! 165: }
! 166:
! 167: /*
! 168: * Initial allocation of the download object using the file list "fl" of
! 169: * size "flsz", the destination "rootfd", and the sender read "fdin".
! 170: * Returns NULL on allocation failure.
! 171: * On success, download_free() must be called with the pointer.
! 172: */
! 173: struct download *
! 174: download_alloc(struct sess *sess, int fdin,
! 175: const struct flist *fl, size_t flsz, int rootfd)
! 176: {
! 177: struct download *p;
! 178:
! 179: if (NULL == (p = malloc(sizeof(struct download)))) {
! 180: ERR(sess, "malloc");
! 181: return NULL;
! 182: }
! 183:
! 184: p->state = DOWNLOAD_READ_NEXT;
! 185: p->fl = fl;
! 186: p->flsz = flsz;
! 187: p->rootfd = rootfd;
! 188: p->fdin = fdin;
! 189: download_reinit(sess, p, 0);
! 190: p->obufsz = 0;
! 191: p->obuf = NULL;
! 192: p->obufmax = OBUF_SIZE;
! 193: if (p->obufmax &&
! 194: NULL == (p->obuf = malloc(p->obufmax))) {
! 195: ERR(sess, "malloc");
! 196: free(p);
! 197: return NULL;
! 198: }
! 199: return p;
! 200: }
! 201:
! 202: /*
! 203: * Perform all cleanups (including removing stray files) and free.
! 204: * Passing a NULL to this function is ok.
! 205: */
! 206: void
! 207: download_free(struct download *p)
! 208: {
! 209:
! 210: if (NULL == p)
! 211: return;
! 212: download_cleanup(p, 1);
! 213: free(p->obuf);
! 214: free(p);
! 215: }
! 216:
! 217: /*
! 218: * Optimisation: instead of dumping directly into the output file, keep
! 219: * a buffer and write as much as we can into the buffer.
! 220: * That way, we can avoid calling write() too much, and instead call it
! 221: * with big buffers.
! 222: * To flush the buffer w/o changing it, pass 0 as "sz".
! 223: * Returns zero on failure, non-zero on success.
! 224: */
! 225: static int
! 226: buf_copy(struct sess *sess,
! 227: const char *buf, size_t sz, struct download *p)
! 228: {
! 229: size_t rem, tocopy;
! 230: ssize_t ssz;
! 231:
! 232: assert(p->obufsz <= p->obufmax);
! 233:
! 234: /*
! 235: * Copy as much as we can.
! 236: * If we've copied everything, exit.
! 237: * If we have no pre-write buffer (obufmax of zero), this never
! 238: * gets called, so we never buffer anything.
! 239: */
! 240:
! 241: if (sz && p->obufsz < p->obufmax) {
! 242: assert(NULL != p->obuf);
! 243: rem = p->obufmax - p->obufsz;
! 244: assert(rem > 0);
! 245: tocopy = rem < sz ? rem : sz;
! 246: memcpy(p->obuf + p->obufsz, buf, tocopy);
! 247: sz -= tocopy;
! 248: buf += tocopy;
! 249: p->obufsz += tocopy;
! 250: assert(p->obufsz <= p->obufmax);
! 251: if (0 == sz)
! 252: return 1;
! 253: }
! 254:
! 255: /* Drain the main buffer. */
! 256:
! 257: if (p->obufsz) {
! 258: assert(p->obufmax);
! 259: assert(p->obufsz <= p->obufmax);
! 260: assert(NULL != p->obuf);
! 261: if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) {
! 262: ERR(sess, "%s: write", p->fname);
! 263: return 0;
! 264: } else if ((size_t)ssz != p->obufsz) {
! 265: ERRX(sess, "%s: short write", p->fname);
! 266: return 0;
! 267: }
! 268: p->obufsz = 0;
! 269: }
! 270:
! 271: /*
! 272: * Now drain anything left.
! 273: * If we have no pre-write buffer, this is it.
! 274: */
! 275:
! 276: if (sz) {
! 277: if ((ssz = write(p->fd, buf, sz)) < 0) {
! 278: ERR(sess, "%s: write", p->fname);
! 279: return 0;
! 280: } else if ((size_t)ssz != sz) {
! 281: ERRX(sess, "%s: short write", p->fname);
! 282: return 0;
! 283: }
! 284: }
! 285: return 1;
! 286: }
! 287:
! 288: /*
! 289: * The downloader waits on a file the sender is going to give us, opens
! 290: * and mmaps the existing file, opens a temporary file, dumps the file
! 291: * (or metadata) into the temporary file, then renames.
! 292: * This happens in several possible phases to avoid blocking.
! 293: * Returns <0 on failure, 0 on no more data (end of phase), >0 on
! 294: * success (more data to be read from the sender).
! 295: */
! 296: int
! 297: rsync_downloader(struct download *p, struct sess *sess, int *ofd)
! 298: {
! 299: int32_t idx, rawtok;
! 300: uint32_t hash;
! 301: const struct flist *f;
! 302: size_t sz, dirlen, tok;
! 303: const char *cp;
! 304: mode_t perm;
! 305: struct stat st;
! 306: char *buf = NULL;
! 307: unsigned char ourmd[MD4_DIGEST_LENGTH],
! 308: md[MD4_DIGEST_LENGTH];
! 309: struct timespec tv[2];
! 310:
! 311: /*
! 312: * If we don't have a download already in session, then the next
! 313: * one is coming in.
! 314: * Read either the stop (phase) signal from the sender or block
! 315: * metadata, in which case we open our file and wait for data.
! 316: */
! 317:
! 318: if (DOWNLOAD_READ_NEXT == p->state) {
! 319: if ( ! io_read_int(sess, p->fdin, &idx)) {
! 320: ERRX1(sess, "io_read_int");
! 321: return -1;
! 322: } else if (idx >= 0 && (size_t)idx >= p->flsz) {
! 323: ERRX(sess, "index out of bounds");
! 324: return -1;
! 325: } else if (idx < 0) {
! 326: LOG3(sess, "downloader: phase complete");
! 327: return 0;
! 328: }
! 329:
! 330: /* Short-circuit: dry_run mode does nothing. */
! 331:
! 332: if (sess->opts->dry_run)
! 333: return 1;
! 334:
! 335: /*
! 336: * Now get our block information.
! 337: * This is all we'll need to reconstruct the file from
! 338: * the map, as block sizes are regular.
! 339: */
! 340:
! 341: download_reinit(sess, p, idx);
! 342: if ( ! blk_send_ack(sess, p->fdin, &p->blk)) {
! 343: ERRX1(sess, "blk_send_ack");
! 344: goto out;
! 345: }
! 346:
! 347: /*
! 348: * Next, we want to open the existing file for using as
! 349: * block input.
! 350: * We do this in a non-blocking way, so if the open
! 351: * succeeds, then we'll go reentrant til the file is
! 352: * readable and we can mmap() it.
! 353: * Set the file descriptor that we want to wait for.
! 354: */
! 355:
! 356: p->state = DOWNLOAD_READ_LOCAL;
! 357: f = &p->fl[idx];
! 358: p->ofd = openat(p->rootfd, f->path,
! 359: O_RDONLY | O_NONBLOCK, 0);
! 360:
! 361: if (-1 == p->ofd && ENOENT != errno) {
! 362: ERR(sess, "%s: openat", f->path);
! 363: goto out;
! 364: } else if (-1 != p->ofd) {
! 365: *ofd = p->ofd;
! 366: return 1;
! 367: }
! 368:
! 369: /* Fall-through: there's no file. */
! 370: }
! 371:
! 372: /*
! 373: * At this point, the server is sending us data and we want to
! 374: * hoover it up as quickly as possible or we'll deadlock.
! 375: * We want to be pulling off of f->fdin as quickly as possible,
! 376: * so perform as much buffering as we can.
! 377: */
! 378:
! 379: f = &p->fl[p->idx];
! 380:
! 381: /*
! 382: * Next in sequence: we have an open download session but
! 383: * haven't created our temporary file.
! 384: * This means that we've already opened (or tried to open) the
! 385: * original file in a nonblocking way, and we can map it.
! 386: */
! 387:
! 388: if (DOWNLOAD_READ_LOCAL == p->state) {
! 389: assert(NULL == p->fname);
! 390:
! 391: /*
! 392: * Try to fstat() the file descriptor if valid and make
! 393: * sure that we're still a regular file.
! 394: * Then, if it has non-zero size, mmap() it for hashing.
! 395: */
! 396:
! 397: if (-1 != p->ofd &&
! 398: -1 == fstat(p->ofd, &st)) {
! 399: ERR(sess, "%s: fstat", f->path);
! 400: goto out;
! 401: } else if (-1 != p->ofd && ! S_ISREG(st.st_mode)) {
! 402: WARNX(sess, "%s: not regular", f->path);
! 403: goto out;
! 404: }
! 405:
! 406: if (-1 != p->ofd && st.st_size > 0) {
! 407: p->mapsz = st.st_size;
! 408: p->map = mmap(NULL, p->mapsz,
! 409: PROT_READ, MAP_SHARED, p->ofd, 0);
! 410: if (MAP_FAILED == p->map) {
! 411: ERR(sess, "%s: mmap", f->path);
! 412: goto out;
! 413: }
! 414: }
! 415:
! 416: /* Success either way: we don't need this. */
! 417:
! 418: *ofd = -1;
! 419:
! 420: /*
! 421: * Create the temporary file.
! 422: * Use a simple scheme of path/.FILE.RANDOM, where we
! 423: * fill in RANDOM with an arc4random number.
! 424: * The tricky part is getting into the directory if
! 425: * we're in recursive mode.
! 426: */
! 427:
! 428: hash = arc4random();
! 429: if (sess->opts->recursive &&
! 430: NULL != (cp = strrchr(f->path, '/'))) {
! 431: dirlen = cp - f->path;
! 432: if (asprintf(&p->fname, "%.*s/.%s.%" PRIu32,
! 433: (int)dirlen, f->path,
! 434: f->path + dirlen + 1, hash) < 0)
! 435: p->fname = NULL;
! 436: } else {
! 437: if (asprintf(&p->fname, ".%s.%" PRIu32,
! 438: f->path, hash) < 0)
! 439: p->fname = NULL;
! 440: }
! 441: if (NULL == p->fname) {
! 442: ERR(sess, "asprintf");
! 443: goto out;
! 444: }
! 445:
! 446: /*
! 447: * Inherit permissions from the source file if we're new
! 448: * or specifically told with -p.
! 449: */
! 450:
! 451: if ( ! sess->opts->preserve_perms)
! 452: perm = -1 == p->ofd ? f->st.mode : st.st_mode;
! 453: else
! 454: perm = f->st.mode;
! 455:
! 456: p->fd = openat(p->rootfd, p->fname,
! 457: O_APPEND|O_WRONLY|O_CREAT|O_EXCL, perm);
! 458:
! 459: if (-1 == p->fd) {
! 460: ERR(sess, "%s: openat", p->fname);
! 461: goto out;
! 462: }
! 463:
! 464: /*
! 465: * FIXME: we can technically wait until the temporary
! 466: * file is writable, but since it's guaranteed to be
! 467: * empty, I don't think this is a terribly expensive
! 468: * operation as it doesn't involve reading the file into
! 469: * memory beforehand.
! 470: */
! 471:
! 472: LOG3(sess, "%s: temporary: %s", f->path, p->fname);
! 473: p->state = DOWNLOAD_READ_REMOTE;
! 474: return 1;
! 475: }
! 476:
! 477: /*
! 478: * This matches the sequence in blk_flush().
! 479: * If we've gotten here, then we have a possibly-open map file
! 480: * (not for new files) and our temporary file is writable.
! 481: * We read the size/token, then optionally the data.
! 482: * The size >0 for reading data, 0 for no more data, and <0 for
! 483: * a token indicator.
! 484: */
! 485:
! 486: assert(DOWNLOAD_READ_REMOTE == p->state);
! 487: assert(NULL != p->fname);
! 488: assert(-1 != p->fd);
! 489: assert(-1 != p->fdin);
! 490:
! 491: if ( ! io_read_int(sess, p->fdin, &rawtok)) {
! 492: ERRX1(sess, "io_read_int");
! 493: goto out;
! 494: }
! 495:
! 496: if (rawtok > 0) {
! 497: sz = rawtok;
! 498: if (NULL == (buf = malloc(sz))) {
! 499: ERR(sess, "realloc");
! 500: goto out;
! 501: }
! 502: if ( ! io_read_buf(sess, p->fdin, buf, sz)) {
! 503: ERRX1(sess, "io_read_int");
! 504: goto out;
! 505: } else if ( ! buf_copy(sess, buf, sz, p)) {
! 506: ERRX1(sess, "buf_copy");
! 507: goto out;
! 508: }
! 509: p->total += sz;
! 510: p->downloaded += sz;
! 511: LOG4(sess, "%s: received %zu B block", p->fname, sz);
! 512: MD4_Update(&p->ctx, buf, sz);
! 513: free(buf);
! 514: return 1;
! 515: } else if (rawtok < 0) {
! 516: tok = -rawtok - 1;
! 517: if (tok >= p->blk.blksz) {
! 518: ERRX(sess, "%s: token not in block "
! 519: "set: %zu (have %zu blocks)",
! 520: p->fname, tok, p->blk.blksz);
! 521: goto out;
! 522: }
! 523: sz = tok == p->blk.blksz - 1 ? p->blk.rem : p->blk.len;
! 524: assert(sz);
! 525: assert(MAP_FAILED != p->map);
! 526: buf = p->map + (tok * p->blk.len);
! 527:
! 528: /*
! 529: * Now we read from our block.
! 530: * We should only be at this point if we have a
! 531: * block to read from, i.e., if we were able to
! 532: * map our origin file and create a block
! 533: * profile from it.
! 534: */
! 535:
! 536: assert(MAP_FAILED != p->map);
! 537: if ( ! buf_copy(sess, buf, sz, p)) {
! 538: ERRX1(sess, "buf_copy");
! 539: goto out;
! 540: }
! 541: p->total += sz;
! 542: LOG4(sess, "%s: copied %zu B", p->fname, sz);
! 543: MD4_Update(&p->ctx, buf, sz);
! 544: return 1;
! 545: }
! 546:
! 547: if ( ! buf_copy(sess, NULL, 0, p)) {
! 548: ERRX1(sess, "buf_copy");
! 549: goto out;
! 550: }
! 551:
! 552: assert(0 == rawtok);
! 553: assert(0 == p->obufsz);
! 554:
! 555: /*
! 556: * Make sure our resulting MD4 hashes match.
! 557: * FIXME: if the MD4 hashes don't match, then our file has
! 558: * changed out from under us.
! 559: * This should require us to re-run the sequence in another
! 560: * phase.
! 561: */
! 562:
! 563: MD4_Final(ourmd, &p->ctx);
! 564:
! 565: if ( ! io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) {
! 566: ERRX1(sess, "io_read_buf");
! 567: goto out;
! 568: } else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) {
! 569: ERRX(sess, "%s: hash does not match", p->fname);
! 570: goto out;
! 571: }
! 572:
! 573: /* Conditionally adjust file modification time. */
! 574:
! 575: if (sess->opts->preserve_times) {
! 576: tv[0].tv_sec = time(NULL);
! 577: tv[0].tv_nsec = 0;
! 578: tv[1].tv_sec = f->st.mtime;
! 579: tv[1].tv_nsec = 0;
! 580: if (-1 == futimens(p->fd, tv)) {
! 581: ERR(sess, "%s: futimens", p->fname);
! 582: goto out;
! 583: }
! 584: LOG4(sess, "%s: updated date", f->path);
! 585: }
! 586:
! 587: /* Finally, rename the temporary to the real file. */
! 588:
! 589: if (-1 == renameat(p->rootfd, p->fname, p->rootfd, f->path)) {
! 590: ERR(sess, "%s: renameat: %s", p->fname, f->path);
! 591: goto out;
! 592: }
! 593:
! 594: log_file(sess, p, f);
! 595: download_cleanup(p, 0);
! 596: return 1;
! 597: out:
! 598: download_cleanup(p, 1);
! 599: return -1;
! 600: }