[BACK]Return to sender.c CVS log [TXT][DIR] Up to [local] / src / usr.bin / rsync

Annotation of src/usr.bin/rsync/sender.c, Revision 1.8

1.8     ! florian     1: /*     $Id: sender.c,v 1.7 2019/02/16 05:06:30 deraadt Exp $ */
1.1       benno       2: /*
                      3:  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
                      4:  *
                      5:  * Permission to use, copy, modify, and distribute this software for any
                      6:  * purpose with or without fee is hereby granted, provided that the above
                      7:  * copyright notice and this permission notice appear in all copies.
                      8:  *
                      9:  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
                     10:  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
                     11:  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
                     12:  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
                     13:  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
                     14:  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
                     15:  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
                     16:  */
1.8     ! florian    17: #include <sys/mman.h>
        !            18: #include <sys/queue.h>
1.1       benno      19: #include <sys/stat.h>
                     20:
                     21: #include <assert.h>
1.8     ! florian    22: #include <fcntl.h>
1.1       benno      23: #include <inttypes.h>
1.8     ! florian    24: #include <poll.h>
1.1       benno      25: #include <stdlib.h>
                     26: #include <string.h>
                     27: #include <unistd.h>
                     28:
                     29: #include "extern.h"
                     30:
                     31: /*
1.8     ! florian    32:  * A request from the receiver to download updated file data.
        !            33:  */
        !            34: struct send_dl {
        !            35:        int32_t              idx; /* index in our file list */
        !            36:        struct blkset       *blks; /* the sender's block information */
        !            37:        TAILQ_ENTRY(send_dl) entries;
        !            38: };
        !            39:
        !            40: /*
        !            41:  * The current file being "updated": sent from sender to receiver.
        !            42:  * If there is no file being uploaded, "cur" is NULL.
        !            43:  */
        !            44: struct send_up {
        !            45:        struct send_dl  *cur; /* file being updated or NULL */
        !            46:        struct blkstat   stat; /* status of file being updated */
        !            47:        int              primed; /* blk_recv_ack() was called */
        !            48: };
        !            49:
        !            50: TAILQ_HEAD(send_dlq, send_dl);
        !            51:
        !            52: /*
        !            53:  * We have finished updating the receiver's file with sender data.
        !            54:  * Deallocate and wipe clean all resources required for that.
        !            55:  */
        !            56: static void
        !            57: send_up_reset(struct send_up *p)
        !            58: {
        !            59:
        !            60:        assert(NULL != p);
        !            61:
        !            62:        /* Free the download request, if applicable. */
        !            63:
        !            64:        if (p->cur != NULL) {
        !            65:                free(p->cur->blks);
        !            66:                free(p->cur);
        !            67:                p->cur = NULL;
        !            68:        }
        !            69:
        !            70:        /* If we mapped a file for scanning, unmap it and close. */
        !            71:
        !            72:        if (p->stat.map != MAP_FAILED)
        !            73:                munmap(p->stat.map, p->stat.mapsz);
        !            74:
        !            75:        p->stat.map = MAP_FAILED;
        !            76:        p->stat.mapsz = 0;
        !            77:
        !            78:        if (p->stat.fd != -1)
        !            79:                close(p->stat.fd);
        !            80:
        !            81:        p->stat.fd = -1;
        !            82:
        !            83:        /* Now clear the in-transfer information. */
        !            84:
        !            85:        p->stat.offs = 0;
        !            86:        p->stat.hint = 0;
        !            87:        p->primed = 0;
        !            88: }
        !            89:
        !            90: /*
        !            91:  * Enqueue a download request, getting it off the read channel as
        !            92:  * quickly a possible.
        !            93:  * This frees up the read channel for further incoming requests.
        !            94:  * We'll handle each element in turn, up to and including the last
        !            95:  * request (phase change), which is always a -1 idx.
        !            96:  * Returns zero on failure, non-zero on success.
        !            97:  */
        !            98: static int
        !            99: send_dl_enqueue(struct sess *sess, struct send_dlq *q,
        !           100:        int32_t idx, const struct flist *fl, size_t flsz, int fd)
        !           101: {
        !           102:        struct send_dl  *s;
        !           103:
        !           104:        /* End-of-phase marker. */
        !           105:
        !           106:        if (idx == -1) {
        !           107:                if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
        !           108:                        ERR(sess, "calloc");
        !           109:                        return 0;
        !           110:                }
        !           111:                s->idx = -1;
        !           112:                s->blks = NULL;
        !           113:                TAILQ_INSERT_TAIL(q, s, entries);
        !           114:                return 1;
        !           115:        }
        !           116:
        !           117:        /* Validate the index. */
        !           118:
        !           119:        if (idx < 0 || (uint32_t)idx >= flsz) {
        !           120:                ERRX(sess, "file index out of bounds: invalid %"
        !           121:                        PRId32 " out of %zu", idx, flsz);
        !           122:                return 0;
        !           123:        } else if (S_ISDIR(fl[idx].st.mode)) {
        !           124:                ERRX(sess, "blocks requested for "
        !           125:                        "directory: %s", fl[idx].path);
        !           126:                return 0;
        !           127:        } else if (S_ISLNK(fl[idx].st.mode)) {
        !           128:                ERRX(sess, "blocks requested for "
        !           129:                        "symlink: %s", fl[idx].path);
        !           130:                return 0;
        !           131:        } else if (!S_ISREG(fl[idx].st.mode)) {
        !           132:                ERRX(sess, "blocks requested for "
        !           133:                        "special: %s", fl[idx].path);
        !           134:                return 0;
        !           135:        }
        !           136:
        !           137:        if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
        !           138:                ERR(sess, "callloc");
        !           139:                return 0;
        !           140:        }
        !           141:        s->idx = idx;
        !           142:        s->blks = NULL;
        !           143:        TAILQ_INSERT_TAIL(q, s, entries);
        !           144:
        !           145:        /*
        !           146:         * This blocks til the full blockset has been read.
        !           147:         * That's ok, because the most important thing is getting data
        !           148:         * off the wire.
        !           149:         */
        !           150:
        !           151:        if (!sess->opts->dry_run) {
        !           152:                s->blks = blk_recv(sess, fd, fl[idx].path);
        !           153:                if (s->blks == NULL) {
        !           154:                        ERRX1(sess, "blk_recv");
        !           155:                        return 0;
        !           156:                }
        !           157:        }
        !           158:        return 1;
        !           159: }
        !           160:
        !           161: /*
1.1       benno     162:  * A client sender manages the read-only source files and sends data to
                    163:  * the receiver as requested.
                    164:  * First it sends its list of files, then it waits for the server to
                    165:  * request updates to individual files.
1.8     ! florian   166:  * It queues requests for updates as soon as it receives them.
1.1       benno     167:  * Returns zero on failure, non-zero on success.
                    168:  *
                    169:  * Pledges: stdio, rpath, unveil.
                    170:  */
                    171: int
                    172: rsync_sender(struct sess *sess, int fdin,
                    173:        int fdout, size_t argc, char **argv)
                    174: {
                    175:        struct flist    *fl = NULL;
1.8     ! florian   176:        size_t           i, flsz = 0, phase = 0, excl;
1.1       benno     177:        int              rc = 0, c;
                    178:        int32_t          idx;
1.8     ! florian   179:        struct pollfd    pfd[3];
        !           180:        struct send_dlq  sdlq;
        !           181:        struct send_up   up;
        !           182:        struct stat      st;
1.1       benno     183:
1.6       benno     184:        if (pledge("stdio getpw rpath unveil", NULL) == -1) {
1.1       benno     185:                ERR(sess, "pledge");
                    186:                return 0;
                    187:        }
                    188:
1.8     ! florian   189:        memset(&up, 0, sizeof(struct send_up));
        !           190:        TAILQ_INIT(&sdlq);
        !           191:        up.stat.fd = -1;
        !           192:        up.stat.map = MAP_FAILED;
        !           193:
1.1       benno     194:        /*
                    195:         * Generate the list of files we want to send from our
                    196:         * command-line input.
                    197:         * This will also remove all invalid files.
                    198:         */
                    199:
1.4       deraadt   200:        if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
1.1       benno     201:                ERRX1(sess, "flist_gen");
                    202:                goto out;
                    203:        }
                    204:
                    205:        /* Client sends zero-length exclusions if deleting. */
                    206:
