[BACK]Return to sender.c CVS log [TXT][DIR] Up to [local] / src / usr.bin / rsync

Annotation of src/usr.bin/rsync/sender.c, Revision 1.29

1.29    ! claudio     1: /*     $OpenBSD: sender.c,v 1.28 2021/04/05 18:17:37 deraadt Exp $ */
1.1       benno       2: /*
                      3:  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
                      4:  *
                      5:  * Permission to use, copy, modify, and distribute this software for any
                      6:  * purpose with or without fee is hereby granted, provided that the above
                      7:  * copyright notice and this permission notice appear in all copies.
                      8:  *
                      9:  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
                     10:  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
                     11:  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
                     12:  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
                     13:  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
                     14:  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
                     15:  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
                     16:  */
1.8       florian    17: #include <sys/mman.h>
                     18: #include <sys/queue.h>
1.1       benno      19: #include <sys/stat.h>
                     20:
                     21: #include <assert.h>
1.8       florian    22: #include <fcntl.h>
1.1       benno      23: #include <inttypes.h>
1.8       florian    24: #include <poll.h>
1.1       benno      25: #include <stdlib.h>
                     26: #include <string.h>
                     27: #include <unistd.h>
                     28:
1.9       florian    29: #include <openssl/md4.h>
                     30:
1.1       benno      31: #include "extern.h"
                     32:
                     33: /*
1.8       florian    34:  * A request from the receiver to download updated file data.
                     35:  */
                     36: struct send_dl {
1.21      deraadt    37:        int32_t                  idx; /* index in our file list */
                     38:        struct blkset           *blks; /* the sender's block information */
                     39:        TAILQ_ENTRY(send_dl)     entries;
1.8       florian    40: };
                     41:
                     42: /*
                     43:  * The current file being "updated": sent from sender to receiver.
                     44:  * If there is no file being uploaded, "cur" is NULL.
                     45:  */
                     46: struct send_up {
                     47:        struct send_dl  *cur; /* file being updated or NULL */
                     48:        struct blkstat   stat; /* status of file being updated */
                     49: };
                     50:
                     51: TAILQ_HEAD(send_dlq, send_dl);
                     52:
                     53: /*
                     54:  * We have finished updating the receiver's file with sender data.
                     55:  * Deallocate and wipe clean all resources required for that.
                     56:  */
                     57: static void
                     58: send_up_reset(struct send_up *p)
                     59: {
                     60:
1.13      deraadt    61:        assert(p != NULL);
1.8       florian    62:
                     63:        /* Free the download request, if applicable. */
                     64:
                     65:        if (p->cur != NULL) {
                     66:                free(p->cur->blks);
                     67:                free(p->cur);
                     68:                p->cur = NULL;
                     69:        }
                     70:
                     71:        /* If we mapped a file for scanning, unmap it and close. */
                     72:
                     73:        if (p->stat.map != MAP_FAILED)
                     74:                munmap(p->stat.map, p->stat.mapsz);
                     75:
                     76:        p->stat.map = MAP_FAILED;
                     77:        p->stat.mapsz = 0;
                     78:
                     79:        if (p->stat.fd != -1)
                     80:                close(p->stat.fd);
                     81:
                     82:        p->stat.fd = -1;
                     83:
                     84:        /* Now clear the in-transfer information. */
                     85:
                     86:        p->stat.offs = 0;
                     87:        p->stat.hint = 0;
1.9       florian    88:        p->stat.curst = BLKSTAT_NONE;
1.16      benno      89: }
                     90:
                     91: /*
                     92:  * This is the bulk of the sender work.
                     93:  * Here we tend to an output buffer that responds to receiver requests
                     94:  * for data.
                     95:  * This does not act upon the output descriptor itself so as to avoid
                     96:  * blocking, which otherwise would deadlock the protocol.
                     97:  * Returns zero on failure, non-zero on success.
                     98:  */
                     99: static int
                    100: send_up_fsm(struct sess *sess, size_t *phase,
                    101:        struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
                    102:        const struct flist *fl)
                    103: {
                    104:        size_t           pos = 0, isz = sizeof(int32_t),
                    105:                         dsz = MD4_DIGEST_LENGTH;
                    106:        unsigned char    fmd[MD4_DIGEST_LENGTH];
                    107:        off_t            sz;
                    108:        char             buf[20];
                    109:
                    110:        switch (up->stat.curst) {
                    111:        case BLKSTAT_DATA:
                    112:                /*
                    113:                 * A data segment to be written: buffer both the length
                    114:                 * and the data.
                    115:                 * If we've finished the transfer, move on to the token;
                    116:                 * otherwise, keep sending data.
                    117:                 */
                    118:
                    119:                sz = MINIMUM(MAX_CHUNK,
                    120:                        up->stat.curlen - up->stat.curpos);
                    121:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
1.22      benno     122:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     123:                        return 0;
                    124:                }
                    125:                io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
                    126:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
1.22      benno     127:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     128:                        return 0;
                    129:                }
                    130:                io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
                    131:                        up->stat.map + up->stat.curpos, sz);
                    132:
                    133:                up->stat.curpos += sz;
                    134:                if (up->stat.curpos == up->stat.curlen)
                    135:                        up->stat.curst = BLKSTAT_TOK;
                    136:                return 1;
                    137:        case BLKSTAT_TOK:
                    138:                /*
                    139:                 * The data token following (maybe) a data segment.
                    140:                 * These can also come standalone if, say, the file's
                    141:                 * being fully written.
                    142:                 * It's followed by a hash or another data segment,
                    143:                 * depending on the token.
                    144:                 */
                    145:
                    146:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
