[BACK]Return to sender.c CVS log [TXT][DIR] Up to [local] / src / usr.bin / rsync

Diff for /src/usr.bin/rsync/sender.c between version 1.7 and 1.8

version 1.7, 2019/02/16 05:06:30 version 1.8, 2019/02/16 16:57:17
Line 14 
Line 14 
  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF   * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.   * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */   */
   #include <sys/mman.h>
   #include <sys/queue.h>
 #include <sys/stat.h>  #include <sys/stat.h>
   
 #include <assert.h>  #include <assert.h>
   #include <fcntl.h>
 #include <inttypes.h>  #include <inttypes.h>
   #include <poll.h>
 #include <stdlib.h>  #include <stdlib.h>
 #include <string.h>  #include <string.h>
 #include <unistd.h>  #include <unistd.h>
Line 25 
Line 29 
 #include "extern.h"  #include "extern.h"
   
 /*  /*
    * A request from the receiver to download updated file data.
    */
   struct  send_dl {
           int32_t              idx; /* index in our file list */
           struct blkset       *blks; /* the sender's block information */
           TAILQ_ENTRY(send_dl) entries;
   };
   
   /*
    * The current file being "updated": sent from sender to receiver.
    * If there is no file being uploaded, "cur" is NULL.
    */
   struct  send_up {
           struct send_dl  *cur; /* file being updated or NULL */
           struct blkstat   stat; /* status of file being updated */
           int              primed; /* blk_recv_ack() was called */
   };
   
   TAILQ_HEAD(send_dlq, send_dl);
   
   /*
    * We have finished updating the receiver's file with sender data.
    * Deallocate and wipe clean all resources required for that.
    */
   static void
   send_up_reset(struct send_up *p)
   {
   
           assert(NULL != p);
   
           /* Free the download request, if applicable. */
   
           if (p->cur != NULL) {
                   free(p->cur->blks);
                   free(p->cur);
                   p->cur = NULL;
           }
   
           /* If we mapped a file for scanning, unmap it and close. */
   
           if (p->stat.map != MAP_FAILED)
                   munmap(p->stat.map, p->stat.mapsz);
   
           p->stat.map = MAP_FAILED;
           p->stat.mapsz = 0;
   
           if (p->stat.fd != -1)
                   close(p->stat.fd);
   
           p->stat.fd = -1;
   
           /* Now clear the in-transfer information. */
   
           p->stat.offs = 0;
           p->stat.hint = 0;
           p->primed = 0;
   }
   
   /*
    * Enqueue a download request, getting it off the read channel as
    * quickly a possible.
    * This frees up the read channel for further incoming requests.
    * We'll handle each element in turn, up to and including the last
    * request (phase change), which is always a -1 idx.
    * Returns zero on failure, non-zero on success.
    */
   static int
   send_dl_enqueue(struct sess *sess, struct send_dlq *q,
           int32_t idx, const struct flist *fl, size_t flsz, int fd)
   {
           struct send_dl  *s;
   
           /* End-of-phase marker. */
   
           if (idx == -1) {
                   if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
                           ERR(sess, "calloc");
                           return 0;
                   }
                   s->idx = -1;
                   s->blks = NULL;
                   TAILQ_INSERT_TAIL(q, s, entries);
                   return 1;
           }
   
           /* Validate the index. */
   
           if (idx < 0 || (uint32_t)idx >= flsz) {
                   ERRX(sess, "file index out of bounds: invalid %"
                           PRId32 " out of %zu", idx, flsz);
                   return 0;
           } else if (S_ISDIR(fl[idx].st.mode)) {
                   ERRX(sess, "blocks requested for "
                           "directory: %s", fl[idx].path);
                   return 0;
           } else if (S_ISLNK(fl[idx].st.mode)) {
                   ERRX(sess, "blocks requested for "
                           "symlink: %s", fl[idx].path);
                   return 0;
           } else if (!S_ISREG(fl[idx].st.mode)) {
                   ERRX(sess, "blocks requested for "
                           "special: %s", fl[idx].path);
                   return 0;
           }
   
           if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
                   ERR(sess, "callloc");
                   return 0;
           }
           s->idx = idx;
           s->blks = NULL;
           TAILQ_INSERT_TAIL(q, s, entries);
   
           /*
            * This blocks til the full blockset has been read.
            * That's ok, because the most important thing is getting data
            * off the wire.
            */
   
           if (!sess->opts->dry_run) {
                   s->blks = blk_recv(sess, fd, fl[idx].path);
                   if (s->blks == NULL) {
                           ERRX1(sess, "blk_recv");
                           return 0;
                   }
           }
           return 1;
   }
   
   /*
  * A client sender manages the read-only source files and sends data to   * A client sender manages the read-only source files and sends data to
  * the receiver as requested.   * the receiver as requested.
  * First it sends its list of files, then it waits for the server to   * First it sends its list of files, then it waits for the server to
  * request updates to individual files.   * request updates to individual files.
    * It queues requests for updates as soon as it receives them.
  * Returns zero on failure, non-zero on success.   * Returns zero on failure, non-zero on success.
  *   *
  * Pledges: stdio, rpath, unveil.   * Pledges: stdio, rpath, unveil.
Line 38 
Line 173 
         int fdout, size_t argc, char **argv)          int fdout, size_t argc, char **argv)
 {  {
         struct flist    *fl = NULL;          struct flist    *fl = NULL;
         size_t           flsz = 0, phase = 0, excl;          size_t           i, flsz = 0, phase = 0, excl;
         int              rc = 0, c;          int              rc = 0, c;
         int32_t          idx;          int32_t          idx;
         struct blkset   *blks = NULL;          struct pollfd    pfd[3];
           struct send_dlq  sdlq;
           struct send_up   up;
           struct stat      st;
   
         if (pledge("stdio getpw rpath unveil", NULL) == -1) {          if (pledge("stdio getpw rpath unveil", NULL) == -1) {
                 ERR(sess, "pledge");                  ERR(sess, "pledge");
                 return 0;                  return 0;
         }          }
   
           memset(&up, 0, sizeof(struct send_up));
           TAILQ_INIT(&sdlq);
           up.stat.fd = -1;
           up.stat.map = MAP_FAILED;
   
         /*          /*
          * Generate the list of files we want to send from our           * Generate the list of files we want to send from our
          * command-line input.           * command-line input.
Line 104 
Line 247 
                 }                  }
         }          }
   
         /*          /*
          * We have two phases: the first has a two-byte checksum, the           * Set up our poll events.
          * second has a full 16-byte checksum.           * We start by polling only in receiver requests, enabling other
            * poll events on demand.
          */           */
   
         LOG2(sess, "sender transmitting phase 1 data");          pfd[0].fd = fdin; /* from receiver */
           pfd[0].events = POLLIN;
           pfd[1].fd = -1; /* to receiver */
           pfd[1].events = POLLOUT;
           pfd[2].fd = -1; /* from local file */
           pfd[2].events = POLLIN;
   
           /* The main sender loop runs into phase == 2. */
   
         for (;;) {          for (;;) {
                 if (!io_read_int(sess, fdin, &idx)) {                  assert(pfd[0].fd != -1);
                         ERRX1(sess, "io_read_int");                  if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
                           ERR(sess, "poll");
                         goto out;                          goto out;
                   } else if (c == 0) {
                           ERRX(sess, "poll: timeout");
                           goto out;
                 }                  }
   
                   for (i = 0; i < 3; i++)
                           if (pfd[i].revents & (POLLERR|POLLNVAL)) {
                                   ERRX(sess, "poll: bad fd");
                                   goto out;
                           } else if (pfd[i].revents & POLLHUP) {
                                   ERRX(sess, "poll: hangup");
                                   goto out;
                           }
   
                   /*
                    * Flush out multiplexed messages.
                    * These might otherwise clog the reader.
                    */
   
                   if (sess->mplex_reads &&
                       (POLLIN & pfd[0].revents)) {
                           if (!io_read_flush(sess, fdin)) {
                                   ERRX1(sess, "io_read_flush");
                                   goto out;
                           } else if (sess->mplex_read_remain == 0)
                                   pfd[0].revents &= ~POLLIN;
                   }
   
                 /*                  /*
                  * If we receive an invalid index (-1), then we're                   * If we have a request coming down off the wire, pull
                  * either promoted to the second phase or it's time to                   * it in as quickly as possible into our buffer.
                  * exit, depending upon which phase we're in.                   * This unclogs the socket buffers so the data can flow.
                  */                   */
   
                 if (idx == -1) {                  if (pfd[0].revents & POLLIN)
                         if (!io_write_int(sess, fdout, idx)) {                          for (;;) {
                                 ERRX1(sess, "io_write_int");                                  if (!io_read_int(sess, fdin, &idx)) {
                                           ERRX1(sess, "io_read_int");
                                           goto out;
                                   }
                                   if (!send_dl_enqueue(sess,
                                       &sdlq, idx, fl, flsz, fdin)) {
                                           ERRX1(sess, "send_dl_enqueue");
                                           goto out;
                                   }
                                   c = io_read_check(sess, fdin);
                                   if (c < 0) {
                                           ERRX1(sess, "io_read_check");
                                           goto out;
                                   } else if (c == 0)
                                           break;
                           }
   
                   /*
                    * One of our local files has been opened (in response
                    * to a receiver request) and now we can map it.
                    * We'll respond to the event by looking at the map when
                    * the writer is available.
                    * Here we also enable the poll event for output.
                    */
   
                   if (pfd[2].revents & POLLIN) {
                           assert(up.cur != NULL);
                           assert(up.stat.fd != -1);
                           assert(up.stat.map == MAP_FAILED);
                           assert(up.stat.mapsz == 0);
   
                           if (fstat(up.stat.fd, &st) == -1) {
                                   ERR(sess, "%s: fstat", fl[up.cur->idx].path);
                                 goto out;                                  goto out;
                         }                          }
   
                         /* FIXME: I don't understand this ack. */                          /*
                            * If the file is zero-length, the map will
                            * fail, but either way we want to unset that
                            * we're waiting for the file to open.
                            * We'll close the descriptor after processing.
                            */
   
                         if (sess->opts->server && sess->rver > 27)                          if ((up.stat.mapsz = st.st_size) > 0) {
                                 if (!io_write_int(sess, fdout, idx)) {                                  up.stat.map = mmap(NULL, up.stat.mapsz,
                                           PROT_READ, MAP_SHARED, up.stat.fd, 0);
                                   if (up.stat.map == MAP_FAILED) {
                                           ERR(sess, "%s: mmap", fl[up.cur->idx].path);
                                           goto out;
                                   }
                           }
                           pfd[2].fd = -1;
                           pfd[1].fd = fdout;
                   }
   
                   /*
                    * Our outbound is ready to process the current event.
                    * This means we've already opened the file and possibly
                    * mapped it, and we're ready to send blocks.
                    * Do this one block at a time lest we block the channel
                    * while read events are coming in.
                    */
   
                   if (pfd[1].revents & POLLOUT) {
                           assert(up.cur != NULL);
                           assert(pfd[2].fd == -1);
   
                           /*
                            * If we receive an invalid index (-1), then we're
                            * either promoted to the second phase or it's time to
                            * exit, depending upon which phase we're in.
                            * Otherwise, we either start a transfer
                            * sequence (if not primed) or continue one.
                            */
   
                           if (up.cur->idx < 0) {
                                   pfd[1].fd = -1;
                                   send_up_reset(&up);
                                   if (!io_write_int(sess, fdout, -1)) {
                                         ERRX1(sess, "io_write_int");                                          ERRX1(sess, "io_write_int");
                                         goto out;                                          goto out;
                                 }                                  }
   
                         if (phase++)                                  /* Send superfluous ack. */
                                 break;  
                         LOG2(sess, "sender transmitting phase 2 data");  
                         continue;  
                 }  
   
                 /* Validate index and file type. */                                  if (sess->opts->server && sess->rver > 27 &&
                                       !io_write_int(sess, fdout, -1)) {
                                           ERRX1(sess, "io_write_int");
                                           goto out;
                                   }
   
                 if (idx < 0 || (uint32_t)idx >= flsz) {                                  if (phase++)
                         ERRX(sess, "file index out of bounds: "                                          break;
                                 "invalid %" PRId32 " out of %zu",                          } else if (0 == up.primed) {
                                 idx, flsz);                                  if (!sess->opts->server)
                         goto out;                                          LOG1(sess, "%s", fl[up.cur->idx].wpath);
                 } else if (S_ISDIR(fl[idx].st.mode)) {  
                         ERRX(sess, "blocks requested for "  
                                 "directory: %s", fl[idx].path);  
                         goto out;  
                 } else if (S_ISLNK(fl[idx].st.mode)) {  
                         ERRX(sess, "blocks requested for "  
                                 "symlink: %s", fl[idx].path);  
                         goto out;  
                 } else if (!S_ISREG(fl[idx].st.mode)) {  
                         ERRX(sess, "blocks requested for "  
                                 "special: %s", fl[idx].path);  
                         goto out;  
                 }  
   
                 if (!sess->opts->server)                                  /* Dry-running does nothing but a response. */
                         LOG1(sess, "%s", fl[idx].wpath);  
   
                 /* Dry-run doesn't do anything. */                                  if (sess->opts->dry_run &&
                                       !io_write_int(sess, fdout, up.cur->idx)) {
                                           ERRX1(sess, "io_write_int");
                                           goto out;
                                   }
   
                 if (sess->opts->dry_run) {                                  /* Actually perform the block send. */
                         if (!io_write_int(sess, fdout, idx)) {  
                                 ERRX1(sess, "io_write_int");                                  assert(up.stat.fd != -1);
                                 goto out;                                  if (!blk_recv_ack(sess, fdout,
                                       up.cur->blks, up.cur->idx)) {
                                           ERRX1(sess, "blk_recv_ack");
                                           goto out;
                                   }
                                   up.primed = 1;
                           } else {
                                   assert(up.stat.fd != -1);
                                   c = blk_match(sess, fdout, up.cur->blks,
                                           fl[up.cur->idx].path, &up.stat);
                                   if (c < 0) {
                                           ERRX1(sess, "blk_match");
                                           goto out;
                                   } else if (c > 0) {
                                           send_up_reset(&up);
                                           pfd[1].fd = -1;
                                   }
                         }                          }
                         continue;  
                 }                  }
   
                 /*                  /*
                  * The server will now send us its view of the file.                   * Incoming queue management.
                  * It does so by cutting a file into a series of blocks                   * If we have no queue component that we're waiting on,
                  * and checksumming each block.                   * then pull off the receiver-request queue and start
                  * We can then compare the blocks in our file and those                   * processing the request.
                  * in theirs, and send them blocks they're missing or  
                  * don't have.  
                  */                   */
   
                 blks = blk_recv(sess, fdin, fl[idx].path);                  if (up.cur == NULL) {
                 if (blks == NULL) {                          assert(pfd[2].fd == -1);
                         ERRX1(sess, "blk_recv");                          assert(up.stat.fd == -1);
                         goto out;                          assert(up.stat.map == MAP_FAILED);
                 } else if (!blk_recv_ack(sess, fdout, blks, idx)) {                          assert(up.stat.mapsz == 0);
                         ERRX1(sess, "blk_recv_ack");  
                         goto out;  
                 }  
   
                 c = blk_match(sess, fdout, blks, fl[idx].path);                          if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
                 blkset_free(blks);                                  continue;
                           TAILQ_REMOVE(&sdlq, up.cur, entries);
   
                 if (!c) {                          /* End of phase: enable channel to receiver. */
                         ERRX1(sess, "blk_match");  
                         goto out;                          if (up.cur->idx == -1) {
                                   pfd[1].fd = fdout;
                                   continue;
                           }
   
                           /* Non-blocking open of file. */
   
                           up.stat.fd = open(fl[up.cur->idx].path,
                                   O_RDONLY|O_NONBLOCK, 0);
                           if (up.stat.fd == -1) {
                                   ERR(sess, "%s: open", fl[up.cur->idx].path);
                                   goto out;
                           }
                           pfd[2].fd = up.stat.fd;
                 }                  }
         }          }
   

Legend:
Removed from v.1.7  
changed lines
  Added in v.1.8