1.4       deraadt   207:        if (!sess->opts->server && sess->opts->del &&
1.7       deraadt   208:            !io_write_int(sess, fdout, 0)) {
1.1       benno     209:                ERRX1(sess, "io_write_int");
                    210:                goto out;
1.2       benno     211:        }
1.1       benno     212:
1.2       benno     213:        /*
1.1       benno     214:         * Then the file list in any mode.
                    215:         * Finally, the IO error (always zero for us).
                    216:         */
1.2       benno     217:
1.4       deraadt   218:        if (!flist_send(sess, fdin, fdout, fl, flsz)) {
1.1       benno     219:                ERRX1(sess, "flist_send");
                    220:                goto out;
1.4       deraadt   221:        } else if (!io_write_int(sess, fdout, 0)) {
1.1       benno     222:                ERRX1(sess, "io_write_int");
                    223:                goto out;
1.2       benno     224:        }
1.1       benno     225:
                    226:        /* Exit if we're the server with zero files. */
                    227:
1.5       deraadt   228:        if (flsz == 0 && sess->opts->server) {
1.1       benno     229:                WARNX(sess, "sender has empty file list: exiting");
                    230:                rc = 1;
                    231:                goto out;
1.4       deraadt   232:        } else if (!sess->opts->server)
1.1       benno     233:                LOG1(sess, "Transfer starting: %zu files", flsz);
                    234:
                    235:        /*
                    236:         * If we're the server, read our exclusion list.
                    237:         * This is always 0 for now.
                    238:         */
                    239:
                    240:        if (sess->opts->server) {
1.4       deraadt   241:                if (!io_read_size(sess, fdin, &excl)) {
1.1       benno     242:                        ERRX1(sess, "io_read_size");
                    243:                        goto out;
1.5       deraadt   244:                } else if (excl != 0) {
1.1       benno     245:                        ERRX1(sess, "exclusion list is non-empty");
                    246:                        goto out;
                    247:                }
                    248:        }
                    249:
1.8     ! florian   250:        /*
        !           251:         * Set up our poll events.
        !           252:         * We start by polling only in receiver requests, enabling other
        !           253:         * poll events on demand.
1.1       benno     254:         */
                    255:
1.8     ! florian   256:        pfd[0].fd = fdin; /* from receiver */
        !           257:        pfd[0].events = POLLIN;
        !           258:        pfd[1].fd = -1; /* to receiver */
        !           259:        pfd[1].events = POLLOUT;
        !           260:        pfd[2].fd = -1; /* from local file */
        !           261:        pfd[2].events = POLLIN;
        !           262:
        !           263:        /* The main sender loop runs into phase == 2. */
1.1       benno     264:
                    265:        for (;;) {
1.8     ! florian   266:                assert(pfd[0].fd != -1);
        !           267:                if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
        !           268:                        ERR(sess, "poll");
        !           269:                        goto out;
        !           270:                } else if (c == 0) {
        !           271:                        ERRX(sess, "poll: timeout");
1.1       benno     272:                        goto out;
                    273:                }
                    274:
1.8     ! florian   275:                for (i = 0; i < 3; i++)
        !           276:                        if (pfd[i].revents & (POLLERR|POLLNVAL)) {
        !           277:                                ERRX(sess, "poll: bad fd");
        !           278:                                goto out;
        !           279:                        } else if (pfd[i].revents & POLLHUP) {
        !           280:                                ERRX(sess, "poll: hangup");
        !           281:                                goto out;
        !           282:                        }
        !           283:
        !           284:                /*
        !           285:                 * Flush out multiplexed messages.
        !           286:                 * These might otherwise clog the reader.
        !           287:                 */
        !           288:
        !           289:                if (sess->mplex_reads &&
        !           290:                    (POLLIN & pfd[0].revents)) {
        !           291:                        if (!io_read_flush(sess, fdin)) {
        !           292:                                ERRX1(sess, "io_read_flush");
        !           293:                                goto out;
        !           294:                        } else if (sess->mplex_read_remain == 0)
        !           295:                                pfd[0].revents &= ~POLLIN;
        !           296:                }
        !           297:
1.1       benno     298:                /*
1.8     ! florian   299:                 * If we have a request coming down off the wire, pull
        !           300:                 * it in as quickly as possible into our buffer.
        !           301:                 * This unclogs the socket buffers so the data can flow.
1.1       benno     302:                 */
                    303:
1.8     ! florian   304:                if (pfd[0].revents & POLLIN)
        !           305:                        for (;;) {
        !           306:                                if (!io_read_int(sess, fdin, &idx)) {
        !           307:                                        ERRX1(sess, "io_read_int");
        !           308:                                        goto out;
        !           309:                                }
        !           310:                                if (!send_dl_enqueue(sess,
        !           311:                                    &sdlq, idx, fl, flsz, fdin)) {
        !           312:                                        ERRX1(sess, "send_dl_enqueue");
        !           313:                                        goto out;
        !           314:                                }
        !           315:                                c = io_read_check(sess, fdin);
        !           316:                                if (c < 0) {
        !           317:                                        ERRX1(sess, "io_read_check");
        !           318:                                        goto out;
        !           319:                                } else if (c == 0)
        !           320:                                        break;
        !           321:                        }
        !           322:
        !           323:                /*
        !           324:                 * One of our local files has been opened (in response
        !           325:                 * to a receiver request) and now we can map it.
        !           326:                 * We'll respond to the event by looking at the map when
        !           327:                 * the writer is available.
        !           328:                 * Here we also enable the poll event for output.
        !           329:                 */
        !           330:
        !           331:                if (pfd[2].revents & POLLIN) {
        !           332:                        assert(up.cur != NULL);
        !           333:                        assert(up.stat.fd != -1);
        !           334:                        assert(up.stat.map == MAP_FAILED);
        !           335:                        assert(up.stat.mapsz == 0);
        !           336:
        !           337:                        if (fstat(up.stat.fd, &st) == -1) {
        !           338:                                ERR(sess, "%s: fstat", fl[up.cur->idx].path);
1.1       benno     339:                                goto out;
                    340:                        }
                    341:
1.8     ! florian   342:                        /*
        !           343:                         * If the file is zero-length, the map will
        !           344:                         * fail, but either way we want to unset that
        !           345:                         * we're waiting for the file to open.
        !           346:                         * We'll close the descriptor after processing.
        !           347:                         */
        !           348:
        !           349:                        if ((up.stat.mapsz = st.st_size) > 0) {
        !           350:                                up.stat.map = mmap(NULL, up.stat.mapsz,
        !           351:                                        PROT_READ, MAP_SHARED, up.stat.fd, 0);
        !           352:                                if (up.stat.map == MAP_FAILED) {
        !           353:                                        ERR(sess, "%s: mmap", fl[up.cur->idx].path);
        !           354:                                        goto out;
        !           355:                                }
        !           356:                        }
        !           357:                        pfd[2].fd = -1;
        !           358:                        pfd[1].fd = fdout;
        !           359:                }
        !           360:
        !           361:                /*
        !           362:                 * Our outbound is ready to process the current event.
        !           363:                 * This means we've already opened the file and possibly
        !           364:                 * mapped it, and we're ready to send blocks.
        !           365:                 * Do this one block at a time lest we block the channel
        !           366:                 * while read events are coming in.
        !           367:                 */