1.22      benno     147:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     148:                        return 0;
                    149:                }
                    150:                io_lowbuffer_int(sess, *wb,
                    151:                        &pos, *wbsz, up->stat.curtok);
                    152:                up->stat.curst = up->stat.curtok ?
                    153:                        BLKSTAT_NEXT : BLKSTAT_HASH;
                    154:                return 1;
                    155:        case BLKSTAT_HASH:
                    156:                /*
                    157:                 * The hash following transmission of all file contents.
                    158:                 * This is always followed by the state that we're
                    159:                 * finished with the file.
                    160:                 */
                    161:
                    162:                hash_file(up->stat.map, up->stat.mapsz, fmd, sess);
                    163:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
1.22      benno     164:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     165:                        return 0;
                    166:                }
                    167:                io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
                    168:                up->stat.curst = BLKSTAT_DONE;
                    169:                return 1;
                    170:        case BLKSTAT_DONE:
                    171:                /*
                    172:                 * The data has been written.
                    173:                 * Clear our current send file and allow the block below
                    174:                 * to find another.
                    175:                 */
                    176:
1.17      benno     177:                if (!sess->opts->dry_run)
1.22      benno     178:                        LOG3("%s: flushed %jd KB total, %.2f%% uploaded",
1.20      deraadt   179:                            fl[up->cur->idx].path,
                    180:                            (intmax_t)up->stat.total / 1024,
                    181:                            100.0 * up->stat.dirty / up->stat.total);
1.16      benno     182:                send_up_reset(up);
                    183:                return 1;
                    184:        case BLKSTAT_PHASE:
                    185:                /*
                    186:                 * This is where we actually stop the algorithm: we're
                    187:                 * already at the second phase.
                    188:                 */
                    189:
                    190:                send_up_reset(up);
                    191:                (*phase)++;
                    192:                return 1;
                    193:        case BLKSTAT_NEXT:
                    194:                /*
                    195:                 * Our last case: we need to find the
                    196:                 * next block (and token) to transmit to
                    197:                 * the receiver.
                    198:                 * These will drive the finite state
                    199:                 * machine in the first few conditional
                    200:                 * blocks of this set.
                    201:                 */
                    202:
                    203:                assert(up->stat.fd != -1);
                    204:                blk_match(sess, up->cur->blks,
                    205:                        fl[up->cur->idx].path, &up->stat);
                    206:                return 1;
                    207:        case BLKSTAT_NONE:
                    208:                break;
                    209:        }
                    210:
                    211:        assert(BLKSTAT_NONE == up->stat.curst);
                    212:
                    213:        /*
                    214:         * We've either hit the phase change following the last file (or
                    215:         * start, or prior phase change), or we need to prime the next
                    216:         * file for transmission.
                    217:         * We special-case dry-run mode.
                    218:         */
                    219:
                    220:        if (up->cur->idx < 0) {
                    221:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
1.22      benno     222:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     223:                        return 0;
                    224:                }
                    225:                io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
                    226:
                    227:                if (sess->opts->server && sess->rver > 27) {
                    228:                        if (!io_lowbuffer_alloc(sess,
                    229:                            wb, wbsz, wbmax, isz)) {
1.22      benno     230:                                ERRX1("io_lowbuffer_alloc");
1.16      benno     231:                                return 0;
                    232:                        }
                    233:                        io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
                    234:                }
                    235:                up->stat.curst = BLKSTAT_PHASE;
                    236:        } else if (sess->opts->dry_run) {
                    237:                if (!sess->opts->server)
1.22      benno     238:                        LOG1("%s", fl[up->cur->idx].wpath);
1.16      benno     239:
                    240:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
1.22      benno     241:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     242:                        return 0;
                    243:                }
                    244:                io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
