[BACK]Return to sender.c CVS log [TXT][DIR] Up to [local] / src / usr.bin / rsync

Diff for /src/usr.bin/rsync/sender.c between version 1.8 and 1.9

version 1.8, 2019/02/16 16:57:17 version 1.9, 2019/02/16 16:58:39
Line 26 
Line 26 
 #include <string.h>  #include <string.h>
 #include <unistd.h>  #include <unistd.h>
   
   #include <openssl/md4.h>
   
 #include "extern.h"  #include "extern.h"
   
 /*  /*
Line 84 
Line 86 
   
         p->stat.offs = 0;          p->stat.offs = 0;
         p->stat.hint = 0;          p->stat.hint = 0;
           p->stat.curst = BLKSTAT_NONE;
         p->primed = 0;          p->primed = 0;
 }  }
   
Line 172 
Line 175 
 rsync_sender(struct sess *sess, int fdin,  rsync_sender(struct sess *sess, int fdin,
         int fdout, size_t argc, char **argv)          int fdout, size_t argc, char **argv)
 {  {
         struct flist    *fl = NULL;          struct flist       *fl = NULL;
         size_t           i, flsz = 0, phase = 0, excl;          const struct flist *f;
         int              rc = 0, c;          size_t              i, flsz = 0, phase = 0, excl;
         int32_t          idx;          off_t               sz;
         struct pollfd    pfd[3];          int                 rc = 0, c;
         struct send_dlq  sdlq;          int32_t             idx;
         struct send_up   up;          struct pollfd       pfd[3];
         struct stat      st;          struct send_dlq     sdlq;
           struct send_dl     *dl;
           struct send_up      up;
           struct stat         st;
           unsigned char       filemd[MD4_DIGEST_LENGTH];
           void               *wbuf = NULL;
           size_t              wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;
           ssize_t             ssz;
   
         if (pledge("stdio getpw rpath unveil", NULL) == -1) {          if (pledge("stdio getpw rpath unveil", NULL) == -1) {
                 ERR(sess, "pledge");                  ERR(sess, "pledge");
Line 260 
Line 270 
         pfd[2].fd = -1; /* from local file */          pfd[2].fd = -1; /* from local file */
         pfd[2].events = POLLIN;          pfd[2].events = POLLIN;
   
         /* The main sender loop runs into phase == 2. */  
   
         for (;;) {          for (;;) {
                 assert(pfd[0].fd != -1);                  assert(pfd[0].fd != -1);
                 if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {                  if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
Line 271 
Line 279 
                         ERRX(sess, "poll: timeout");                          ERRX(sess, "poll: timeout");
                         goto out;                          goto out;
                 }                  }
   
                 for (i = 0; i < 3; i++)                  for (i = 0; i < 3; i++)
                         if (pfd[i].revents & (POLLERR|POLLNVAL)) {                          if (pfd[i].revents & (POLLERR|POLLNVAL)) {
                                 ERRX(sess, "poll: bad fd");                                  ERRX(sess, "poll: bad fd");
Line 281 
Line 288 
                                 goto out;                                  goto out;
                         }                          }
   
                 /*  
                  * Flush out multiplexed messages.  
                  * These might otherwise clog the reader.  
                  */  
   
                 if (sess->mplex_reads &&  
                     (POLLIN & pfd[0].revents)) {  
                         if (!io_read_flush(sess, fdin)) {  
                                 ERRX1(sess, "io_read_flush");  
                                 goto out;  
                         } else if (sess->mplex_read_remain == 0)  
                                 pfd[0].revents &= ~POLLIN;  
                 }  
   
                 /*                  /*
                  * If we have a request coming down off the wire, pull                   * If we have a request coming down off the wire, pull
                  * it in as quickly as possible into our buffer.                   * it in as quickly as possible into our buffer.
                  * This unclogs the socket buffers so the data can flow.                   * This unclogs the socket buffers so the data can flow.
                    * FIXME: if we're multiplexing, we might stall here if
                    * there's only a log message and no actual data.
                    * This can be fixed by doing a conditional test.
                  */                   */
   
                 if (pfd[0].revents & POLLIN)                  if (pfd[0].revents & POLLIN)
Line 321 
Line 317 
                         }                          }
   
                 /*                  /*
                  * One of our local files has been opened (in response                   * One of our local files has been opened in response
                  * to a receiver request) and now we can map it.                   * to a receiver request and now we can map it.
                  * We'll respond to the event by looking at the map when                   * We'll respond to the event by looking at the map when
                  * the writer is available.                   * the writer is available.
                  * Here we also enable the poll event for output.                   * Here we also enable the poll event for output.
Line 333 
Line 329 
                         assert(up.stat.fd != -1);                          assert(up.stat.fd != -1);
                         assert(up.stat.map == MAP_FAILED);                          assert(up.stat.map == MAP_FAILED);
                         assert(up.stat.mapsz == 0);                          assert(up.stat.mapsz == 0);
                           f = &fl[up.cur->idx];
   
                         if (fstat(up.stat.fd, &st) == -1) {                          if (fstat(up.stat.fd, &st) == -1) {
                                 ERR(sess, "%s: fstat", fl[up.cur->idx].path);                                  ERR(sess, "%s: fstat", f->path);
                                 goto out;                                  goto out;
                         }                          }
   
                         /*                          /*
                          * If the file is zero-length, the map will                           * If the file is zero-length, the map will
                          * fail, but either way we want to unset that                           * fail, but either way we want to unset that
                          * we're waiting for the file to open.                           * we're waiting for the file to open and set
                          * We'll close the descriptor after processing.                           * that we're ready for the output channel.
                          */                           */
   
                         if ((up.stat.mapsz = st.st_size) > 0) {                          if ((up.stat.mapsz = st.st_size) > 0) {
                                 up.stat.map = mmap(NULL, up.stat.mapsz,                                  up.stat.map = mmap(NULL,
                                         PROT_READ, MAP_SHARED, up.stat.fd, 0);                                          up.stat.mapsz, PROT_READ,
                                           MAP_SHARED, up.stat.fd, 0);
                                 if (up.stat.map == MAP_FAILED) {                                  if (up.stat.map == MAP_FAILED) {
                                         ERR(sess, "%s: mmap", fl[up.cur->idx].path);                                          ERR(sess, "%s: mmap", f->path);
                                         goto out;                                          goto out;
                                 }                                  }
                         }                          }
   
                         pfd[2].fd = -1;                          pfd[2].fd = -1;
                         pfd[1].fd = fdout;                          pfd[1].fd = fdout;
                 }                  }
   
                 /*                  /*
                  * Our outbound is ready to process the current event.                   * If we have buffers waiting to write, write them out
                  * This means we've already opened the file and possibly                   * as soon as we can in a non-blocking fashion.
                  * mapped it, and we're ready to send blocks.                   * We must not be waiting for any local files.
                  * Do this one block at a time lest we block the channel                   * ALL WRITES MUST HAPPEN HERE.
                  * while read events are coming in.                   * This keeps the sender deadlock-free.
                  */                   */
   
                   if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
                           assert(pfd[2].fd == -1);
                           assert(wbufsz - wbufpos);
                           ssz = write(fdout,
                                   wbuf + wbufpos, wbufsz - wbufpos);
                           if (ssz < 0) {
                                   ERR(sess, "write");
                                   goto out;
                           }
                           wbufpos += ssz;
                           if (wbufpos == wbufsz)
                                   wbufpos = wbufsz = 0;
                           pfd[1].revents &= ~POLLOUT;
   
                           /* This is usually in io.c... */
   
                           sess->total_write += ssz;
                   }
   
                 if (pfd[1].revents & POLLOUT) {                  if (pfd[1].revents & POLLOUT) {
                         assert(up.cur != NULL);  
                         assert(pfd[2].fd == -1);                          assert(pfd[2].fd == -1);
                           assert(0 == wbufpos && 0 == wbufsz);
   
                         /*                          /*
                          * If we receive an invalid index (-1), then we're                           * If we have data to write, do it now according
                          * either promoted to the second phase or it's time to                           * to the data finite state machine.
                          * exit, depending upon which phase we're in.                           * If we receive an invalid index (-1), then
                            * we're either promoted to the second phase or
                            * it's time to exit, depending upon which phase
                            * we're in.
                          * Otherwise, we either start a transfer                           * Otherwise, we either start a transfer
                          * sequence (if not primed) or continue one.                           * sequence (if not primed) or continue one.
                          */                           */
   
                         if (up.cur->idx < 0) {                          pos = 0;
                                 pfd[1].fd = -1;                          if (BLKSTAT_DATA == up.stat.curst) {
                                   /*
                                    * A data segment to be written: buffer
                                    * both the length and the data, then
                                    * put is in the token phase.
                                    */
   
                                   sz = MIN(MAX_CHUNK,
                                           up.stat.curlen - up.stat.curpos);
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, sizeof(int32_t))) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_int(sess,
                                           wbuf, &pos, wbufsz, sz);
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, sz)) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,
                                           up.stat.map + up.stat.curpos, sz);
                                   up.stat.curpos += sz;
                                   if (up.stat.curpos == up.stat.curlen)
                                           up.stat.curst = BLKSTAT_TOK;
                           } else if (BLKSTAT_TOK == up.stat.curst) {
                                   /*
                                    * The data token following (maybe) a
                                    * data segment.
                                    * These can also come standalone if,
                                    * say, the file's being fully written.
                                    * It's followed by a hash or another
                                    * data segment, depending on the token.
                                    */
   
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, sizeof(int32_t))) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_int(sess, wbuf,
                                           &pos, wbufsz, up.stat.curtok);
                                   up.stat.curst = up.stat.curtok ?
                                           BLKSTAT_NONE : BLKSTAT_HASH;
                           } else if (BLKSTAT_HASH == up.stat.curst) {
                                   /*
                                    * The hash following transmission of
                                    * all file contents.
                                    * This is always followed by the state
                                    * that we're finished with the file.
                                    */
   
                                   hash_file(up.stat.map,
                                           up.stat.mapsz, filemd, sess);
                                   if (!io_lowbuffer_alloc(sess, &wbuf,
                                       &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {
                                           ERRX1(sess, "io_lowbuffer_alloc");
                                           goto out;
                                   }
                                   io_lowbuffer_buf(sess, wbuf, &pos,
                                           wbufsz, filemd, MD4_DIGEST_LENGTH);
                                   up.stat.curst = BLKSTAT_DONE;
                           } else if (BLKSTAT_DONE == up.stat.curst) {
                                   /*
                                    * The data has been written.
                                    * Clear our current send file and allow
                                    * the block below to find another.
                                    */
   
                                   LOG3(sess, "%s: flushed %jd KB total, "
                                           "%.2f%% uploaded",
                                           fl[up.cur->idx].path,
                                           (intmax_t)up.stat.total / 1024,
                                           100.0 * up.stat.dirty / up.stat.total);
                                 send_up_reset(&up);                                  send_up_reset(&up);
                           } else if (NULL != up.cur && up.cur->idx < 0) {
                                   /*
                                    * We've hit the phase change following
                                    * the last file (or start, or prior
                                    * phase change).
                                    * Simply acknowledge it.
                                    * FIXME: use buffering.
                                    */
   
                                 if (!io_write_int(sess, fdout, -1)) {                                  if (!io_write_int(sess, fdout, -1)) {
                                         ERRX1(sess, "io_write_int");                                          ERRX1(sess, "io_write_int");
                                         goto out;                                          goto out;
                                 }                                  }
   
                                 /* Send superfluous ack. */  
   
                                 if (sess->opts->server && sess->rver > 27 &&                                  if (sess->opts->server && sess->rver > 27 &&
                                     !io_write_int(sess, fdout, -1)) {                                      !io_write_int(sess, fdout, -1)) {
                                         ERRX1(sess, "io_write_int");                                          ERRX1(sess, "io_write_int");
                                         goto out;                                          goto out;
                                 }                                  }
                                   send_up_reset(&up);
   
                                   /*
                                    * This is where we actually stop the
                                    * algorithm: we're already at the
                                    * second phase.
                                    */
   
                                 if (phase++)                                  if (phase++)
                                         break;                                          break;
                         } else if (0 == up.primed) {                          } else if (NULL != up.cur && 0 == up.primed) {
                                   /*
                                    * We're getting ready to send the file
                                    * contents to the receiver.
                                    * FIXME: use buffering.
                                    */
   
                                 if (!sess->opts->server)                                  if (!sess->opts->server)
                                         LOG1(sess, "%s", fl[up.cur->idx].wpath);                                          LOG1(sess, "%s", fl[up.cur->idx].wpath);
   
Line 416 
Line 530 
                                         ERRX1(sess, "blk_recv_ack");                                          ERRX1(sess, "blk_recv_ack");
                                         goto out;                                          goto out;
                                 }                                  }
                                   LOG3(sess, "%s: primed for %jd B total",
                                           fl[up.cur->idx].path,
                                           (intmax_t)up.cur->blks->size);
                                 up.primed = 1;                                  up.primed = 1;
                         } else {                          } else if (NULL != up.cur) {
                                   /*
                                    * Our last case: we need to find the
                                    * next block (and token) to transmit to
                                    * the receiver.
                                    * These will drive the finite state
                                    * machine in the first few conditional
                                    * blocks of this set.
                                    */
   
                                 assert(up.stat.fd != -1);                                  assert(up.stat.fd != -1);
                                 c = blk_match(sess, fdout, up.cur->blks,                                  blk_match(sess, up.cur->blks,
                                         fl[up.cur->idx].path, &up.stat);                                          fl[up.cur->idx].path, &up.stat);
                                 if (c < 0) {  
                                         ERRX1(sess, "blk_match");  
                                         goto out;  
                                 } else if (c > 0) {  
                                         send_up_reset(&up);  
                                         pfd[1].fd = -1;  
                                 }  
                         }                          }
                 }                  }
   
