=================================================================== RCS file: /cvsrepo/anoncvs/cvs/src/usr.bin/rsync/sender.c,v retrieving revision 1.14 retrieving revision 1.15 diff -u -r1.14 -r1.15 --- src/usr.bin/rsync/sender.c 2019/02/18 21:34:54 1.14 +++ src/usr.bin/rsync/sender.c 2019/02/18 21:55:27 1.15 @@ -1,4 +1,4 @@ -/* $Id: sender.c,v 1.14 2019/02/18 21:34:54 benno Exp $ */ +/* $Id: sender.c,v 1.15 2019/02/18 21:55:27 benno Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -34,7 +34,7 @@ * A request from the receiver to download updated file data. */ struct send_dl { - int32_t idx; /* index in our file list */ + int32_t idx; /* index in our file list */ struct blkset *blks; /* the sender's block information */ TAILQ_ENTRY(send_dl) entries; }; @@ -46,6 +46,7 @@ struct send_up { struct send_dl *cur; /* file being updated or NULL */ struct blkstat stat; /* status of file being updated */ + int primed; /* blk_recv_ack() was called */ }; TAILQ_HEAD(send_dlq, send_dl); @@ -86,192 +87,10 @@ p->stat.offs = 0; p->stat.hint = 0; p->stat.curst = BLKSTAT_NONE; + p->primed = 0; } /* - * This is the bulk of the sender work. - * Here we tend to an output buffer that responds to receiver requests - * for data. - * This does not act upon the output descriptor itself so as to avoid - * blocking, which otherwise would deadlock the protocol. - * Returns zero on failure, non-zero on success. - */ -static int -send_up_fsm(struct sess *sess, size_t *phase, - struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax, - const struct flist *fl) -{ - size_t pos = 0, isz = sizeof(int32_t), - dsz = MD4_DIGEST_LENGTH; - unsigned char fmd[MD4_DIGEST_LENGTH]; - off_t sz; - char buf[20]; - - switch (up->stat.curst) { - case BLKSTAT_DATA: - /* - * A data segment to be written: buffer both the length - * and the data. - * If we've finished the transfer, move on to the token; - * otherwise, keep sending data. - */ - - sz = MINIMUM(MAX_CHUNK, - up->stat.curlen - up->stat.curpos); - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz); - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_buf(sess, *wb, &pos, *wbsz, - up->stat.map + up->stat.curpos, sz); - - up->stat.curpos += sz; - if (up->stat.curpos == up->stat.curlen) - up->stat.curst = BLKSTAT_TOK; - return 1; - case BLKSTAT_TOK: - /* - * The data token following (maybe) a data segment. - * These can also come standalone if, say, the file's - * being fully written. - * It's followed by a hash or another data segment, - * depending on the token. - */ - - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, - &pos, *wbsz, up->stat.curtok); - up->stat.curst = up->stat.curtok ? - BLKSTAT_NEXT : BLKSTAT_HASH; - return 1; - case BLKSTAT_HASH: - /* - * The hash following transmission of all file contents. - * This is always followed by the state that we're - * finished with the file. - */ - - hash_file(up->stat.map, up->stat.mapsz, fmd, sess); - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz); - up->stat.curst = BLKSTAT_DONE; - return 1; - case BLKSTAT_DONE: - /* - * The data has been written. - * Clear our current send file and allow the block below - * to find another. - */ - - LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded", - fl[up->cur->idx].path, - (intmax_t)up->stat.total / 1024, - 100.0 * up->stat.dirty / up->stat.total); - send_up_reset(up); - return 1; - case BLKSTAT_PHASE: - /* - * This is where we actually stop the algorithm: we're - * already at the second phase. - */ - - send_up_reset(up); - (*phase)++; - return 1; - case BLKSTAT_NEXT: - /* - * Our last case: we need to find the - * next block (and token) to transmit to - * the receiver. - * These will drive the finite state - * machine in the first few conditional - * blocks of this set. - */ - - assert(up->stat.fd != -1); - blk_match(sess, up->cur->blks, - fl[up->cur->idx].path, &up->stat); - return 1; - case BLKSTAT_NONE: - break; - } - - assert(BLKSTAT_NONE == up->stat.curst); - - /* - * We've either hit the phase change following the last file (or - * start, or prior phase change), or we need to prime the next - * file for transmission. - * We special-case dry-run mode. - */ - - if (up->cur->idx < 0) { - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); - - if (sess->opts->server && sess->rver > 27) { - if (!io_lowbuffer_alloc(sess, - wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); - } - up->stat.curst = BLKSTAT_PHASE; - } else if (sess->opts->dry_run) { - if (!sess->opts->server) - LOG1(sess, "%s", fl[up->cur->idx].wpath); - - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx); - up->stat.curst = BLKSTAT_NEXT; - } else { - assert(up->stat.fd != -1); - - /* - * FIXME: use the nice output of log_file() and so on in - * downloader.c, which means moving this into - * BLKSTAT_DONE instead of having it be here. - */ - - if (!sess->opts->server) - LOG1(sess, "%s", fl[up->cur->idx].wpath); - - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - assert(sizeof(buf) == 20); - blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx); - io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20); - - LOG3(sess, "%s: primed for %jd B total", - fl[up->cur->idx].path, - (intmax_t)up->cur->blks->size); - up->stat.curst = BLKSTAT_NEXT; - } - - return 1; -} - -/* * Enqueue a download request, getting it off the read channel as * quickly a possible. * This frees up the read channel for further incoming requests. @@ -284,7 +103,7 @@ int32_t idx, const struct flist *fl, size_t flsz, int fd) { struct send_dl *s; - + /* End-of-phase marker. */ if (idx == -1) { @@ -299,7 +118,7 @@ } /* Validate the index. */ - + if (idx < 0 || (uint32_t)idx >= flsz) { ERRX(sess, "file index out of bounds: invalid %" PRId32 " out of %zu", idx, flsz); @@ -359,6 +178,7 @@ struct flist *fl = NULL; const struct flist *f; size_t i, flsz = 0, phase = 0, excl; + off_t sz; int rc = 0, c; int32_t idx; struct pollfd pfd[3]; @@ -366,8 +186,9 @@ struct send_dl *dl; struct send_up up; struct stat st; + unsigned char filemd[MD4_DIGEST_LENGTH]; void *wbuf = NULL; - size_t wbufpos = 0, wbufsz = 0, wbufmax = 0; + size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { @@ -394,7 +215,7 @@ /* Client sends zero-length exclusions if deleting. */ if (!sess->opts->server && sess->opts->del && - !io_write_int(sess, fdout, 0)) { + !io_write_int(sess, fdout, 0)) { ERRX1(sess, "io_write_int"); goto out; } @@ -563,20 +384,170 @@ sess->total_write += ssz; } - /* - * Engage the FSM for the current transfer. - * If our phase changes, stop processing. - */ - - if (pfd[1].revents & POLLOUT && up.cur != NULL) { + if (pfd[1].revents & POLLOUT) { assert(pfd[2].fd == -1); assert(wbufpos == 0 && wbufsz == 0); - if (!send_up_fsm(sess, &phase, - &up, &wbuf, &wbufsz, &wbufmax, fl)) { - ERRX1(sess, "send_up_fsm"); - goto out; - } else if (phase > 1) - break; + + /* + * If we have data to write, do it now according + * to the data finite state machine. + * If we receive an invalid index (-1), then + * we're either promoted to the second phase or + * it's time to exit, depending upon which phase + * we're in. + * Otherwise, we either start a transfer + * sequence (if not primed) or continue one. + */ + + pos = 0; + if (BLKSTAT_DATA == up.stat.curst) { + /* + * A data segment to be written: buffer + * both the length and the data, then + * put is in the token phase. + */ + + sz = MINIMUM(MAX_CHUNK, + up.stat.curlen - up.stat.curpos); + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, sizeof(int32_t))) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_int(sess, + wbuf, &pos, wbufsz, sz); + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, sz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, + up.stat.map + up.stat.curpos, sz); + up.stat.curpos += sz; + if (up.stat.curpos == up.stat.curlen) + up.stat.curst = BLKSTAT_TOK; + } else if (BLKSTAT_TOK == up.stat.curst) { + /* + * The data token following (maybe) a + * data segment. + * These can also come standalone if, + * say, the file's being fully written. + * It's followed by a hash or another + * data segment, depending on the token. + */ + + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, sizeof(int32_t))) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_int(sess, wbuf, + &pos, wbufsz, up.stat.curtok); + up.stat.curst = up.stat.curtok ? + BLKSTAT_NONE : BLKSTAT_HASH; + } else if (BLKSTAT_HASH == up.stat.curst) { + /* + * The hash following transmission of + * all file contents. + * This is always followed by the state + * that we're finished with the file. + */ + + hash_file(up.stat.map, + up.stat.mapsz, filemd, sess); + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_buf(sess, wbuf, &pos, + wbufsz, filemd, MD4_DIGEST_LENGTH); + up.stat.curst = BLKSTAT_DONE; + } else if (BLKSTAT_DONE == up.stat.curst) { + /* + * The data has been written. + * Clear our current send file and allow + * the block below to find another. + */ + + LOG3(sess, "%s: flushed %jd KB total, " + "%.2f%% uploaded", + fl[up.cur->idx].path, + (intmax_t)up.stat.total / 1024, + 100.0 * up.stat.dirty / up.stat.total); + send_up_reset(&up); + } else if (up.cur != NULL && up.cur->idx < 0) { + /* + * We've hit the phase change following + * the last file (or start, or prior + * phase change). + * Simply acknowledge it. + * FIXME: use buffering. + */ + + if (!io_write_int(sess, fdout, -1)) { + ERRX1(sess, "io_write_int"); + goto out; + } + if (sess->opts->server && sess->rver > 27 && + !io_write_int(sess, fdout, -1)) { + ERRX1(sess, "io_write_int"); + goto out; + } + send_up_reset(&up); + + /* + * This is where we actually stop the + * algorithm: we're already at the + * second phase. + */ + + if (phase++) + break; + } else if (up.cur != NULL && up.primed == 0) { + /* + * We're getting ready to send the file + * contents to the receiver. + * FIXME: use buffering. + */ + + if (!sess->opts->server) + LOG1(sess, "%s", fl[up.cur->idx].wpath); + + /* Dry-running does nothing but a response. */ + + if (sess->opts->dry_run && + !io_write_int(sess, fdout, up.cur->idx)) { + ERRX1(sess, "io_write_int"); + goto out; + } + + /* Actually perform the block send. */ + + assert(up.stat.fd != -1); + if (!blk_recv_ack(sess, fdout, + up.cur->blks, up.cur->idx)) { + ERRX1(sess, "blk_recv_ack"); + goto out; + } + LOG3(sess, "%s: primed for %jd B total", + fl[up.cur->idx].path, + (intmax_t)up.cur->blks->size); + up.primed = 1; + } else if (up.cur != NULL) { + /* + * Our last case: we need to find the + * next block (and token) to transmit to + * the receiver. + * These will drive the finite state + * machine in the first few conditional + * blocks of this set. + */ + + assert(up.stat.fd != -1); + blk_match(sess, up.cur->blks, + fl[up.cur->idx].path, &up.stat); + } } /* @@ -615,7 +586,7 @@ pfd[1].fd = fdout; continue; } - + /* * Non-blocking open of file. * This will be picked up in the state machine @@ -657,7 +628,6 @@ out: send_up_reset(&up); while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { - TAILQ_REMOVE(&sdlq, dl, entries); free(dl->blks); free(dl); }