1.17      benno     245:                up->stat.curst = BLKSTAT_DONE;
1.16      benno     246:        } else {
                    247:                assert(up->stat.fd != -1);
                    248:
                    249:                /*
                    250:                 * FIXME: use the nice output of log_file() and so on in
                    251:                 * downloader.c, which means moving this into
                    252:                 * BLKSTAT_DONE instead of having it be here.
                    253:                 */
                    254:
                    255:                if (!sess->opts->server)
1.22      benno     256:                        LOG1("%s", fl[up->cur->idx].wpath);
1.16      benno     257:
                    258:                if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
1.22      benno     259:                        ERRX1("io_lowbuffer_alloc");
1.16      benno     260:                        return 0;
                    261:                }
                    262:                assert(sizeof(buf) == 20);
1.23      benno     263:                blk_recv_ack(buf, up->cur->blks, up->cur->idx);
1.16      benno     264:                io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);
                    265:
1.22      benno     266:                LOG3("%s: primed for %jd B total",
1.20      deraadt   267:                    fl[up->cur->idx].path, (intmax_t)up->cur->blks->size);
1.16      benno     268:                up->stat.curst = BLKSTAT_NEXT;
                    269:        }
                    270:
                    271:        return 1;
1.8       florian   272: }
                    273:
                    274: /*
                    275:  * Enqueue a download request, getting it off the read channel as
                    276:  * quickly a possible.
                    277:  * This frees up the read channel for further incoming requests.
                    278:  * We'll handle each element in turn, up to and including the last
                    279:  * request (phase change), which is always a -1 idx.
                    280:  * Returns zero on failure, non-zero on success.
                    281:  */
                    282: static int
                    283: send_dl_enqueue(struct sess *sess, struct send_dlq *q,
                    284:        int32_t idx, const struct flist *fl, size_t flsz, int fd)
                    285: {
                    286:        struct send_dl  *s;
1.15      benno     287:
1.8       florian   288:        /* End-of-phase marker. */
                    289:
                    290:        if (idx == -1) {
                    291:                if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
1.22      benno     292:                        ERR("calloc");
1.8       florian   293:                        return 0;
                    294:                }
                    295:                s->idx = -1;
                    296:                s->blks = NULL;
                    297:                TAILQ_INSERT_TAIL(q, s, entries);
                    298:                return 1;
                    299:        }
                    300:
                    301:        /* Validate the index. */
1.15      benno     302:
1.8       florian   303:        if (idx < 0 || (uint32_t)idx >= flsz) {
1.22      benno     304:                ERRX("file index out of bounds: invalid %d out of %zu",
1.20      deraadt   305:                    idx, flsz);
1.8       florian   306:                return 0;
                    307:        } else if (S_ISDIR(fl[idx].st.mode)) {
1.22      benno     308:                ERRX("blocks requested for "
1.8       florian   309:                        "directory: %s", fl[idx].path);
                    310:                return 0;
                    311:        } else if (S_ISLNK(fl[idx].st.mode)) {
1.22      benno     312:                ERRX("blocks requested for "
1.8       florian   313:                        "symlink: %s", fl[idx].path);
                    314:                return 0;
                    315:        } else if (!S_ISREG(fl[idx].st.mode)) {
1.22      benno     316:                ERRX("blocks requested for "
1.8       florian   317:                        "special: %s", fl[idx].path);
                    318:                return 0;
                    319:        }
                    320:
                    321:        if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
