version 1.8, 2019/02/16 16:57:17 |
version 1.9, 2019/02/16 16:58:39 |
|
|
#include <string.h> |
#include <string.h> |
#include <unistd.h> |
#include <unistd.h> |
|
|
|
#include <openssl/md4.h> |
|
|
#include "extern.h" |
#include "extern.h" |
|
|
/* |
/* |
|
|
|
|
p->stat.offs = 0; |
p->stat.offs = 0; |
p->stat.hint = 0; |
p->stat.hint = 0; |
|
p->stat.curst = BLKSTAT_NONE; |
p->primed = 0; |
p->primed = 0; |
} |
} |
|
|
|
|
rsync_sender(struct sess *sess, int fdin, |
rsync_sender(struct sess *sess, int fdin, |
int fdout, size_t argc, char **argv) |
int fdout, size_t argc, char **argv) |
{ |
{ |
struct flist *fl = NULL; |
struct flist *fl = NULL; |
size_t i, flsz = 0, phase = 0, excl; |
const struct flist *f; |
int rc = 0, c; |
size_t i, flsz = 0, phase = 0, excl; |
int32_t idx; |
off_t sz; |
struct pollfd pfd[3]; |
int rc = 0, c; |
struct send_dlq sdlq; |
int32_t idx; |
struct send_up up; |
struct pollfd pfd[3]; |
struct stat st; |
struct send_dlq sdlq; |
|
struct send_dl *dl; |
|
struct send_up up; |
|
struct stat st; |
|
unsigned char filemd[MD4_DIGEST_LENGTH]; |
|
void *wbuf = NULL; |
|
size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; |
|
ssize_t ssz; |
|
|
if (pledge("stdio getpw rpath unveil", NULL) == -1) { |
if (pledge("stdio getpw rpath unveil", NULL) == -1) { |
ERR(sess, "pledge"); |
ERR(sess, "pledge"); |
|
|
pfd[2].fd = -1; /* from local file */ |
pfd[2].fd = -1; /* from local file */ |
pfd[2].events = POLLIN; |
pfd[2].events = POLLIN; |
|
|
/* The main sender loop runs into phase == 2. */ |
|
|
|
for (;;) { |
for (;;) { |
assert(pfd[0].fd != -1); |
assert(pfd[0].fd != -1); |
if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) { |
if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) { |
|
|
ERRX(sess, "poll: timeout"); |
ERRX(sess, "poll: timeout"); |
goto out; |
goto out; |
} |
} |
|
|
for (i = 0; i < 3; i++) |
for (i = 0; i < 3; i++) |
if (pfd[i].revents & (POLLERR|POLLNVAL)) { |
if (pfd[i].revents & (POLLERR|POLLNVAL)) { |
ERRX(sess, "poll: bad fd"); |
ERRX(sess, "poll: bad fd"); |
|
|
goto out; |
goto out; |
} |
} |
|
|
/* |
|
* Flush out multiplexed messages. |
|
* These might otherwise clog the reader. |
|
*/ |
|
|
|
if (sess->mplex_reads && |
|
(POLLIN & pfd[0].revents)) { |
|
if (!io_read_flush(sess, fdin)) { |
|
ERRX1(sess, "io_read_flush"); |
|
goto out; |
|
} else if (sess->mplex_read_remain == 0) |
|
pfd[0].revents &= ~POLLIN; |
|
} |
|
|
|
/* |
/* |
* If we have a request coming down off the wire, pull |
* If we have a request coming down off the wire, pull |
* it in as quickly as possible into our buffer. |
* it in as quickly as possible into our buffer. |
* This unclogs the socket buffers so the data can flow. |
* This unclogs the socket buffers so the data can flow. |
|
* FIXME: if we're multiplexing, we might stall here if |
|
* there's only a log message and no actual data. |
|
* This can be fixed by doing a conditional test. |
*/ |
*/ |
|
|
if (pfd[0].revents & POLLIN) |
if (pfd[0].revents & POLLIN) |
|
|
} |
} |
|
|
/* |
/* |
* One of our local files has been opened (in response |
* One of our local files has been opened in response |
* to a receiver request) and now we can map it. |
* to a receiver request and now we can map it. |
* We'll respond to the event by looking at the map when |
* We'll respond to the event by looking at the map when |
* the writer is available. |
* the writer is available. |
* Here we also enable the poll event for output. |
* Here we also enable the poll event for output. |
|
|
assert(up.stat.fd != -1); |
assert(up.stat.fd != -1); |
assert(up.stat.map == MAP_FAILED); |
assert(up.stat.map == MAP_FAILED); |
assert(up.stat.mapsz == 0); |
assert(up.stat.mapsz == 0); |
|
f = &fl[up.cur->idx]; |
|
|
if (fstat(up.stat.fd, &st) == -1) { |
if (fstat(up.stat.fd, &st) == -1) { |
ERR(sess, "%s: fstat", fl[up.cur->idx].path); |
ERR(sess, "%s: fstat", f->path); |
goto out; |
goto out; |
} |
} |
|
|
/* |
/* |
* If the file is zero-length, the map will |
* If the file is zero-length, the map will |
* fail, but either way we want to unset that |
* fail, but either way we want to unset that |
* we're waiting for the file to open. |
* we're waiting for the file to open and set |
* We'll close the descriptor after processing. |
* that we're ready for the output channel. |
*/ |
*/ |
|
|
if ((up.stat.mapsz = st.st_size) > 0) { |
if ((up.stat.mapsz = st.st_size) > 0) { |
up.stat.map = mmap(NULL, up.stat.mapsz, |
up.stat.map = mmap(NULL, |
PROT_READ, MAP_SHARED, up.stat.fd, 0); |
up.stat.mapsz, PROT_READ, |
|
MAP_SHARED, up.stat.fd, 0); |
if (up.stat.map == MAP_FAILED) { |
if (up.stat.map == MAP_FAILED) { |
ERR(sess, "%s: mmap", fl[up.cur->idx].path); |
ERR(sess, "%s: mmap", f->path); |
goto out; |
goto out; |
} |
} |
} |
} |
|
|
pfd[2].fd = -1; |
pfd[2].fd = -1; |
pfd[1].fd = fdout; |
pfd[1].fd = fdout; |
} |
} |
|
|
/* |
/* |
* Our outbound is ready to process the current event. |
* If we have buffers waiting to write, write them out |
* This means we've already opened the file and possibly |
* as soon as we can in a non-blocking fashion. |
* mapped it, and we're ready to send blocks. |
* We must not be waiting for any local files. |
* Do this one block at a time lest we block the channel |
* ALL WRITES MUST HAPPEN HERE. |
* while read events are coming in. |
* This keeps the sender deadlock-free. |
*/ |
*/ |
|
|
|
if ((pfd[1].revents & POLLOUT) && wbufsz > 0) { |
|
assert(pfd[2].fd == -1); |
|
assert(wbufsz - wbufpos); |
|
ssz = write(fdout, |
|
wbuf + wbufpos, wbufsz - wbufpos); |
|
if (ssz < 0) { |
|
ERR(sess, "write"); |
|
goto out; |
|
} |
|
wbufpos += ssz; |
|
if (wbufpos == wbufsz) |
|
wbufpos = wbufsz = 0; |
|
pfd[1].revents &= ~POLLOUT; |
|
|
|
/* This is usually in io.c... */ |
|
|
|
sess->total_write += ssz; |
|
} |
|
|
if (pfd[1].revents & POLLOUT) { |
if (pfd[1].revents & POLLOUT) { |
assert(up.cur != NULL); |
|
assert(pfd[2].fd == -1); |
assert(pfd[2].fd == -1); |
|
assert(0 == wbufpos && 0 == wbufsz); |
|
|
/* |
/* |
* If we receive an invalid index (-1), then we're |
* If we have data to write, do it now according |
* either promoted to the second phase or it's time to |
* to the data finite state machine. |
* exit, depending upon which phase we're in. |
* If we receive an invalid index (-1), then |
|
* we're either promoted to the second phase or |
|
* it's time to exit, depending upon which phase |
|
* we're in. |
* Otherwise, we either start a transfer |
* Otherwise, we either start a transfer |
* sequence (if not primed) or continue one. |
* sequence (if not primed) or continue one. |
*/ |
*/ |
|
|
if (up.cur->idx < 0) { |
pos = 0; |
pfd[1].fd = -1; |
if (BLKSTAT_DATA == up.stat.curst) { |
|
/* |
|
* A data segment to be written: buffer |
|
* both the length and the data, then |
|
* put is in the token phase. |
|
*/ |
|
|
|
sz = MIN(MAX_CHUNK, |
|
up.stat.curlen - up.stat.curpos); |
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, sizeof(int32_t))) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_int(sess, |
|
wbuf, &pos, wbufsz, sz); |
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, sz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, |
|
up.stat.map + up.stat.curpos, sz); |
|
up.stat.curpos += sz; |
|
if (up.stat.curpos == up.stat.curlen) |
|
up.stat.curst = BLKSTAT_TOK; |
|
} else if (BLKSTAT_TOK == up.stat.curst) { |
|
/* |
|
* The data token following (maybe) a |
|
* data segment. |
|
* These can also come standalone if, |
|
* say, the file's being fully written. |
|
* It's followed by a hash or another |
|
* data segment, depending on the token. |
|
*/ |
|
|
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, sizeof(int32_t))) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_int(sess, wbuf, |
|
&pos, wbufsz, up.stat.curtok); |
|
up.stat.curst = up.stat.curtok ? |
|
BLKSTAT_NONE : BLKSTAT_HASH; |
|
} else if (BLKSTAT_HASH == up.stat.curst) { |
|
/* |
|
* The hash following transmission of |
|
* all file contents. |
|
* This is always followed by the state |
|
* that we're finished with the file. |
|
*/ |
|
|
|
hash_file(up.stat.map, |
|
up.stat.mapsz, filemd, sess); |
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_buf(sess, wbuf, &pos, |
|
wbufsz, filemd, MD4_DIGEST_LENGTH); |
|
up.stat.curst = BLKSTAT_DONE; |
|
} else if (BLKSTAT_DONE == up.stat.curst) { |
|
/* |
|
* The data has been written. |
|
* Clear our current send file and allow |
|
* the block below to find another. |
|
*/ |
|
|
|
LOG3(sess, "%s: flushed %jd KB total, " |
|
"%.2f%% uploaded", |
|
fl[up.cur->idx].path, |
|
(intmax_t)up.stat.total / 1024, |
|
100.0 * up.stat.dirty / up.stat.total); |
send_up_reset(&up); |
send_up_reset(&up); |
|
} else if (NULL != up.cur && up.cur->idx < 0) { |
|
/* |
|
* We've hit the phase change following |
|
* the last file (or start, or prior |
|
* phase change). |
|
* Simply acknowledge it. |
|
* FIXME: use buffering. |
|
*/ |
|
|
if (!io_write_int(sess, fdout, -1)) { |
if (!io_write_int(sess, fdout, -1)) { |
ERRX1(sess, "io_write_int"); |
ERRX1(sess, "io_write_int"); |
goto out; |
goto out; |
} |
} |
|
|
/* Send superfluous ack. */ |
|
|
|
if (sess->opts->server && sess->rver > 27 && |
if (sess->opts->server && sess->rver > 27 && |
!io_write_int(sess, fdout, -1)) { |
!io_write_int(sess, fdout, -1)) { |
ERRX1(sess, "io_write_int"); |
ERRX1(sess, "io_write_int"); |
goto out; |
goto out; |
} |
} |
|
send_up_reset(&up); |
|
|
|
/* |
|
* This is where we actually stop the |
|
* algorithm: we're already at the |
|
* second phase. |
|
*/ |
|
|
if (phase++) |
if (phase++) |
break; |
break; |
} else if (0 == up.primed) { |
} else if (NULL != up.cur && 0 == up.primed) { |
|
/* |
|
* We're getting ready to send the file |
|
* contents to the receiver. |
|
* FIXME: use buffering. |
|
*/ |
|
|
if (!sess->opts->server) |
if (!sess->opts->server) |
LOG1(sess, "%s", fl[up.cur->idx].wpath); |
LOG1(sess, "%s", fl[up.cur->idx].wpath); |
|
|
|
|
ERRX1(sess, "blk_recv_ack"); |
ERRX1(sess, "blk_recv_ack"); |
goto out; |
goto out; |
} |
} |
|
LOG3(sess, "%s: primed for %jd B total", |
|
fl[up.cur->idx].path, |
|
(intmax_t)up.cur->blks->size); |
up.primed = 1; |
up.primed = 1; |
} else { |
} else if (NULL != up.cur) { |
|
/* |
|
* Our last case: we need to find the |
|
* next block (and token) to transmit to |
|
* the receiver. |
|
* These will drive the finite state |
|
* machine in the first few conditional |
|
* blocks of this set. |
|
*/ |
|
|
assert(up.stat.fd != -1); |
assert(up.stat.fd != -1); |
c = blk_match(sess, fdout, up.cur->blks, |
blk_match(sess, up.cur->blks, |
fl[up.cur->idx].path, &up.stat); |
fl[up.cur->idx].path, &up.stat); |
if (c < 0) { |
|
ERRX1(sess, "blk_match"); |
|
goto out; |
|
} else if (c > 0) { |
|
send_up_reset(&up); |
|
pfd[1].fd = -1; |
|
} |
|
} |
} |
} |
} |
|
|
|
|
assert(up.stat.fd == -1); |
assert(up.stat.fd == -1); |
assert(up.stat.map == MAP_FAILED); |
assert(up.stat.map == MAP_FAILED); |
assert(up.stat.mapsz == 0); |
assert(up.stat.mapsz == 0); |
|
assert(wbufsz == 0 && wbufpos == 0); |
|
pfd[1].fd = -1; |
|
|
|
/* |
|
* If there's nothing in the queue, then keep |
|
* the output channel disabled and wait for |
|
* whatever comes next from the reader. |
|
*/ |
|
|
if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) |
if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) |
continue; |
continue; |
|
|
TAILQ_REMOVE(&sdlq, up.cur, entries); |
TAILQ_REMOVE(&sdlq, up.cur, entries); |
|
|
/* End of phase: enable channel to receiver. */ |
/* |
|
* End of phase: enable channel to receiver. |
|
* We'll need our output buffer enabled in order |
|
* to process this event. |
|
*/ |
|
|
if (up.cur->idx == -1) { |
if (up.cur->idx == -1) { |
pfd[1].fd = fdout; |
pfd[1].fd = fdout; |
continue; |
continue; |
} |
} |
|
|
/* Non-blocking open of file. */ |
/* |
|
* Non-blocking open of file. |
|
* This will be picked up in the state machine |
|
* block of not being primed. |
|
*/ |
|
|
up.stat.fd = open(fl[up.cur->idx].path, |
up.stat.fd = open(fl[up.cur->idx].path, |
O_RDONLY|O_NONBLOCK, 0); |
O_RDONLY|O_NONBLOCK, 0); |
if (up.stat.fd == -1) { |
if (up.stat.fd == -1) { |
ERR(sess, "%s: open", fl[up.cur->idx].path); |
ERR(sess, "%s: open", fl[up.cur->idx].path); |
|
|
} |
} |
} |
} |
|
|
|
if (!TAILQ_EMPTY(&sdlq)) { |
|
ERRX(sess, "phases complete with files still queued"); |
|
goto out; |
|
} |
|
|
if (!sess_stats_send(sess, fdout)) { |
if (!sess_stats_send(sess, fdout)) { |
ERRX1(sess, "sess_stats_end"); |
ERRX1(sess, "sess_stats_end"); |
goto out; |
goto out; |
|
|
LOG2(sess, "sender finished updating"); |
LOG2(sess, "sender finished updating"); |
rc = 1; |
rc = 1; |
out: |
out: |
|
send_up_reset(&up); |
|
while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { |
|
free(dl->blks); |
|
free(dl); |
|
} |
flist_free(fl, flsz); |
flist_free(fl, flsz); |
|
free(wbuf); |
return rc; |
return rc; |
} |
} |