=================================================================== RCS file: /cvsrepo/anoncvs/cvs/src/usr.bin/rsync/sender.c,v retrieving revision 1.14 retrieving revision 1.15 diff -c -r1.14 -r1.15 *** src/usr.bin/rsync/sender.c 2019/02/18 21:34:54 1.14 --- src/usr.bin/rsync/sender.c 2019/02/18 21:55:27 1.15 *************** *** 1,4 **** ! /* $Id: sender.c,v 1.14 2019/02/18 21:34:54 benno Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * --- 1,4 ---- ! /* $Id: sender.c,v 1.15 2019/02/18 21:55:27 benno Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * *************** *** 34,40 **** * A request from the receiver to download updated file data. */ struct send_dl { ! int32_t idx; /* index in our file list */ struct blkset *blks; /* the sender's block information */ TAILQ_ENTRY(send_dl) entries; }; --- 34,40 ---- * A request from the receiver to download updated file data. */ struct send_dl { ! int32_t idx; /* index in our file list */ struct blkset *blks; /* the sender's block information */ TAILQ_ENTRY(send_dl) entries; }; *************** *** 46,51 **** --- 46,52 ---- struct send_up { struct send_dl *cur; /* file being updated or NULL */ struct blkstat stat; /* status of file being updated */ + int primed; /* blk_recv_ack() was called */ }; TAILQ_HEAD(send_dlq, send_dl); *************** *** 86,277 **** p->stat.offs = 0; p->stat.hint = 0; p->stat.curst = BLKSTAT_NONE; } /* - * This is the bulk of the sender work. - * Here we tend to an output buffer that responds to receiver requests - * for data. - * This does not act upon the output descriptor itself so as to avoid - * blocking, which otherwise would deadlock the protocol. - * Returns zero on failure, non-zero on success. - */ - static int - send_up_fsm(struct sess *sess, size_t *phase, - struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax, - const struct flist *fl) - { - size_t pos = 0, isz = sizeof(int32_t), - dsz = MD4_DIGEST_LENGTH; - unsigned char fmd[MD4_DIGEST_LENGTH]; - off_t sz; - char buf[20]; - - switch (up->stat.curst) { - case BLKSTAT_DATA: - /* - * A data segment to be written: buffer both the length - * and the data. - * If we've finished the transfer, move on to the token; - * otherwise, keep sending data. - */ - - sz = MINIMUM(MAX_CHUNK, - up->stat.curlen - up->stat.curpos); - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz); - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_buf(sess, *wb, &pos, *wbsz, - up->stat.map + up->stat.curpos, sz); - - up->stat.curpos += sz; - if (up->stat.curpos == up->stat.curlen) - up->stat.curst = BLKSTAT_TOK; - return 1; - case BLKSTAT_TOK: - /* - * The data token following (maybe) a data segment. - * These can also come standalone if, say, the file's - * being fully written. - * It's followed by a hash or another data segment, - * depending on the token. - */ - - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, - &pos, *wbsz, up->stat.curtok); - up->stat.curst = up->stat.curtok ? - BLKSTAT_NEXT : BLKSTAT_HASH; - return 1; - case BLKSTAT_HASH: - /* - * The hash following transmission of all file contents. - * This is always followed by the state that we're - * finished with the file. - */ - - hash_file(up->stat.map, up->stat.mapsz, fmd, sess); - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz); - up->stat.curst = BLKSTAT_DONE; - return 1; - case BLKSTAT_DONE: - /* - * The data has been written. - * Clear our current send file and allow the block below - * to find another. - */ - - LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded", - fl[up->cur->idx].path, - (intmax_t)up->stat.total / 1024, - 100.0 * up->stat.dirty / up->stat.total); - send_up_reset(up); - return 1; - case BLKSTAT_PHASE: - /* - * This is where we actually stop the algorithm: we're - * already at the second phase. - */ - - send_up_reset(up); - (*phase)++; - return 1; - case BLKSTAT_NEXT: - /* - * Our last case: we need to find the - * next block (and token) to transmit to - * the receiver. - * These will drive the finite state - * machine in the first few conditional - * blocks of this set. - */ - - assert(up->stat.fd != -1); - blk_match(sess, up->cur->blks, - fl[up->cur->idx].path, &up->stat); - return 1; - case BLKSTAT_NONE: - break; - } - - assert(BLKSTAT_NONE == up->stat.curst); - - /* - * We've either hit the phase change following the last file (or - * start, or prior phase change), or we need to prime the next - * file for transmission. - * We special-case dry-run mode. - */ - - if (up->cur->idx < 0) { - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); - - if (sess->opts->server && sess->rver > 27) { - if (!io_lowbuffer_alloc(sess, - wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); - } - up->stat.curst = BLKSTAT_PHASE; - } else if (sess->opts->dry_run) { - if (!sess->opts->server) - LOG1(sess, "%s", fl[up->cur->idx].wpath); - - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx); - up->stat.curst = BLKSTAT_NEXT; - } else { - assert(up->stat.fd != -1); - - /* - * FIXME: use the nice output of log_file() and so on in - * downloader.c, which means moving this into - * BLKSTAT_DONE instead of having it be here. - */ - - if (!sess->opts->server) - LOG1(sess, "%s", fl[up->cur->idx].wpath); - - if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) { - ERRX1(sess, "io_lowbuffer_alloc"); - return 0; - } - assert(sizeof(buf) == 20); - blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx); - io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20); - - LOG3(sess, "%s: primed for %jd B total", - fl[up->cur->idx].path, - (intmax_t)up->cur->blks->size); - up->stat.curst = BLKSTAT_NEXT; - } - - return 1; - } - - /* * Enqueue a download request, getting it off the read channel as * quickly a possible. * This frees up the read channel for further incoming requests. --- 87,96 ---- p->stat.offs = 0; p->stat.hint = 0; p->stat.curst = BLKSTAT_NONE; + p->primed = 0; } /* * Enqueue a download request, getting it off the read channel as * quickly a possible. * This frees up the read channel for further incoming requests. *************** *** 284,290 **** int32_t idx, const struct flist *fl, size_t flsz, int fd) { struct send_dl *s; ! /* End-of-phase marker. */ if (idx == -1) { --- 103,109 ---- int32_t idx, const struct flist *fl, size_t flsz, int fd) { struct send_dl *s; ! /* End-of-phase marker. */ if (idx == -1) { *************** *** 299,305 **** } /* Validate the index. */ ! if (idx < 0 || (uint32_t)idx >= flsz) { ERRX(sess, "file index out of bounds: invalid %" PRId32 " out of %zu", idx, flsz); --- 118,124 ---- } /* Validate the index. */ ! if (idx < 0 || (uint32_t)idx >= flsz) { ERRX(sess, "file index out of bounds: invalid %" PRId32 " out of %zu", idx, flsz); *************** *** 359,364 **** --- 178,184 ---- struct flist *fl = NULL; const struct flist *f; size_t i, flsz = 0, phase = 0, excl; + off_t sz; int rc = 0, c; int32_t idx; struct pollfd pfd[3]; *************** *** 366,373 **** struct send_dl *dl; struct send_up up; struct stat st; void *wbuf = NULL; ! size_t wbufpos = 0, wbufsz = 0, wbufmax = 0; ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { --- 186,194 ---- struct send_dl *dl; struct send_up up; struct stat st; + unsigned char filemd[MD4_DIGEST_LENGTH]; void *wbuf = NULL; ! size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { *************** *** 394,400 **** /* Client sends zero-length exclusions if deleting. */ if (!sess->opts->server && sess->opts->del && ! !io_write_int(sess, fdout, 0)) { ERRX1(sess, "io_write_int"); goto out; } --- 215,221 ---- /* Client sends zero-length exclusions if deleting. */ if (!sess->opts->server && sess->opts->del && ! !io_write_int(sess, fdout, 0)) { ERRX1(sess, "io_write_int"); goto out; } *************** *** 563,582 **** sess->total_write += ssz; } ! /* ! * Engage the FSM for the current transfer. ! * If our phase changes, stop processing. ! */ ! ! if (pfd[1].revents & POLLOUT && up.cur != NULL) { assert(pfd[2].fd == -1); assert(wbufpos == 0 && wbufsz == 0); ! if (!send_up_fsm(sess, &phase, ! &up, &wbuf, &wbufsz, &wbufmax, fl)) { ! ERRX1(sess, "send_up_fsm"); ! goto out; ! } else if (phase > 1) ! break; } /* --- 384,553 ---- sess->total_write += ssz; } ! if (pfd[1].revents & POLLOUT) { assert(pfd[2].fd == -1); assert(wbufpos == 0 && wbufsz == 0); ! ! /* ! * If we have data to write, do it now according ! * to the data finite state machine. ! * If we receive an invalid index (-1), then ! * we're either promoted to the second phase or ! * it's time to exit, depending upon which phase ! * we're in. ! * Otherwise, we either start a transfer ! * sequence (if not primed) or continue one. ! */ ! ! pos = 0; ! if (BLKSTAT_DATA == up.stat.curst) { ! /* ! * A data segment to be written: buffer ! * both the length and the data, then ! * put is in the token phase. ! */ ! ! sz = MINIMUM(MAX_CHUNK, ! up.stat.curlen - up.stat.curpos); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sizeof(int32_t))) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_int(sess, ! wbuf, &pos, wbufsz, sz); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sz)) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, ! up.stat.map + up.stat.curpos, sz); ! up.stat.curpos += sz; ! if (up.stat.curpos == up.stat.curlen) ! up.stat.curst = BLKSTAT_TOK; ! } else if (BLKSTAT_TOK == up.stat.curst) { ! /* ! * The data token following (maybe) a ! * data segment. ! * These can also come standalone if, ! * say, the file's being fully written. ! * It's followed by a hash or another ! * data segment, depending on the token. ! */ ! ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sizeof(int32_t))) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_int(sess, wbuf, ! &pos, wbufsz, up.stat.curtok); ! up.stat.curst = up.stat.curtok ? ! BLKSTAT_NONE : BLKSTAT_HASH; ! } else if (BLKSTAT_HASH == up.stat.curst) { ! /* ! * The hash following transmission of ! * all file contents. ! * This is always followed by the state ! * that we're finished with the file. ! */ ! ! hash_file(up.stat.map, ! up.stat.mapsz, filemd, sess); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_buf(sess, wbuf, &pos, ! wbufsz, filemd, MD4_DIGEST_LENGTH); ! up.stat.curst = BLKSTAT_DONE; ! } else if (BLKSTAT_DONE == up.stat.curst) { ! /* ! * The data has been written. ! * Clear our current send file and allow ! * the block below to find another. ! */ ! ! LOG3(sess, "%s: flushed %jd KB total, " ! "%.2f%% uploaded", ! fl[up.cur->idx].path, ! (intmax_t)up.stat.total / 1024, ! 100.0 * up.stat.dirty / up.stat.total); ! send_up_reset(&up); ! } else if (up.cur != NULL && up.cur->idx < 0) { ! /* ! * We've hit the phase change following ! * the last file (or start, or prior ! * phase change). ! * Simply acknowledge it. ! * FIXME: use buffering. ! */ ! ! if (!io_write_int(sess, fdout, -1)) { ! ERRX1(sess, "io_write_int"); ! goto out; ! } ! if (sess->opts->server && sess->rver > 27 && ! !io_write_int(sess, fdout, -1)) { ! ERRX1(sess, "io_write_int"); ! goto out; ! } ! send_up_reset(&up); ! ! /* ! * This is where we actually stop the ! * algorithm: we're already at the ! * second phase. ! */ ! ! if (phase++) ! break; ! } else if (up.cur != NULL && up.primed == 0) { ! /* ! * We're getting ready to send the file ! * contents to the receiver. ! * FIXME: use buffering. ! */ ! ! if (!sess->opts->server) ! LOG1(sess, "%s", fl[up.cur->idx].wpath); ! ! /* Dry-running does nothing but a response. */ ! ! if (sess->opts->dry_run && ! !io_write_int(sess, fdout, up.cur->idx)) { ! ERRX1(sess, "io_write_int"); ! goto out; ! } ! ! /* Actually perform the block send. */ ! ! assert(up.stat.fd != -1); ! if (!blk_recv_ack(sess, fdout, ! up.cur->blks, up.cur->idx)) { ! ERRX1(sess, "blk_recv_ack"); ! goto out; ! } ! LOG3(sess, "%s: primed for %jd B total", ! fl[up.cur->idx].path, ! (intmax_t)up.cur->blks->size); ! up.primed = 1; ! } else if (up.cur != NULL) { ! /* ! * Our last case: we need to find the ! * next block (and token) to transmit to ! * the receiver. ! * These will drive the finite state ! * machine in the first few conditional ! * blocks of this set. ! */ ! ! assert(up.stat.fd != -1); ! blk_match(sess, up.cur->blks, ! fl[up.cur->idx].path, &up.stat); ! } } /* *************** *** 615,621 **** pfd[1].fd = fdout; continue; } ! /* * Non-blocking open of file. * This will be picked up in the state machine --- 586,592 ---- pfd[1].fd = fdout; continue; } ! /* * Non-blocking open of file. * This will be picked up in the state machine *************** *** 657,663 **** out: send_up_reset(&up); while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { - TAILQ_REMOVE(&sdlq, dl, entries); free(dl->blks); free(dl); } --- 628,633 ----