1.1       benno     368:
1.8     ! florian   369:                if (pfd[1].revents & POLLOUT) {
        !           370:                        assert(up.cur != NULL);
        !           371:                        assert(pfd[2].fd == -1);
        !           372:
        !           373:                        /*
        !           374:                         * If we receive an invalid index (-1), then we're
        !           375:                         * either promoted to the second phase or it's time to
        !           376:                         * exit, depending upon which phase we're in.
        !           377:                         * Otherwise, we either start a transfer
        !           378:                         * sequence (if not primed) or continue one.
        !           379:                         */
        !           380:
        !           381:                        if (up.cur->idx < 0) {
        !           382:                                pfd[1].fd = -1;
        !           383:                                send_up_reset(&up);
        !           384:                                if (!io_write_int(sess, fdout, -1)) {
1.1       benno     385:                                        ERRX1(sess, "io_write_int");
                    386:                                        goto out;
                    387:                                }
                    388:
1.8     ! florian   389:                                /* Send superfluous ack. */
        !           390:
        !           391:                                if (sess->opts->server && sess->rver > 27 &&
        !           392:                                    !io_write_int(sess, fdout, -1)) {
        !           393:                                        ERRX1(sess, "io_write_int");
        !           394:                                        goto out;
        !           395:                                }
        !           396:
        !           397:                                if (phase++)
        !           398:                                        break;
        !           399:                        } else if (0 == up.primed) {
        !           400:                                if (!sess->opts->server)
        !           401:                                        LOG1(sess, "%s", fl[up.cur->idx].wpath);
1.1       benno     402:
1.8     ! florian   403:                                /* Dry-running does nothing but a response. */
1.1       benno     404:
1.8     ! florian   405:                                if (sess->opts->dry_run &&
        !           406:                                    !io_write_int(sess, fdout, up.cur->idx)) {
        !           407:                                        ERRX1(sess, "io_write_int");
        !           408:                                        goto out;
        !           409:                                }
1.1       benno     410:
1.8     ! florian   411:                                /* Actually perform the block send. */
1.1       benno     412:
1.8     ! florian   413:                                assert(up.stat.fd != -1);
        !           414:                                if (!blk_recv_ack(sess, fdout,
        !           415:                                    up.cur->blks, up.cur->idx)) {
        !           416:                                        ERRX1(sess, "blk_recv_ack");
        !           417:                                        goto out;
        !           418:                                }
        !           419:                                up.primed = 1;
        !           420:                        } else {
        !           421:                                assert(up.stat.fd != -1);
        !           422:                                c = blk_match(sess, fdout, up.cur->blks,
        !           423:                                        fl[up.cur->idx].path, &up.stat);
        !           424:                                if (c < 0) {
        !           425:                                        ERRX1(sess, "blk_match");
        !           426:                                        goto out;
        !           427:                                } else if (c > 0) {
        !           428:                                        send_up_reset(&up);
        !           429:                                        pfd[1].fd = -1;
        !           430:                                }
1.1       benno     431:                        }
                    432:                }
                    433:
                    434:                /*
1.8     ! florian   435:                 * Incoming queue management.
        !           436:                 * If we have no queue component that we're waiting on,
        !           437:                 * then pull off the receiver-request queue and start
        !           438:                 * processing the request.
1.1       benno     439:                 */
                    440:
1.8     ! florian   441:                if (up.cur == NULL) {
        !           442:                        assert(pfd[2].fd == -1);
        !           443:                        assert(up.stat.fd == -1);
        !           444:                        assert(up.stat.map == MAP_FAILED);
        !           445:                        assert(up.stat.mapsz == 0);
        !           446:
        !           447:                        if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
        !           448:                                continue;
        !           449:                        TAILQ_REMOVE(&sdlq, up.cur, entries);
        !           450:
        !           451:                        /* End of phase: enable channel to receiver. */
        !           452:
        !           453:                        if (up.cur->idx == -1) {
        !           454:                                pfd[1].fd = fdout;
        !           455:                                continue;
        !           456:                        }
        !           457:
        !           458:                        /* Non-blocking open of file. */
1.1       benno     459:
1.8     ! florian   460:                        up.stat.fd = open(fl[up.cur->idx].path,
        !           461:                                O_RDONLY|O_NONBLOCK, 0);
        !           462:                        if (up.stat.fd == -1) {
        !           463:                                ERR(sess, "%s: open", fl[up.cur->idx].path);
        !           464:                                goto out;
        !           465:                        }
        !           466:                        pfd[2].fd = up.stat.fd;
1.1       benno     467:                }
                    468:        }
                    469:
1.4       deraadt   470:        if (!sess_stats_send(sess, fdout)) {
1.1       benno     471:                ERRX1(sess, "sess_stats_end");
                    472:                goto out;
                    473:        }
                    474:
                    475:        /* Final "goodbye" message. */
                    476:
1.4       deraadt   477:        if (!io_read_int(sess, fdin, &idx)) {
1.1       benno     478:                ERRX1(sess, "io_read_int");
                    479:                goto out;
1.5       deraadt   480:        } else if (idx != -1) {
1.1       benno     481:                ERRX(sess, "read incorrect update complete ack");
                    482:                goto out;
                    483:        }
                    484:
                    485:        LOG2(sess, "sender finished updating");
                    486:        rc = 1;
                    487: out:
                    488:        flist_free(fl, flsz);
                    489:        return rc;
                    490: }