=================================================================== RCS file: /cvsrepo/anoncvs/cvs/src/usr.bin/rsync/sender.c,v retrieving revision 1.15 retrieving revision 1.16 diff -c -r1.15 -r1.16 *** src/usr.bin/rsync/sender.c 2019/02/18 21:55:27 1.15 --- src/usr.bin/rsync/sender.c 2019/02/18 22:47:34 1.16 *************** *** 1,4 **** ! /* $Id: sender.c,v 1.15 2019/02/18 21:55:27 benno Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * --- 1,4 ---- ! /* $Id: sender.c,v 1.16 2019/02/18 22:47:34 benno Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * *************** *** 46,52 **** struct send_up { struct send_dl *cur; /* file being updated or NULL */ struct blkstat stat; /* status of file being updated */ - int primed; /* blk_recv_ack() was called */ }; TAILQ_HEAD(send_dlq, send_dl); --- 46,51 ---- *************** *** 87,96 **** p->stat.offs = 0; p->stat.hint = 0; p->stat.curst = BLKSTAT_NONE; - p->primed = 0; } /* * Enqueue a download request, getting it off the read channel as * quickly a possible. * This frees up the read channel for further incoming requests. --- 86,277 ---- p->stat.offs = 0; p->stat.hint = 0; p->stat.curst = BLKSTAT_NONE; } /* + * This is the bulk of the sender work. + * Here we tend to an output buffer that responds to receiver requests + * for data. + * This does not act upon the output descriptor itself so as to avoid + * blocking, which otherwise would deadlock the protocol. + * Returns zero on failure, non-zero on success. + */ + static int + send_up_fsm(struct sess *sess, size_t *phase, + struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax, + const struct flist *fl) + { + size_t pos = 0, isz = sizeof(int32_t), + dsz = MD4_DIGEST_LENGTH; + unsigned char fmd[MD4_DIGEST_LENGTH]; + off_t sz; + char buf[20]; + + switch (up->stat.curst) { + case BLKSTAT_DATA: + /* + * A data segment to be written: buffer both the length + * and the data. + * If we've finished the transfer, move on to the token; + * otherwise, keep sending data. + */ + + sz = MINIMUM(MAX_CHUNK, + up->stat.curlen - up->stat.curpos); + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz); + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_buf(sess, *wb, &pos, *wbsz, + up->stat.map + up->stat.curpos, sz); + + up->stat.curpos += sz; + if (up->stat.curpos == up->stat.curlen) + up->stat.curst = BLKSTAT_TOK; + return 1; + case BLKSTAT_TOK: + /* + * The data token following (maybe) a data segment. + * These can also come standalone if, say, the file's + * being fully written. + * It's followed by a hash or another data segment, + * depending on the token. + */ + + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_int(sess, *wb, + &pos, *wbsz, up->stat.curtok); + up->stat.curst = up->stat.curtok ? + BLKSTAT_NEXT : BLKSTAT_HASH; + return 1; + case BLKSTAT_HASH: + /* + * The hash following transmission of all file contents. + * This is always followed by the state that we're + * finished with the file. + */ + + hash_file(up->stat.map, up->stat.mapsz, fmd, sess); + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz); + up->stat.curst = BLKSTAT_DONE; + return 1; + case BLKSTAT_DONE: + /* + * The data has been written. + * Clear our current send file and allow the block below + * to find another. + */ + + LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded", + fl[up->cur->idx].path, + (intmax_t)up->stat.total / 1024, + 100.0 * up->stat.dirty / up->stat.total); + send_up_reset(up); + return 1; + case BLKSTAT_PHASE: + /* + * This is where we actually stop the algorithm: we're + * already at the second phase. + */ + + send_up_reset(up); + (*phase)++; + return 1; + case BLKSTAT_NEXT: + /* + * Our last case: we need to find the + * next block (and token) to transmit to + * the receiver. + * These will drive the finite state + * machine in the first few conditional + * blocks of this set. + */ + + assert(up->stat.fd != -1); + blk_match(sess, up->cur->blks, + fl[up->cur->idx].path, &up->stat); + return 1; + case BLKSTAT_NONE: + break; + } + + assert(BLKSTAT_NONE == up->stat.curst); + + /* + * We've either hit the phase change following the last file (or + * start, or prior phase change), or we need to prime the next + * file for transmission. + * We special-case dry-run mode. + */ + + if (up->cur->idx < 0) { + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); + + if (sess->opts->server && sess->rver > 27) { + if (!io_lowbuffer_alloc(sess, + wb, wbsz, wbmax, isz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); + } + up->stat.curst = BLKSTAT_PHASE; + } else if (sess->opts->dry_run) { + if (!sess->opts->server) + LOG1(sess, "%s", fl[up->cur->idx].wpath); + + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx); + up->stat.curst = BLKSTAT_NEXT; + } else { + assert(up->stat.fd != -1); + + /* + * FIXME: use the nice output of log_file() and so on in + * downloader.c, which means moving this into + * BLKSTAT_DONE instead of having it be here. + */ + + if (!sess->opts->server) + LOG1(sess, "%s", fl[up->cur->idx].wpath); + + if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) { + ERRX1(sess, "io_lowbuffer_alloc"); + return 0; + } + assert(sizeof(buf) == 20); + blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx); + io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20); + + LOG3(sess, "%s: primed for %jd B total", + fl[up->cur->idx].path, + (intmax_t)up->cur->blks->size); + up->stat.curst = BLKSTAT_NEXT; + } + + return 1; + } + + /* * Enqueue a download request, getting it off the read channel as * quickly a possible. * This frees up the read channel for further incoming requests. *************** *** 178,184 **** struct flist *fl = NULL; const struct flist *f; size_t i, flsz = 0, phase = 0, excl; - off_t sz; int rc = 0, c; int32_t idx; struct pollfd pfd[3]; --- 359,364 ---- *************** *** 186,194 **** struct send_dl *dl; struct send_up up; struct stat st; - unsigned char filemd[MD4_DIGEST_LENGTH]; void *wbuf = NULL; ! size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { --- 366,373 ---- struct send_dl *dl; struct send_up up; struct stat st; void *wbuf = NULL; ! size_t wbufpos = 0, wbufsz = 0, wbufmax = 0; ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { *************** *** 215,221 **** /* Client sends zero-length exclusions if deleting. */ if (!sess->opts->server && sess->opts->del && ! !io_write_int(sess, fdout, 0)) { ERRX1(sess, "io_write_int"); goto out; } --- 394,400 ---- /* Client sends zero-length exclusions if deleting. */ if (!sess->opts->server && sess->opts->del && ! !io_write_int(sess, fdout, 0)) { ERRX1(sess, "io_write_int"); goto out; } *************** *** 384,553 **** sess->total_write += ssz; } ! if (pfd[1].revents & POLLOUT) { assert(pfd[2].fd == -1); assert(wbufpos == 0 && wbufsz == 0); ! ! /* ! * If we have data to write, do it now according ! * to the data finite state machine. ! * If we receive an invalid index (-1), then ! * we're either promoted to the second phase or ! * it's time to exit, depending upon which phase ! * we're in. ! * Otherwise, we either start a transfer ! * sequence (if not primed) or continue one. ! */ ! ! pos = 0; ! if (BLKSTAT_DATA == up.stat.curst) { ! /* ! * A data segment to be written: buffer ! * both the length and the data, then ! * put is in the token phase. ! */ ! ! sz = MINIMUM(MAX_CHUNK, ! up.stat.curlen - up.stat.curpos); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sizeof(int32_t))) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_int(sess, ! wbuf, &pos, wbufsz, sz); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sz)) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, ! up.stat.map + up.stat.curpos, sz); ! up.stat.curpos += sz; ! if (up.stat.curpos == up.stat.curlen) ! up.stat.curst = BLKSTAT_TOK; ! } else if (BLKSTAT_TOK == up.stat.curst) { ! /* ! * The data token following (maybe) a ! * data segment. ! * These can also come standalone if, ! * say, the file's being fully written. ! * It's followed by a hash or another ! * data segment, depending on the token. ! */ ! ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sizeof(int32_t))) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_int(sess, wbuf, ! &pos, wbufsz, up.stat.curtok); ! up.stat.curst = up.stat.curtok ? ! BLKSTAT_NONE : BLKSTAT_HASH; ! } else if (BLKSTAT_HASH == up.stat.curst) { ! /* ! * The hash following transmission of ! * all file contents. ! * This is always followed by the state ! * that we're finished with the file. ! */ ! ! hash_file(up.stat.map, ! up.stat.mapsz, filemd, sess); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_buf(sess, wbuf, &pos, ! wbufsz, filemd, MD4_DIGEST_LENGTH); ! up.stat.curst = BLKSTAT_DONE; ! } else if (BLKSTAT_DONE == up.stat.curst) { ! /* ! * The data has been written. ! * Clear our current send file and allow ! * the block below to find another. ! */ ! ! LOG3(sess, "%s: flushed %jd KB total, " ! "%.2f%% uploaded", ! fl[up.cur->idx].path, ! (intmax_t)up.stat.total / 1024, ! 100.0 * up.stat.dirty / up.stat.total); ! send_up_reset(&up); ! } else if (up.cur != NULL && up.cur->idx < 0) { ! /* ! * We've hit the phase change following ! * the last file (or start, or prior ! * phase change). ! * Simply acknowledge it. ! * FIXME: use buffering. ! */ ! ! if (!io_write_int(sess, fdout, -1)) { ! ERRX1(sess, "io_write_int"); ! goto out; ! } ! if (sess->opts->server && sess->rver > 27 && ! !io_write_int(sess, fdout, -1)) { ! ERRX1(sess, "io_write_int"); ! goto out; ! } ! send_up_reset(&up); ! ! /* ! * This is where we actually stop the ! * algorithm: we're already at the ! * second phase. ! */ ! ! if (phase++) ! break; ! } else if (up.cur != NULL && up.primed == 0) { ! /* ! * We're getting ready to send the file ! * contents to the receiver. ! * FIXME: use buffering. ! */ ! ! if (!sess->opts->server) ! LOG1(sess, "%s", fl[up.cur->idx].wpath); ! ! /* Dry-running does nothing but a response. */ ! ! if (sess->opts->dry_run && ! !io_write_int(sess, fdout, up.cur->idx)) { ! ERRX1(sess, "io_write_int"); ! goto out; ! } ! ! /* Actually perform the block send. */ ! ! assert(up.stat.fd != -1); ! if (!blk_recv_ack(sess, fdout, ! up.cur->blks, up.cur->idx)) { ! ERRX1(sess, "blk_recv_ack"); ! goto out; ! } ! LOG3(sess, "%s: primed for %jd B total", ! fl[up.cur->idx].path, ! (intmax_t)up.cur->blks->size); ! up.primed = 1; ! } else if (up.cur != NULL) { ! /* ! * Our last case: we need to find the ! * next block (and token) to transmit to ! * the receiver. ! * These will drive the finite state ! * machine in the first few conditional ! * blocks of this set. ! */ ! ! assert(up.stat.fd != -1); ! blk_match(sess, up.cur->blks, ! fl[up.cur->idx].path, &up.stat); ! } } /* --- 563,582 ---- sess->total_write += ssz; } ! /* ! * Engage the FSM for the current transfer. ! * If our phase changes, stop processing. ! */ ! ! if (pfd[1].revents & POLLOUT && up.cur != NULL) { assert(pfd[2].fd == -1); assert(wbufpos == 0 && wbufsz == 0); ! if (!send_up_fsm(sess, &phase, ! &up, &wbuf, &wbufsz, &wbufmax, fl)) { ! ERRX1(sess, "send_up_fsm"); ! goto out; ! } else if (phase > 1) ! break; } /* *************** *** 628,633 **** --- 657,663 ---- out: send_up_reset(&up); while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { + TAILQ_REMOVE(&sdlq, dl, entries); free(dl->blks); free(dl); }