1.22      benno     322:                ERR("callloc");
1.8       florian   323:                return 0;
                    324:        }
                    325:        s->idx = idx;
                    326:        s->blks = NULL;
                    327:        TAILQ_INSERT_TAIL(q, s, entries);
                    328:
1.10      florian   329:        /*
1.8       florian   330:         * This blocks til the full blockset has been read.
                    331:         * That's ok, because the most important thing is getting data
                    332:         * off the wire.
                    333:         */
                    334:
                    335:        if (!sess->opts->dry_run) {
                    336:                s->blks = blk_recv(sess, fd, fl[idx].path);
                    337:                if (s->blks == NULL) {
1.22      benno     338:                        ERRX1("blk_recv");
1.8       florian   339:                        return 0;
                    340:                }
                    341:        }
                    342:        return 1;
                    343: }
                    344:
                    345: /*
1.1       benno     346:  * A client sender manages the read-only source files and sends data to
                    347:  * the receiver as requested.
                    348:  * First it sends its list of files, then it waits for the server to
                    349:  * request updates to individual files.
1.8       florian   350:  * It queues requests for updates as soon as it receives them.
1.1       benno     351:  * Returns zero on failure, non-zero on success.
                    352:  *
1.27      claudio   353:  * Pledges: stdio, getpw, rpath.
1.1       benno     354:  */
                    355: int
                    356: rsync_sender(struct sess *sess, int fdin,
                    357:        int fdout, size_t argc, char **argv)
                    358: {
1.9       florian   359:        struct flist       *fl = NULL;
                    360:        const struct flist *f;
                    361:        size_t              i, flsz = 0, phase = 0, excl;
                    362:        int                 rc = 0, c;
                    363:        int32_t             idx;
                    364:        struct pollfd       pfd[3];
                    365:        struct send_dlq     sdlq;
                    366:        struct send_dl     *dl;
                    367:        struct send_up      up;
                    368:        struct stat         st;
                    369:        void               *wbuf = NULL;
1.16      benno     370:        size_t              wbufpos = 0, wbufsz = 0, wbufmax = 0;
1.9       florian   371:        ssize_t             ssz;
1.1       benno     372:
1.27      claudio   373:        if (pledge("stdio getpw rpath", NULL) == -1) {
1.22      benno     374:                ERR("pledge");
1.1       benno     375:                return 0;
                    376:        }
                    377:
1.8       florian   378:        memset(&up, 0, sizeof(struct send_up));
                    379:        TAILQ_INIT(&sdlq);
                    380:        up.stat.fd = -1;
                    381:        up.stat.map = MAP_FAILED;
1.24      florian   382:        up.stat.blktab = blkhash_alloc();
1.8       florian   383:
1.1       benno     384:        /*
                    385:         * Generate the list of files we want to send from our
                    386:         * command-line input.
                    387:         * This will also remove all invalid files.
                    388:         */
                    389:
1.4       deraadt   390:        if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
1.22      benno     391:                ERRX1("flist_gen");
1.1       benno     392:                goto out;
                    393:        }
                    394:
                    395:        /* Client sends zero-length exclusions if deleting. */
                    396:
1.4       deraadt   397:        if (!sess->opts->server && sess->opts->del &&
1.28      deraadt   398:            !io_write_int(sess, fdout, 0)) {
1.22      benno     399:                ERRX1("io_write_int");
1.1       benno     400:                goto out;
1.2       benno     401:        }
1.1       benno     402:
1.2       benno     403:        /*
1.1       benno     404:         * Then the file list in any mode.
                    405:         * Finally, the IO error (always zero for us).
                    406:         */
1.2       benno     407:
1.4       deraadt   408:        if (!flist_send(sess, fdin, fdout, fl, flsz)) {
1.22      benno     409:                ERRX1("flist_send");
1.1       benno     410:                goto out;
1.4       deraadt   411:        } else if (!io_write_int(sess, fdout, 0)) {
1.22      benno     412:                ERRX1("io_write_int");
1.1       benno     413:                goto out;
1.2       benno     414:        }
1.1       benno     415:
                    416:        /* Exit if we're the server with zero files. */
                    417:
1.5       deraadt   418:        if (flsz == 0 && sess->opts->server) {
1.22      benno     419:                WARNX("sender has empty file list: exiting");
1.1       benno     420:                rc = 1;
                    421:                goto out;
1.4       deraadt   422:        } else if (!sess->opts->server)
1.22      benno     423:                LOG1("Transfer starting: %zu files", flsz);
1.1       benno     424:
                    425:        /*
                    426:         * If we're the server, read our exclusion list.
                    427:         * This is always 0 for now.
                    428:         */
                    429:
                    430:        if (sess->opts->server) {
1.4       deraadt   431:                if (!io_read_size(sess, fdin, &excl)) {
1.22      benno     432:                        ERRX1("io_read_size");
1.1       benno     433:                        goto out;
1.5       deraadt   434:                } else if (excl != 0) {
1.22      benno     435:                        ERRX1("exclusion list is non-empty");
1.1       benno     436:                        goto out;
                    437:                }
                    438:        }
                    439:
1.10      florian   440:        /*
1.8       florian   441:         * Set up our poll events.
                    442:         * We start by polling only in receiver requests, enabling other
                    443:         * poll events on demand.
1.1       benno     444:         */
                    445:
1.8       florian   446:        pfd[0].fd = fdin; /* from receiver */
                    447:        pfd[0].events = POLLIN;
                    448:        pfd[1].fd = -1; /* to receiver */
                    449:        pfd[1].events = POLLOUT;
                    450:        pfd[2].fd = -1; /* from local file */
                    451:        pfd[2].events = POLLIN;
                    452:
