=================================================================== RCS file: /cvsrepo/anoncvs/cvs/src/usr.bin/rsync/sender.c,v retrieving revision 1.8 retrieving revision 1.9 diff -c -r1.8 -r1.9 *** src/usr.bin/rsync/sender.c 2019/02/16 16:57:17 1.8 --- src/usr.bin/rsync/sender.c 2019/02/16 16:58:39 1.9 *************** *** 1,4 **** ! /* $Id: sender.c,v 1.8 2019/02/16 16:57:17 florian Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * --- 1,4 ---- ! /* $Id: sender.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * *************** *** 26,31 **** --- 26,33 ---- #include #include + #include + #include "extern.h" /* *************** *** 84,89 **** --- 86,92 ---- p->stat.offs = 0; p->stat.hint = 0; + p->stat.curst = BLKSTAT_NONE; p->primed = 0; } *************** *** 172,185 **** rsync_sender(struct sess *sess, int fdin, int fdout, size_t argc, char **argv) { ! struct flist *fl = NULL; ! size_t i, flsz = 0, phase = 0, excl; ! int rc = 0, c; ! int32_t idx; ! struct pollfd pfd[3]; ! struct send_dlq sdlq; ! struct send_up up; ! struct stat st; if (pledge("stdio getpw rpath unveil", NULL) == -1) { ERR(sess, "pledge"); --- 175,195 ---- rsync_sender(struct sess *sess, int fdin, int fdout, size_t argc, char **argv) { ! struct flist *fl = NULL; ! const struct flist *f; ! size_t i, flsz = 0, phase = 0, excl; ! off_t sz; ! int rc = 0, c; ! int32_t idx; ! struct pollfd pfd[3]; ! struct send_dlq sdlq; ! struct send_dl *dl; ! struct send_up up; ! struct stat st; ! unsigned char filemd[MD4_DIGEST_LENGTH]; ! void *wbuf = NULL; ! size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; ! ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { ERR(sess, "pledge"); *************** *** 260,267 **** pfd[2].fd = -1; /* from local file */ pfd[2].events = POLLIN; - /* The main sender loop runs into phase == 2. */ - for (;;) { assert(pfd[0].fd != -1); if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) { --- 270,275 ---- *************** *** 271,277 **** ERRX(sess, "poll: timeout"); goto out; } - for (i = 0; i < 3; i++) if (pfd[i].revents & (POLLERR|POLLNVAL)) { ERRX(sess, "poll: bad fd"); --- 279,284 ---- *************** *** 281,304 **** goto out; } - /* - * Flush out multiplexed messages. - * These might otherwise clog the reader. - */ - - if (sess->mplex_reads && - (POLLIN & pfd[0].revents)) { - if (!io_read_flush(sess, fdin)) { - ERRX1(sess, "io_read_flush"); - goto out; - } else if (sess->mplex_read_remain == 0) - pfd[0].revents &= ~POLLIN; - } - /* * If we have a request coming down off the wire, pull * it in as quickly as possible into our buffer. * This unclogs the socket buffers so the data can flow. */ if (pfd[0].revents & POLLIN) --- 288,300 ---- goto out; } /* * If we have a request coming down off the wire, pull * it in as quickly as possible into our buffer. * This unclogs the socket buffers so the data can flow. + * FIXME: if we're multiplexing, we might stall here if + * there's only a log message and no actual data. + * This can be fixed by doing a conditional test. */ if (pfd[0].revents & POLLIN) *************** *** 321,328 **** } /* ! * One of our local files has been opened (in response ! * to a receiver request) and now we can map it. * We'll respond to the event by looking at the map when * the writer is available. * Here we also enable the poll event for output. --- 317,324 ---- } /* ! * One of our local files has been opened in response ! * to a receiver request and now we can map it. * We'll respond to the event by looking at the map when * the writer is available. * Here we also enable the poll event for output. *************** *** 333,402 **** assert(up.stat.fd != -1); assert(up.stat.map == MAP_FAILED); assert(up.stat.mapsz == 0); if (fstat(up.stat.fd, &st) == -1) { ! ERR(sess, "%s: fstat", fl[up.cur->idx].path); goto out; } /* * If the file is zero-length, the map will * fail, but either way we want to unset that ! * we're waiting for the file to open. ! * We'll close the descriptor after processing. */ if ((up.stat.mapsz = st.st_size) > 0) { ! up.stat.map = mmap(NULL, up.stat.mapsz, ! PROT_READ, MAP_SHARED, up.stat.fd, 0); if (up.stat.map == MAP_FAILED) { ! ERR(sess, "%s: mmap", fl[up.cur->idx].path); goto out; } ! } pfd[2].fd = -1; pfd[1].fd = fdout; } /* ! * Our outbound is ready to process the current event. ! * This means we've already opened the file and possibly ! * mapped it, and we're ready to send blocks. ! * Do this one block at a time lest we block the channel ! * while read events are coming in. */ if (pfd[1].revents & POLLOUT) { - assert(up.cur != NULL); assert(pfd[2].fd == -1); /* ! * If we receive an invalid index (-1), then we're ! * either promoted to the second phase or it's time to ! * exit, depending upon which phase we're in. * Otherwise, we either start a transfer * sequence (if not primed) or continue one. */ ! if (up.cur->idx < 0) { ! pfd[1].fd = -1; send_up_reset(&up); if (!io_write_int(sess, fdout, -1)) { ERRX1(sess, "io_write_int"); goto out; } - - /* Send superfluous ack. */ - if (sess->opts->server && sess->rver > 27 && !io_write_int(sess, fdout, -1)) { ERRX1(sess, "io_write_int"); goto out; } if (phase++) break; ! } else if (0 == up.primed) { if (!sess->opts->server) LOG1(sess, "%s", fl[up.cur->idx].wpath); --- 329,516 ---- assert(up.stat.fd != -1); assert(up.stat.map == MAP_FAILED); assert(up.stat.mapsz == 0); + f = &fl[up.cur->idx]; if (fstat(up.stat.fd, &st) == -1) { ! ERR(sess, "%s: fstat", f->path); goto out; } /* * If the file is zero-length, the map will * fail, but either way we want to unset that ! * we're waiting for the file to open and set ! * that we're ready for the output channel. */ if ((up.stat.mapsz = st.st_size) > 0) { ! up.stat.map = mmap(NULL, ! up.stat.mapsz, PROT_READ, ! MAP_SHARED, up.stat.fd, 0); if (up.stat.map == MAP_FAILED) { ! ERR(sess, "%s: mmap", f->path); goto out; } ! } ! pfd[2].fd = -1; pfd[1].fd = fdout; } /* ! * If we have buffers waiting to write, write them out ! * as soon as we can in a non-blocking fashion. ! * We must not be waiting for any local files. ! * ALL WRITES MUST HAPPEN HERE. ! * This keeps the sender deadlock-free. */ + if ((pfd[1].revents & POLLOUT) && wbufsz > 0) { + assert(pfd[2].fd == -1); + assert(wbufsz - wbufpos); + ssz = write(fdout, + wbuf + wbufpos, wbufsz - wbufpos); + if (ssz < 0) { + ERR(sess, "write"); + goto out; + } + wbufpos += ssz; + if (wbufpos == wbufsz) + wbufpos = wbufsz = 0; + pfd[1].revents &= ~POLLOUT; + + /* This is usually in io.c... */ + + sess->total_write += ssz; + } + if (pfd[1].revents & POLLOUT) { assert(pfd[2].fd == -1); + assert(0 == wbufpos && 0 == wbufsz); /* ! * If we have data to write, do it now according ! * to the data finite state machine. ! * If we receive an invalid index (-1), then ! * we're either promoted to the second phase or ! * it's time to exit, depending upon which phase ! * we're in. * Otherwise, we either start a transfer * sequence (if not primed) or continue one. */ ! pos = 0; ! if (BLKSTAT_DATA == up.stat.curst) { ! /* ! * A data segment to be written: buffer ! * both the length and the data, then ! * put is in the token phase. ! */ ! ! sz = MIN(MAX_CHUNK, ! up.stat.curlen - up.stat.curpos); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sizeof(int32_t))) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_int(sess, ! wbuf, &pos, wbufsz, sz); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sz)) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, ! up.stat.map + up.stat.curpos, sz); ! up.stat.curpos += sz; ! if (up.stat.curpos == up.stat.curlen) ! up.stat.curst = BLKSTAT_TOK; ! } else if (BLKSTAT_TOK == up.stat.curst) { ! /* ! * The data token following (maybe) a ! * data segment. ! * These can also come standalone if, ! * say, the file's being fully written. ! * It's followed by a hash or another ! * data segment, depending on the token. ! */ ! ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, sizeof(int32_t))) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_int(sess, wbuf, ! &pos, wbufsz, up.stat.curtok); ! up.stat.curst = up.stat.curtok ? ! BLKSTAT_NONE : BLKSTAT_HASH; ! } else if (BLKSTAT_HASH == up.stat.curst) { ! /* ! * The hash following transmission of ! * all file contents. ! * This is always followed by the state ! * that we're finished with the file. ! */ ! ! hash_file(up.stat.map, ! up.stat.mapsz, filemd, sess); ! if (!io_lowbuffer_alloc(sess, &wbuf, ! &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { ! ERRX1(sess, "io_lowbuffer_alloc"); ! goto out; ! } ! io_lowbuffer_buf(sess, wbuf, &pos, ! wbufsz, filemd, MD4_DIGEST_LENGTH); ! up.stat.curst = BLKSTAT_DONE; ! } else if (BLKSTAT_DONE == up.stat.curst) { ! /* ! * The data has been written. ! * Clear our current send file and allow ! * the block below to find another. ! */ ! ! LOG3(sess, "%s: flushed %jd KB total, " ! "%.2f%% uploaded", ! fl[up.cur->idx].path, ! (intmax_t)up.stat.total / 1024, ! 100.0 * up.stat.dirty / up.stat.total); send_up_reset(&up); + } else if (NULL != up.cur && up.cur->idx < 0) { + /* + * We've hit the phase change following + * the last file (or start, or prior + * phase change). + * Simply acknowledge it. + * FIXME: use buffering. + */ + if (!io_write_int(sess, fdout, -1)) { ERRX1(sess, "io_write_int"); goto out; } if (sess->opts->server && sess->rver > 27 && !io_write_int(sess, fdout, -1)) { ERRX1(sess, "io_write_int"); goto out; } + send_up_reset(&up); + /* + * This is where we actually stop the + * algorithm: we're already at the + * second phase. + */ + if (phase++) break; ! } else if (NULL != up.cur && 0 == up.primed) { ! /* ! * We're getting ready to send the file ! * contents to the receiver. ! * FIXME: use buffering. ! */ ! if (!sess->opts->server) LOG1(sess, "%s", fl[up.cur->idx].wpath); *************** *** 416,433 **** ERRX1(sess, "blk_recv_ack"); goto out; } up.primed = 1; ! } else { assert(up.stat.fd != -1); ! c = blk_match(sess, fdout, up.cur->blks, fl[up.cur->idx].path, &up.stat); - if (c < 0) { - ERRX1(sess, "blk_match"); - goto out; - } else if (c > 0) { - send_up_reset(&up); - pfd[1].fd = -1; - } } } --- 530,552 ---- ERRX1(sess, "blk_recv_ack"); goto out; } + LOG3(sess, "%s: primed for %jd B total", + fl[up.cur->idx].path, + (intmax_t)up.cur->blks->size); up.primed = 1; ! } else if (NULL != up.cur) { ! /* ! * Our last case: we need to find the ! * next block (and token) to transmit to ! * the receiver. ! * These will drive the finite state ! * machine in the first few conditional ! * blocks of this set. ! */ ! assert(up.stat.fd != -1); ! blk_match(sess, up.cur->blks, fl[up.cur->idx].path, &up.stat); } } *************** *** 443,463 **** assert(up.stat.fd == -1); assert(up.stat.map == MAP_FAILED); assert(up.stat.mapsz == 0); if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) continue; TAILQ_REMOVE(&sdlq, up.cur, entries); ! /* End of phase: enable channel to receiver. */ if (up.cur->idx == -1) { pfd[1].fd = fdout; continue; } ! /* Non-blocking open of file. */ ! up.stat.fd = open(fl[up.cur->idx].path, O_RDONLY|O_NONBLOCK, 0); if (up.stat.fd == -1) { ERR(sess, "%s: open", fl[up.cur->idx].path); --- 562,599 ---- assert(up.stat.fd == -1); assert(up.stat.map == MAP_FAILED); assert(up.stat.mapsz == 0); + assert(wbufsz == 0 && wbufpos == 0); + pfd[1].fd = -1; + /* + * If there's nothing in the queue, then keep + * the output channel disabled and wait for + * whatever comes next from the reader. + */ + if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) continue; + TAILQ_REMOVE(&sdlq, up.cur, entries); ! /* ! * End of phase: enable channel to receiver. ! * We'll need our output buffer enabled in order ! * to process this event. ! */ if (up.cur->idx == -1) { pfd[1].fd = fdout; continue; } ! /* ! * Non-blocking open of file. ! * This will be picked up in the state machine ! * block of not being primed. ! */ ! up.stat.fd = open(fl[up.cur->idx].path, O_RDONLY|O_NONBLOCK, 0); if (up.stat.fd == -1) { ERR(sess, "%s: open", fl[up.cur->idx].path); *************** *** 467,472 **** --- 603,613 ---- } } + if (!TAILQ_EMPTY(&sdlq)) { + ERRX(sess, "phases complete with files still queued"); + goto out; + } + if (!sess_stats_send(sess, fdout)) { ERRX1(sess, "sess_stats_end"); goto out; *************** *** 485,490 **** --- 626,637 ---- LOG2(sess, "sender finished updating"); rc = 1; out: + send_up_reset(&up); + while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { + free(dl->blks); + free(dl); + } flist_free(fl, flsz); + free(wbuf); return rc; }