[BACK]Return to sender.c CVS log [TXT][DIR] Up to [local] / src / usr.bin / rsync

Diff for /src/usr.bin/rsync/sender.c between version 1.13 and 1.14

version 1.13, 2019/02/17 16:34:04 version 1.14, 2019/02/18 21:34:54
Line 34 
Line 34 
  * A request from the receiver to download updated file data.   * A request from the receiver to download updated file data.
  */   */
 struct  send_dl {  struct  send_dl {
         int32_t              idx; /* index in our file list */          int32_t              idx; /* index in our file list */
         struct blkset       *blks; /* the sender's block information */          struct blkset       *blks; /* the sender's block information */
         TAILQ_ENTRY(send_dl) entries;          TAILQ_ENTRY(send_dl) entries;
 };  };
Line 46 
Line 46 
 struct  send_up {  struct  send_up {
         struct send_dl  *cur; /* file being updated or NULL */          struct send_dl  *cur; /* file being updated or NULL */
         struct blkstat   stat; /* status of file being updated */          struct blkstat   stat; /* status of file being updated */
         int              primed; /* blk_recv_ack() was called */  
 };  };
   
 TAILQ_HEAD(send_dlq, send_dl);  TAILQ_HEAD(send_dlq, send_dl);
Line 87 
Line 86 
         p->stat.offs = 0;          p->stat.offs = 0;
         p->stat.hint = 0;          p->stat.hint = 0;
         p->stat.curst = BLKSTAT_NONE;          p->stat.curst = BLKSTAT_NONE;
         p->primed = 0;  
 }  }
   
 /*  /*
    * This is the bulk of the sender work.
    * Here we tend to an output buffer that responds to receiver requests
    * for data.
    * This does not act upon the output descriptor itself so as to avoid
    * blocking, which otherwise would deadlock the protocol.
    * Returns zero on failure, non-zero on success.
    */
   static int
   send_up_fsm(struct sess *sess, size_t *phase,
           struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
           const struct flist *fl)
   {
           size_t           pos = 0, isz = sizeof(int32_t),
                            dsz = MD4_DIGEST_LENGTH;
           unsigned char    fmd[MD4_DIGEST_LENGTH];
           off_t            sz;
           char             buf[20];
   
           switch (up->stat.curst) {
           case BLKSTAT_DATA:
                   /*
                    * A data segment to be written: buffer both the length
                    * and the data.
                    * If we've finished the transfer, move on to the token;
                    * otherwise, keep sending data.
                    */
   
                   sz = MINIMUM(MAX_CHUNK,
                           up->stat.curlen - up->stat.curpos);
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
                           up->stat.map + up->stat.curpos, sz);
   
                   up->stat.curpos += sz;
                   if (up->stat.curpos == up->stat.curlen)
                           up->stat.curst = BLKSTAT_TOK;
                   return 1;
           case BLKSTAT_TOK:
                   /*
                    * The data token following (maybe) a data segment.
                    * These can also come standalone if, say, the file's
                    * being fully written.
                    * It's followed by a hash or another data segment,
                    * depending on the token.
                    */
   
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   io_lowbuffer_int(sess, *wb,
                           &pos, *wbsz, up->stat.curtok);
                   up->stat.curst = up->stat.curtok ?
                           BLKSTAT_NEXT : BLKSTAT_HASH;
                   return 1;
           case BLKSTAT_HASH:
                   /*
                    * The hash following transmission of all file contents.
                    * This is always followed by the state that we're
                    * finished with the file.
                    */
   
                   hash_file(up->stat.map, up->stat.mapsz, fmd, sess);
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
                   up->stat.curst = BLKSTAT_DONE;
                   return 1;
           case BLKSTAT_DONE:
                   /*
                    * The data has been written.
                    * Clear our current send file and allow the block below
                    * to find another.
                    */
   
                   LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded",
                           fl[up->cur->idx].path,
                           (intmax_t)up->stat.total / 1024,
                           100.0 * up->stat.dirty / up->stat.total);
                   send_up_reset(up);
                   return 1;
           case BLKSTAT_PHASE:
                   /*
                    * This is where we actually stop the algorithm: we're
                    * already at the second phase.
                    */
   
                   send_up_reset(up);
                   (*phase)++;
                   return 1;
           case BLKSTAT_NEXT:
                   /*
                    * Our last case: we need to find the
                    * next block (and token) to transmit to
                    * the receiver.
                    * These will drive the finite state
                    * machine in the first few conditional
                    * blocks of this set.
                    */
   
                   assert(up->stat.fd != -1);
                   blk_match(sess, up->cur->blks,
                           fl[up->cur->idx].path, &up->stat);
                   return 1;
           case BLKSTAT_NONE:
                   break;
           }
   
           assert(BLKSTAT_NONE == up->stat.curst);
   
           /*
            * We've either hit the phase change following the last file (or
            * start, or prior phase change), or we need to prime the next
            * file for transmission.
            * We special-case dry-run mode.
            */
   
           if (up->cur->idx < 0) {
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
   
                   if (sess->opts->server && sess->rver > 27) {
                           if (!io_lowbuffer_alloc(sess,
                               wb, wbsz, wbmax, isz)) {
                                   ERRX1(sess, "io_lowbuffer_alloc");
                                   return 0;
                           }
                           io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
                   }
                   up->stat.curst = BLKSTAT_PHASE;
           } else if (sess->opts->dry_run) {
                   if (!sess->opts->server)
                           LOG1(sess, "%s", fl[up->cur->idx].wpath);
   
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
                   up->stat.curst = BLKSTAT_NEXT;
           } else {
                   assert(up->stat.fd != -1);
   
                   /*
                    * FIXME: use the nice output of log_file() and so on in
                    * downloader.c, which means moving this into
                    * BLKSTAT_DONE instead of having it be here.
                    */
   
                   if (!sess->opts->server)
                           LOG1(sess, "%s", fl[up->cur->idx].wpath);
   
                   if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
                           ERRX1(sess, "io_lowbuffer_alloc");
                           return 0;
                   }
                   assert(sizeof(buf) == 20);
                   blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx);
                   io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);
   
                   LOG3(sess, "%s: primed for %jd B total",
                           fl[up->cur->idx].path,
                           (intmax_t)up->cur->blks->size);
                   up->stat.curst = BLKSTAT_NEXT;
           }
   
           return 1;
   }
   
   /*
  * Enqueue a download request, getting it off the read channel as   * Enqueue a download request, getting it off the read channel as
  * quickly a possible.   * quickly a possible.
  * This frees up the read channel for further incoming requests.   * This frees up the read channel for further incoming requests.
Line 103 
Line 284 
         int32_t idx, const struct flist *fl, size_t flsz, int fd)          int32_t idx, const struct flist *fl, size_t flsz, int fd)
 {  {
         struct send_dl  *s;          struct send_dl  *s;
   
         /* End-of-phase marker. */          /* End-of-phase marker. */
   
         if (idx == -1) {          if (idx == -1) {
Line 118 
Line 299 
         }          }
   
         /* Validate the index. */          /* Validate the index. */
   
         if (idx < 0 || (uint32_t)idx >= flsz) {          if (idx < 0 || (uint32_t)idx >= flsz) {
                 ERRX(sess, "file index out of bounds: invalid %"                  ERRX(sess, "file index out of bounds: invalid %"
                         PRId32 " out of %zu", idx, flsz);                          PRId32 " out of %zu", idx, flsz);
Line 178 
Line 359 
         struct flist       *fl = NULL;          struct flist       *fl = NULL;
         const struct flist *f;          const struct flist *f;
         size_t              i, flsz = 0, phase = 0, excl;          size_t              i, flsz = 0, phase = 0, excl;
         off_t               sz;  
         int                 rc = 0, c;          int                 rc = 0, c;
         int32_t             idx;          int32_t             idx;
         struct pollfd       pfd[3];          struct pollfd       pfd[3];
Line 186 
Line 366 
         struct send_dl     *dl;          struct send_dl     *dl;
         struct send_up      up;          struct send_up      up;
         struct stat         st;          struct stat         st;
         unsigned char       filemd[MD4_DIGEST_LENGTH];  
         void               *wbuf = NULL;          void               *wbuf = NULL;
         size_t              wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;          size_t              wbufpos = 0, wbufsz = 0, wbufmax = 0;
         ssize_t             ssz;          ssize_t             ssz;
   
         if (pledge("stdio getpw rpath unveil", NULL) == -1) {          if (pledge("stdio getpw rpath unveil", NULL) == -1) {
Line 215 
Line 394 
         /* Client sends zero-length exclusions if deleting. */          /* Client sends zero-length exclusions if deleting. */
   
         if (!sess->opts->server && sess->opts->del &&          if (!sess->opts->server && sess->opts->del &&
             !io_write_int(sess, fdout, 0)) {               !io_write_int(sess, fdout, 0)) {
                 ERRX1(sess, "io_write_int");                  ERRX1(sess, "io_write_int");
                 goto out;                  goto out;
         }          }
Line 384 
Line 563 
                         sess->total_write += ssz;                          sess->total_write += ssz;
                 }                  }
   
                 if (pfd[1].revents & POLLOUT) {                  /*
                    * Engage the FSM for the current transfer.
                    * If our phase changes, stop processing.
                    */
   
                   if (pfd[1].revents & POLLOUT && up.cur != NULL) {
                         assert(pfd[2].fd == -1);                          assert(pfd[2].fd == -1);
                         assert(wbufpos == 0 && wbufsz == 0);                          assert(wbufpos == 0 && wbufsz == 0);
                           if (!send_up_fsm(sess, &phase,
                         /*                              &up, &wbuf, &wbufsz, &wbufmax, fl)) {
                          * If we have data to write, do it now according                                  ERRX1(sess, "send_up_fsm");
                          * to the data finite state machine.                                  goto out;
                          * If we receive an invalid index (-1), then                          } else if (phase > 1)
                          * we're either promoted to the second phase or                                  break;
                          * it's time to exit, depending upon which phase  
                          * we're in.  
                          * Otherwise, we either start a transfer  
                          * sequence (if not primed) or continue one.  
                          */  
   
                         pos = 0;  
                         if (BLKSTAT_DATA == up.stat.curst) {  
                                 /*  
                                  * A data segment to be written: buffer  
                                  * both the length and the data, then  
                                  * put is in the token phase.  
                                  */  
   
                                 sz = MINIMUM(MAX_CHUNK,  
                                     up.stat.curlen - up.stat.curpos);  
                                 if (!io_lowbuffer_alloc(sess, &wbuf,  
                                     &wbufsz, &wbufmax, sizeof(int32_t))) {  
                                         ERRX1(sess, "io_lowbuffer_alloc");  
                                         goto out;  
                                 }  
                                 io_lowbuffer_int(sess,  
                                         wbuf, &pos, wbufsz, sz);  
                                 if (!io_lowbuffer_alloc(sess, &wbuf,  
                                     &wbufsz, &wbufmax, sz)) {  
                                         ERRX1(sess, "io_lowbuffer_alloc");  
                                         goto out;  
                                 }  
                                 io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,  
                                         up.stat.map + up.stat.curpos, sz);  
                                 up.stat.curpos += sz;  
                                 if (up.stat.curpos == up.stat.curlen)  
                                         up.stat.curst = BLKSTAT_TOK;  
                         } else if (BLKSTAT_TOK == up.stat.curst) {  
                                 /*  
                                  * The data token following (maybe) a  
                                  * data segment.  
                                  * These can also come standalone if,  
                                  * say, the file's being fully written.  
                                  * It's followed by a hash or another  
                                  * data segment, depending on the token.  
                                  */  
   
                                 if (!io_lowbuffer_alloc(sess, &wbuf,  
                                     &wbufsz, &wbufmax, sizeof(int32_t))) {  
                                         ERRX1(sess, "io_lowbuffer_alloc");  
                                         goto out;  
                                 }  
                                 io_lowbuffer_int(sess, wbuf,  
                                         &pos, wbufsz, up.stat.curtok);  
                                 up.stat.curst = up.stat.curtok ?  
                                         BLKSTAT_NONE : BLKSTAT_HASH;  
                         } else if (BLKSTAT_HASH == up.stat.curst) {  
                                 /*  
                                  * The hash following transmission of  
                                  * all file contents.  
                                  * This is always followed by the state  
                                  * that we're finished with the file.  
                                  */  
   
                                 hash_file(up.stat.map,  
                                         up.stat.mapsz, filemd, sess);  
                                 if (!io_lowbuffer_alloc(sess, &wbuf,  
                                     &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {  
                                         ERRX1(sess, "io_lowbuffer_alloc");  
                                         goto out;  
                                 }  
                                 io_lowbuffer_buf(sess, wbuf, &pos,  
                                         wbufsz, filemd, MD4_DIGEST_LENGTH);  
                                 up.stat.curst = BLKSTAT_DONE;  
                         } else if (BLKSTAT_DONE == up.stat.curst) {  
                                 /*  
                                  * The data has been written.  
                                  * Clear our current send file and allow  
                                  * the block below to find another.  
                                  */  
   
                                 LOG3(sess, "%s: flushed %jd KB total, "  
                                         "%.2f%% uploaded",  
                                         fl[up.cur->idx].path,  
                                         (intmax_t)up.stat.total / 1024,  
                                         100.0 * up.stat.dirty / up.stat.total);  
                                 send_up_reset(&up);  
                         } else if (up.cur != NULL && up.cur->idx < 0) {  
                                 /*  
                                  * We've hit the phase change following  
                                  * the last file (or start, or prior  
                                  * phase change).  
                                  * Simply acknowledge it.  
                                  * FIXME: use buffering.  
                                  */  
   
                                 if (!io_write_int(sess, fdout, -1)) {  
                                         ERRX1(sess, "io_write_int");  
                                         goto out;  
                                 }  
                                 if (sess->opts->server && sess->rver > 27 &&  
                                     !io_write_int(sess, fdout, -1)) {  
                                         ERRX1(sess, "io_write_int");  
                                         goto out;  
                                 }  
                                 send_up_reset(&up);  
   
                                 /*  
                                  * This is where we actually stop the  
                                  * algorithm: we're already at the  
                                  * second phase.  
                                  */  
   
                                 if (phase++)  
                                         break;  
                         } else if (up.cur != NULL && up.primed == 0) {  
                                 /*  
                                  * We're getting ready to send the file  
                                  * contents to the receiver.  
                                  * FIXME: use buffering.  
                                  */  
   
                                 if (!sess->opts->server)  
                                         LOG1(sess, "%s", fl[up.cur->idx].wpath);  
   
                                 /* Dry-running does nothing but a response. */  
   
                                 if (sess->opts->dry_run &&  
                                     !io_write_int(sess, fdout, up.cur->idx)) {  
                                         ERRX1(sess, "io_write_int");  
                                         goto out;  
                                 }  
   
                                 /* Actually perform the block send. */  
   
                                 assert(up.stat.fd != -1);  
                                 if (!blk_recv_ack(sess, fdout,  
                                     up.cur->blks, up.cur->idx)) {  
                                         ERRX1(sess, "blk_recv_ack");  
                                         goto out;  
                                 }  
                                 LOG3(sess, "%s: primed for %jd B total",  
                                         fl[up.cur->idx].path,  
                                         (intmax_t)up.cur->blks->size);  
                                 up.primed = 1;  
                         } else if (up.cur != NULL) {  
                                 /*  
                                  * Our last case: we need to find the  
                                  * next block (and token) to transmit to  
                                  * the receiver.  
                                  * These will drive the finite state  
                                  * machine in the first few conditional  
                                  * blocks of this set.  
                                  */  
   
                                 assert(up.stat.fd != -1);  
                                 blk_match(sess, up.cur->blks,  
                                         fl[up.cur->idx].path, &up.stat);  
                         }  
                 }                  }
   
                 /*                  /*
Line 586 
Line 615 
                                 pfd[1].fd = fdout;                                  pfd[1].fd = fdout;
                                 continue;                                  continue;
                         }                          }
   
                         /*                          /*
                          * Non-blocking open of file.                           * Non-blocking open of file.
                          * This will be picked up in the state machine                           * This will be picked up in the state machine
Line 628 
Line 657 
 out:  out:
         send_up_reset(&up);          send_up_reset(&up);
         while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {          while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
                   TAILQ_REMOVE(&sdlq, dl, entries);
                 free(dl->blks);                  free(dl->blks);
                 free(dl);                  free(dl);
         }          }

Legend:
Removed from v.1.13  
changed lines
  Added in v.1.14