1.1       benno     453:        for (;;) {
1.8       florian   454:                assert(pfd[0].fd != -1);
1.26      claudio   455:                if ((c = poll(pfd, 3, poll_timeout)) == -1) {
1.22      benno     456:                        ERR("poll");
1.8       florian   457:                        goto out;
                    458:                } else if (c == 0) {
1.22      benno     459:                        ERRX("poll: timeout");
1.1       benno     460:                        goto out;
                    461:                }
1.8       florian   462:                for (i = 0; i < 3; i++)
                    463:                        if (pfd[i].revents & (POLLERR|POLLNVAL)) {
1.22      benno     464:                                ERRX("poll: bad fd");
1.8       florian   465:                                goto out;
                    466:                        } else if (pfd[i].revents & POLLHUP) {
1.22      benno     467:                                ERRX("poll: hangup");
1.8       florian   468:                                goto out;
                    469:                        }
                    470:
1.1       benno     471:                /*
1.8       florian   472:                 * If we have a request coming down off the wire, pull
                    473:                 * it in as quickly as possible into our buffer.
1.18      benno     474:                 * Start by seeing if we have a log message.
                    475:                 * If we do, pop it off, then see if we have anything
                    476:                 * left and hit it again if so (read priority).
1.1       benno     477:                 */
                    478:
1.19      benno     479:                if (sess->mplex_reads && (pfd[0].revents & POLLIN)) {
1.18      benno     480:                        if (!io_read_flush(sess, fdin)) {
1.22      benno     481:                                ERRX1("io_read_flush");
1.18      benno     482:                                goto out;
                    483:                        } else if (sess->mplex_read_remain == 0) {
1.23      benno     484:                                c = io_read_check(fdin);
1.8       florian   485:                                if (c < 0) {
1.22      benno     486:                                        ERRX1("io_read_check");
1.8       florian   487:                                        goto out;
1.18      benno     488:                                } else if (c > 0)
                    489:                                        continue;
                    490:                                pfd[0].revents &= ~POLLIN;
                    491:                        }
                    492:                }
                    493:
                    494:                /*
                    495:                 * Now that we've handled the log messages, we're left
                    496:                 * here if we have any actual data coming down.
                    497:                 * Enqueue message requests, then loop again if we see
                    498:                 * more data (read priority).
                    499:                 */
                    500:
                    501:                if (pfd[0].revents & POLLIN) {
                    502:                        if (!io_read_int(sess, fdin, &idx)) {
1.22      benno     503:                                ERRX1("io_read_int");
1.18      benno     504:                                goto out;
1.8       florian   505:                        }
1.18      benno     506:                        if (!send_dl_enqueue(sess,
                    507:                            &sdlq, idx, fl, flsz, fdin)) {
1.22      benno     508:                                ERRX1("send_dl_enqueue");
1.18      benno     509:                                goto out;
                    510:                        }
1.23      benno     511:                        c = io_read_check(fdin);
1.18      benno     512:                        if (c < 0) {
1.22      benno     513:                                ERRX1("io_read_check");
1.18      benno     514:                                goto out;
                    515:                        } else if (c > 0)
                    516:                                continue;
                    517:                }
1.8       florian   518:
                    519:                /*
1.9       florian   520:                 * One of our local files has been opened in response
                    521:                 * to a receiver request and now we can map it.
1.8       florian   522:                 * We'll respond to the event by looking at the map when
                    523:                 * the writer is available.
                    524:                 * Here we also enable the poll event for output.
                    525:                 */
                    526:
                    527:                if (pfd[2].revents & POLLIN) {
                    528:                        assert(up.cur != NULL);
                    529:                        assert(up.stat.fd != -1);
                    530:                        assert(up.stat.map == MAP_FAILED);
                    531:                        assert(up.stat.mapsz == 0);
1.9       florian   532:                        f = &fl[up.cur->idx];
1.8       florian   533:
                    534:                        if (fstat(up.stat.fd, &st) == -1) {
1.22      benno     535:                                ERR("%s: fstat", f->path);
1.1       benno     536:                                goto out;
                    537:                        }
                    538:
1.8       florian   539:                        /*
                    540:                         * If the file is zero-length, the map will
                    541:                         * fail, but either way we want to unset that
1.9       florian   542:                         * we're waiting for the file to open and set
                    543:                         * that we're ready for the output channel.
1.8       florian   544:                         */
                    545:
                    546:                        if ((up.stat.mapsz = st.st_size) > 0) {
1.9       florian   547:                                up.stat.map = mmap(NULL,
                    548:                                        up.stat.mapsz, PROT_READ,
                    549:                                        MAP_SHARED, up.stat.fd, 0);
1.8       florian   550:                                if (up.stat.map == MAP_FAILED) {
1.22      benno     551:                                        ERR("%s: mmap", f->path);
1.8       florian   552:                                        goto out;
                    553:                                }
1.9       florian   554:                        }
                    555:
1.8       florian   556:                        pfd[2].fd = -1;
                    557:                        pfd[1].fd = fdout;
                    558:                }
                    559:
                    560:                /*
1.9       florian   561:                 * If we have buffers waiting to write, write them out
                    562:                 * as soon as we can in a non-blocking fashion.
                    563:                 * We must not be waiting for any local files.
                    564:                 * ALL WRITES MUST HAPPEN HERE.
                    565:                 * This keeps the sender deadlock-free.
1.8       florian   566:                 */
1.1       benno     567:
1.9       florian   568:                if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
                    569:                        assert(pfd[2].fd == -1);
                    570:                        assert(wbufsz - wbufpos);
1.28      deraadt   571:                        ssz = write(fdout, wbuf + wbufpos, wbufsz - wbufpos);
1.25      deraadt   572:                        if (ssz == -1) {
1.22      benno     573:                                ERR("write");
1.9       florian   574:                                goto out;
                    575:                        }
                    576:                        wbufpos += ssz;
                    577:                        if (wbufpos == wbufsz)
                    578:                                wbufpos = wbufsz = 0;
                    579:                        pfd[1].revents &= ~POLLOUT;
                    580:
                    581:                        /* This is usually in io.c... */
                    582:
                    583:                        sess->total_write += ssz;
                    584:                }
                    585:
1.16      benno     586:                /*
                    587:                 * Engage the FSM for the current transfer.
                    588:                 * If our phase changes, stop processing.
                    589:                 */
                    590:
                    591:                if (pfd[1].revents & POLLOUT && up.cur != NULL) {
1.8       florian   592:                        assert(pfd[2].fd == -1);
1.13      deraadt   593:                        assert(wbufpos == 0 && wbufsz == 0);
1.16      benno     594:                        if (!send_up_fsm(sess, &phase,
                    595:                            &up, &wbuf, &wbufsz, &wbufmax, fl)) {
1.22      benno     596:                                ERRX1("send_up_fsm");
1.16      benno     597:                                goto out;
                    598:                        } else if (phase > 1)
                    599:                                break;
1.1       benno     600:                }
                    601:
                    602:                /*
1.8       florian   603:                 * Incoming queue management.
                    604:                 * If we have no queue component that we're waiting on,
                    605:                 * then pull off the receiver-request queue and start
                    606:                 * processing the request.
1.1       benno     607:                 */
                    608:
1.8       florian   609:                if (up.cur == NULL) {
                    610:                        assert(pfd[2].fd == -1);
                    611:                        assert(up.stat.fd == -1);
                    612:                        assert(up.stat.map == MAP_FAILED);
                    613:                        assert(up.stat.mapsz == 0);
1.9       florian   614:                        assert(wbufsz == 0 && wbufpos == 0);
                    615:                        pfd[1].fd = -1;
                    616:
                    617:                        /*
                    618:                         * If there's nothing in the queue, then keep
                    619:                         * the output channel disabled and wait for
                    620:                         * whatever comes next from the reader.
                    621:                         */
1.8       florian   622:
                    623:                        if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
                    624:                                continue;
1.24      florian   625:                        TAILQ_REMOVE(&sdlq, up.cur, entries);
                    626:
                    627:                        /* Hash our blocks. */
1.9       florian   628:
1.24      florian   629:                        blkhash_set(up.stat.blktab, up.cur->blks);
1.8       florian   630:
1.10      florian   631:                        /*
1.9       florian   632:                         * End of phase: enable channel to receiver.
                    633:                         * We'll need our output buffer enabled in order
                    634:                         * to process this event.
                    635:                         */
1.8       florian   636:
                    637:                        if (up.cur->idx == -1) {
                    638:                                pfd[1].fd = fdout;
                    639:                                continue;
                    640:                        }
1.15      benno     641:
1.10      florian   642:                        /*
1.9       florian   643:                         * Non-blocking open of file.
                    644:                         * This will be picked up in the state machine
                    645:                         * block of not being primed.
                    646:                         */
1.1       benno     647:
1.9       florian   648:                        up.stat.fd = open(fl[up.cur->idx].path,
1.8       florian   649:                                O_RDONLY|O_NONBLOCK, 0);
                    650:                        if (up.stat.fd == -1) {
1.22      benno     651:                                ERR("%s: open", fl[up.cur->idx].path);
1.8       florian   652:                                goto out;
                    653:                        }
                    654:                        pfd[2].fd = up.stat.fd;
1.1       benno     655:                }
                    656:        }
                    657:
1.9       florian   658:        if (!TAILQ_EMPTY(&sdlq)) {
1.22      benno     659:                ERRX("phases complete with files still queued");
1.9       florian   660:                goto out;
                    661:        }
                    662:
1.4       deraadt   663:        if (!sess_stats_send(sess, fdout)) {
1.22      benno     664:                ERRX1("sess_stats_end");
1.1       benno     665:                goto out;
                    666:        }
                    667:
                    668:        /* Final "goodbye" message. */
                    669:
1.4       deraadt   670:        if (!io_read_int(sess, fdin, &idx)) {
1.22      benno     671:                ERRX1("io_read_int");
1.1       benno     672:                goto out;
1.5       deraadt   673:        } else if (idx != -1) {
1.22      benno     674:                ERRX("read incorrect update complete ack");
1.1       benno     675:                goto out;
                    676:        }
                    677:
1.22      benno     678:        LOG2("sender finished updating");
1.1       benno     679:        rc = 1;
                    680: out:
1.9       florian   681:        send_up_reset(&up);
                    682:        while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
1.16      benno     683:                TAILQ_REMOVE(&sdlq, dl, entries);
1.9       florian   684:                free(dl->blks);
                    685:                free(dl);
                    686:        }
1.1       benno     687:        flist_free(fl, flsz);
1.9       florian   688:        free(wbuf);
1.24      florian   689:        blkhash_free(up.stat.blktab);
1.1       benno     690:        return rc;
                    691: }