Line 443 
Line 562 
                         assert(up.stat.fd == -1);                          assert(up.stat.fd == -1);
                         assert(up.stat.map == MAP_FAILED);                          assert(up.stat.map == MAP_FAILED);
                         assert(up.stat.mapsz == 0);                          assert(up.stat.mapsz == 0);
                           assert(wbufsz == 0 && wbufpos == 0);
                           pfd[1].fd = -1;
   
                           /*
                            * If there's nothing in the queue, then keep
                            * the output channel disabled and wait for
                            * whatever comes next from the reader.
                            */
   
                         if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)                          if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
                                 continue;                                  continue;
   
                         TAILQ_REMOVE(&sdlq, up.cur, entries);                          TAILQ_REMOVE(&sdlq, up.cur, entries);
   
                         /* End of phase: enable channel to receiver. */                          /*
                            * End of phase: enable channel to receiver.
                            * We'll need our output buffer enabled in order
                            * to process this event.
                            */
   
                         if (up.cur->idx == -1) {                          if (up.cur->idx == -1) {
                                 pfd[1].fd = fdout;                                  pfd[1].fd = fdout;
                                 continue;                                  continue;
                         }                          }
   
                         /* Non-blocking open of file. */                          /*
                            * Non-blocking open of file.
                            * This will be picked up in the state machine
                            * block of not being primed.
                            */
   
                         up.stat.fd = open(fl[up.cur->idx].path,                          up.stat.fd = open(fl[up.cur->idx].path,
                                 O_RDONLY|O_NONBLOCK, 0);                                  O_RDONLY|O_NONBLOCK, 0);
                         if (up.stat.fd == -1) {                          if (up.stat.fd == -1) {
                                 ERR(sess, "%s: open", fl[up.cur->idx].path);                                  ERR(sess, "%s: open", fl[up.cur->idx].path);
Line 467 
Line 603 
                 }                  }
         }          }
   
           if (!TAILQ_EMPTY(&sdlq)) {
                   ERRX(sess, "phases complete with files still queued");
                   goto out;
           }
   
         if (!sess_stats_send(sess, fdout)) {          if (!sess_stats_send(sess, fdout)) {
                 ERRX1(sess, "sess_stats_end");                  ERRX1(sess, "sess_stats_end");
                 goto out;                  goto out;
Line 485 
Line 626 
         LOG2(sess, "sender finished updating");          LOG2(sess, "sender finished updating");
         rc = 1;          rc = 1;
 out:  out:
           send_up_reset(&up);
           while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
                   free(dl->blks);
                   free(dl);
           }
         flist_free(fl, flsz);          flist_free(fl, flsz);
           free(wbuf);
         return rc;          return rc;
 }  }

Legend:
Removed from v.1.8  
changed lines
  Added in v.1.9