[BACK]Return to sender.c CVS log [TXT][DIR] Up to [local] / src / usr.bin / rsync

Diff for /src/usr.bin/rsync/sender.c between version 1.14 and 1.15

version 1.14, 2019/02/18 21:34:54 version 1.15, 2019/02/18 21:55:27
Line 34 
Line 34 
  * A request from the receiver to download updated file data.   * A request from the receiver to download updated file data.
  */   */
 struct  send_dl {  struct  send_dl {
         int32_t              idx; /* index in our file list */          int32_t              idx; /* index in our file list */
         struct blkset       *blks; /* the sender's block information */          struct blkset       *blks; /* the sender's block information */
         TAILQ_ENTRY(send_dl) entries;          TAILQ_ENTRY(send_dl) entries;
 };  };
Line 46 
Line 46 
 struct  send_up {  struct  send_up {
         struct send_dl  *cur; /* file being updated or NULL */          struct send_dl  *cur; /* file being updated or NULL */
         struct blkstat   stat; /* status of file being updated */          struct blkstat   stat; /* status of file being updated */
           int              primed; /* blk_recv_ack() was called */
 };  };
   
 TAILQ_HEAD(send_dlq, send_dl);  TAILQ_HEAD(send_dlq, send_dl);
Line 86 
Line 87 
         p->stat.offs = 0;          p->stat.offs = 0;
         p->stat.hint = 0;          p->stat.hint = 0;
         p->stat.curst = BLKSTAT_NONE;          p->stat.curst = BLKSTAT_NONE;
           p->primed = 0;
 }  }
   
 /*  /*
  * This is the bulk of the sender work.  
  * Here we tend to an output buffer that responds to receiver requests  
  * for data.  
  * This does not act upon the output descriptor itself so as to avoid  
  * blocking, which otherwise would deadlock the protocol.  
  * Returns zero on failure, non-zero on success.  
  */  
 static int  
 send_up_fsm(struct sess *sess, size_t *phase,  
         struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,  
         const struct flist *fl)  
 {  
         size_t           pos = 0, isz = sizeof(int32_t),  
                          dsz = MD4_DIGEST_LENGTH;  
         unsigned char    fmd[MD4_DIGEST_LENGTH];  
         off_t            sz;  
         char             buf[20];  
   
         switch (up->stat.curst) {  
         case BLKSTAT_DATA:  
                 /*  
                  * A data segment to be written: buffer both the length  
                  * and the data.  
                  * If we've finished the transfer, move on to the token;  
                  * otherwise, keep sending data.  
                  */  
   
                 sz = MINIMUM(MAX_CHUNK,  
                         up->stat.curlen - up->stat.curpos);  
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);  
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 io_lowbuffer_buf(sess, *wb, &pos, *wbsz,  
                         up->stat.map + up->stat.curpos, sz);  
   
                 up->stat.curpos += sz;  
                 if (up->stat.curpos == up->stat.curlen)  
                         up->stat.curst = BLKSTAT_TOK;  
                 return 1;  
         case BLKSTAT_TOK:  
                 /*  
                  * The data token following (maybe) a data segment.  
                  * These can also come standalone if, say, the file's  
                  * being fully written.  
                  * It's followed by a hash or another data segment,  
                  * depending on the token.  
                  */  
   
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 io_lowbuffer_int(sess, *wb,  
                         &pos, *wbsz, up->stat.curtok);  
                 up->stat.curst = up->stat.curtok ?  
                         BLKSTAT_NEXT : BLKSTAT_HASH;  
                 return 1;  
         case BLKSTAT_HASH:  
                 /*  
                  * The hash following transmission of all file contents.  
                  * This is always followed by the state that we're  
                  * finished with the file.  
                  */  
   
                 hash_file(up->stat.map, up->stat.mapsz, fmd, sess);  
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);  
                 up->stat.curst = BLKSTAT_DONE;  
                 return 1;  
         case BLKSTAT_DONE:  
                 /*  
                  * The data has been written.  
                  * Clear our current send file and allow the block below  
                  * to find another.  
                  */  
   
                 LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded",  
                         fl[up->cur->idx].path,  
                         (intmax_t)up->stat.total / 1024,  
                         100.0 * up->stat.dirty / up->stat.total);  
                 send_up_reset(up);  
                 return 1;  
         case BLKSTAT_PHASE:  
                 /*  
                  * This is where we actually stop the algorithm: we're  
                  * already at the second phase.  
                  */  
   
                 send_up_reset(up);  
                 (*phase)++;  
                 return 1;  
         case BLKSTAT_NEXT:  
                 /*  
                  * Our last case: we need to find the  
                  * next block (and token) to transmit to  
                  * the receiver.  
                  * These will drive the finite state  
                  * machine in the first few conditional  
                  * blocks of this set.  
                  */  
   
                 assert(up->stat.fd != -1);  
                 blk_match(sess, up->cur->blks,  
                         fl[up->cur->idx].path, &up->stat);  
                 return 1;  
         case BLKSTAT_NONE:  
                 break;  
         }  
   
         assert(BLKSTAT_NONE == up->stat.curst);  
   
         /*  
          * We've either hit the phase change following the last file (or  
          * start, or prior phase change), or we need to prime the next  
          * file for transmission.  
          * We special-case dry-run mode.  
          */  
   
         if (up->cur->idx < 0) {  
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);  
   
                 if (sess->opts->server && sess->rver > 27) {  
                         if (!io_lowbuffer_alloc(sess,  
                             wb, wbsz, wbmax, isz)) {  
                                 ERRX1(sess, "io_lowbuffer_alloc");  
                                 return 0;  
                         }  
                         io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);  
                 }  
                 up->stat.curst = BLKSTAT_PHASE;  
         } else if (sess->opts->dry_run) {  
                 if (!sess->opts->server)  
                         LOG1(sess, "%s", fl[up->cur->idx].wpath);  
   
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);  
                 up->stat.curst = BLKSTAT_NEXT;  
         } else {  
                 assert(up->stat.fd != -1);  
   
                 /*  
                  * FIXME: use the nice output of log_file() and so on in  
                  * downloader.c, which means moving this into  
                  * BLKSTAT_DONE instead of having it be here.  
                  */  
   
                 if (!sess->opts->server)  
                         LOG1(sess, "%s", fl[up->cur->idx].wpath);  
   
                 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {  
                         ERRX1(sess, "io_lowbuffer_alloc");  
                         return 0;  
                 }  
                 assert(sizeof(buf) == 20);  
                 blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx);  
                 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);  
   
                 LOG3(sess, "%s: primed for %jd B total",  
                         fl[up->cur->idx].path,  
                         (intmax_t)up->cur->blks->size);  
                 up->stat.curst = BLKSTAT_NEXT;  
         }  
   
         return 1;  
 }  
   
 /*  
  * Enqueue a download request, getting it off the read channel as   * Enqueue a download request, getting it off the read channel as
  * quickly a possible.   * quickly a possible.
  * This frees up the read channel for further incoming requests.   * This frees up the read channel for further incoming requests.
Line 284 
Line 103 
         int32_t idx, const struct flist *fl, size_t flsz, int fd)          int32_t idx, const struct flist *fl, size_t flsz, int fd)
 {  {
         struct send_dl  *s;          struct send_dl  *s;
   
         /* End-of-phase marker. */          /* End-of-phase marker. */
   
         if (idx == -1) {          if (idx == -1) {
Line 299 
Line 118 
         }          }
   
         /* Validate the index. */          /* Validate the index. */
   
         if (idx < 0 || (uint32_t)idx >= flsz) {          if (idx < 0 || (uint32_t)idx >= flsz) {
                 ERRX(sess, "file index out of bounds: invalid %"                  ERRX(sess, "file index out of bounds: invalid %"
                         PRId32 " out of %zu", idx, flsz);                          PRId32 " out of %zu", idx, flsz);
Line 359 
Line 178 
         struct flist       *fl = NULL;          struct flist       *fl = NULL;
         const struct flist *f;          const struct flist *f;
         size_t              i, flsz = 0, phase = 0, excl;          size_t              i, flsz = 0, phase = 0, excl;
           off_t               sz;
         int                 rc = 0, c;          int                 rc = 0, c;
         int32_t             idx;          int32_t             idx;
         struct pollfd       pfd[3];          struct pollfd       pfd[3];
Line 366 
Line 186 
         struct send_dl     *dl;          struct send_dl     *dl;
         struct send_up      up;          struct send_up      up;
         struct stat         st;          struct stat         st;
           unsigned char       filemd[MD4_DIGEST_LENGTH];
         void               *wbuf = NULL;          void               *wbuf = NULL;
         size_t              wbufpos = 0, wbufsz = 0, wbufmax = 0;          size_t              wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;
         ssize_t             ssz;          ssize_t             ssz;
   
         if (pledge("stdio getpw rpath unveil", NULL) == -1) {          if (pledge("stdio getpw rpath unveil", NULL) == -1) {
Line 394 
Line 215 
         /* Client sends zero-length exclusions if deleting. */          /* Client sends zero-length exclusions if deleting. */
   
         if (!sess->opts->server && sess->opts->del &&          if (!sess->opts->server && sess->opts->del &&
              !io_write_int(sess, fdout, 0)) {              !io_write_int(sess, fdout, 0)) {
                 ERRX1(sess, "io_write_int");                  ERRX1(sess, "io_write_int");
                 goto out;                  goto out;
         }          }
Line 563 
Line 384 
                         sess->total_write += ssz;                          sess->total_write += ssz;
                 }                  }
   
                 /*                  if (pfd[1].revents & POLLOUT) {
                  * Engage the FSM for the current transfer.  
                  * If our phase changes, stop processing.  
                  */  
   
                 if (pfd[1].revents & POLLOUT && up.cur != NULL) {  
                         assert(pfd[2].fd == -1);                          assert(pfd[2].fd == -1);
                         assert(wbufpos == 0 && wbufsz == 0);                          assert(wbufpos == 0 && wbufsz == 0);
                         if (!send_up_fsm(sess, &phase,  
                             &up, &wbuf, &wbufsz, &wbufmax, fl)) {                          /*
                                 ERRX1(sess, "send_up_fsm");                           * If we have data to write, do it now according
                                 goto out;                           * to the data finite state machine.
                         } else if (phase > 1)                           * If we receive an invalid index (-1), then
                                 break;                           * we're either promoted to the second phase or
                            * it's time to exit, depending upon which phase
                            * we're in.
                            * Otherwise, we either start a transfer
                            * sequence (if not primed) or continue one.
                            */
   
                           pos = 0;
                           if (BLKSTAT_DATA == up.stat.curst) {
                                   /*
                                    * A data segment to be written: buffer
                                    * both the length and the data, then
                                    * put is in the token phase.
                                    */
   
                                   sz = MINIMUM(MAX_CHUNK,
                                       up.stat.curlen - up.stat.curpos);
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, sizeof(int32_t))) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_int(sess,
                                           wbuf, &pos, wbufsz, sz);
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, sz)) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,
                                           up.stat.map + up.stat.curpos, sz);
                                   up.stat.curpos += sz;
                                   if (up.stat.curpos == up.stat.curlen)
                                           up.stat.curst = BLKSTAT_TOK;
                           } else if (BLKSTAT_TOK == up.stat.curst) {
                                   /*
                                    * The data token following (maybe) a
                                    * data segment.
                                    * These can also come standalone if,
                                    * say, the file's being fully written.
                                    * It's followed by a hash or another
                                    * data segment, depending on the token.
                                    */
   
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, sizeof(int32_t))) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_int(sess, wbuf,
                                           &pos, wbufsz, up.stat.curtok);
                                   up.stat.curst = up.stat.curtok ?
                                           BLKSTAT_NONE : BLKSTAT_HASH;
                           } else if (BLKSTAT_HASH == up.stat.curst) {
                                   /*
                                    * The hash following transmission of
                                    * all file contents.
                                    * This is always followed by the state
                                    * that we're finished with the file.
                                    */
   
                                   hash_file(up.stat.map,
                                           up.stat.mapsz, filemd, sess);
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_buf(sess, wbuf, &pos,
                                           wbufsz, filemd, MD4_DIGEST_LENGTH);
                                   up.stat.curst = BLKSTAT_DONE;
                           } else if (BLKSTAT_DONE == up.stat.curst) {
                                   /*
                                    * The data has been written.
                                    * Clear our current send file and allow
                                    * the block below to find another.
                                    */
   
                                   LOG3(sess, "%s: flushed %jd KB total, "
                                           "%.2f%% uploaded",
                                           fl[up.cur->idx].path,
                                           (intmax_t)up.stat.total / 1024,
                                           100.0 * up.stat.dirty / up.stat.total);
                                   send_up_reset(&up);
                           } else if (up.cur != NULL && up.cur->idx < 0) {
                                   /*
                                    * We've hit the phase change following
                                    * the last file (or start, or prior
                                    * phase change).
                                    * Simply acknowledge it.
                                    * FIXME: use buffering.
                                    */
   
                                   if (!io_write_int(sess, fdout, -1)) {
                                           ERRX1(sess, "io_write_int");
                                           goto out;
                                   }
                                   if (sess->opts->server && sess->rver > 27 &&
                                       !io_write_int(sess, fdout, -1)) {
                                           ERRX1(sess, "io_write_int");
                                           goto out;
                                   }
                                   send_up_reset(&up);
   
                                   /*
                                    * This is where we actually stop the
                                    * algorithm: we're already at the
                                    * second phase.
                                    */
   
                                   if (phase++)
                                           break;
                           } else if (up.cur != NULL && up.primed == 0) {
                                   /*
                                    * We're getting ready to send the file
                                    * contents to the receiver.
                                    * FIXME: use buffering.
                                    */
   
                                   if (!sess->opts->server)
                                           LOG1(sess, "%s", fl[up.cur->idx].wpath);
   
                                   /* Dry-running does nothing but a response. */
   
                                   if (sess->opts->dry_run &&
                                       !io_write_int(sess, fdout, up.cur->idx)) {
                                           ERRX1(sess, "io_write_int");
                                           goto out;
                                   }
   
                                   /* Actually perform the block send. */
   
                                   assert(up.stat.fd != -1);
                                   if (!blk_recv_ack(sess, fdout,
                                       up.cur->blks, up.cur->idx)) {
                                           ERRX1(sess, "blk_recv_ack");
                                           goto out;
                                   }
                                   LOG3(sess, "%s: primed for %jd B total",
                                           fl[up.cur->idx].path,
                                           (intmax_t)up.cur->blks->size);
                                   up.primed = 1;
                           } else if (up.cur != NULL) {
                                   /*
                                    * Our last case: we need to find the
                                    * next block (and token) to transmit to
                                    * the receiver.
                                    * These will drive the finite state
                                    * machine in the first few conditional
                                    * blocks of this set.
                                    */
   
                                   assert(up.stat.fd != -1);
                                   blk_match(sess, up.cur->blks,
                                           fl[up.cur->idx].path, &up.stat);
                           }
                 }                  }
   
                 /*                  /*
Line 615 
Line 586 
                                 pfd[1].fd = fdout;                                  pfd[1].fd = fdout;
                                 continue;                                  continue;
                         }                          }
   
                         /*                          /*
                          * Non-blocking open of file.                           * Non-blocking open of file.
                          * This will be picked up in the state machine                           * This will be picked up in the state machine
Line 657 
Line 628 
 out:  out:
         send_up_reset(&up);          send_up_reset(&up);
         while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {          while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
                 TAILQ_REMOVE(&sdlq, dl, entries);  
                 free(dl->blks);                  free(dl->blks);
                 free(dl);                  free(dl);
         }          }

Legend:
Removed from v.1.14  
changed lines
  Added in v.1.15