version 1.14, 2019/02/18 21:34:54 |
version 1.15, 2019/02/18 21:55:27 |
|
|
* A request from the receiver to download updated file data. |
* A request from the receiver to download updated file data. |
*/ |
*/ |
struct send_dl { |
struct send_dl { |
int32_t idx; /* index in our file list */ |
int32_t idx; /* index in our file list */ |
struct blkset *blks; /* the sender's block information */ |
struct blkset *blks; /* the sender's block information */ |
TAILQ_ENTRY(send_dl) entries; |
TAILQ_ENTRY(send_dl) entries; |
}; |
}; |
|
|
struct send_up { |
struct send_up { |
struct send_dl *cur; /* file being updated or NULL */ |
struct send_dl *cur; /* file being updated or NULL */ |
struct blkstat stat; /* status of file being updated */ |
struct blkstat stat; /* status of file being updated */ |
|
int primed; /* blk_recv_ack() was called */ |
}; |
}; |
|
|
TAILQ_HEAD(send_dlq, send_dl); |
TAILQ_HEAD(send_dlq, send_dl); |
|
|
p->stat.offs = 0; |
p->stat.offs = 0; |
p->stat.hint = 0; |
p->stat.hint = 0; |
p->stat.curst = BLKSTAT_NONE; |
p->stat.curst = BLKSTAT_NONE; |
|
p->primed = 0; |
} |
} |
|
|
/* |
/* |
* This is the bulk of the sender work. |
|
* Here we tend to an output buffer that responds to receiver requests |
|
* for data. |
|
* This does not act upon the output descriptor itself so as to avoid |
|
* blocking, which otherwise would deadlock the protocol. |
|
* Returns zero on failure, non-zero on success. |
|
*/ |
|
static int |
|
send_up_fsm(struct sess *sess, size_t *phase, |
|
struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax, |
|
const struct flist *fl) |
|
{ |
|
size_t pos = 0, isz = sizeof(int32_t), |
|
dsz = MD4_DIGEST_LENGTH; |
|
unsigned char fmd[MD4_DIGEST_LENGTH]; |
|
off_t sz; |
|
char buf[20]; |
|
|
|
switch (up->stat.curst) { |
|
case BLKSTAT_DATA: |
|
/* |
|
* A data segment to be written: buffer both the length |
|
* and the data. |
|
* If we've finished the transfer, move on to the token; |
|
* otherwise, keep sending data. |
|
*/ |
|
|
|
sz = MINIMUM(MAX_CHUNK, |
|
up->stat.curlen - up->stat.curpos); |
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz); |
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_buf(sess, *wb, &pos, *wbsz, |
|
up->stat.map + up->stat.curpos, sz); |
|
|
|
up->stat.curpos += sz; |
|
if (up->stat.curpos == up->stat.curlen) |
|
up->stat.curst = BLKSTAT_TOK; |
|
return 1; |
|
case BLKSTAT_TOK: |
|
/* |
|
* The data token following (maybe) a data segment. |
|
* These can also come standalone if, say, the file's |
|
* being fully written. |
|
* It's followed by a hash or another data segment, |
|
* depending on the token. |
|
*/ |
|
|
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_int(sess, *wb, |
|
&pos, *wbsz, up->stat.curtok); |
|
up->stat.curst = up->stat.curtok ? |
|
BLKSTAT_NEXT : BLKSTAT_HASH; |
|
return 1; |
|
case BLKSTAT_HASH: |
|
/* |
|
* The hash following transmission of all file contents. |
|
* This is always followed by the state that we're |
|
* finished with the file. |
|
*/ |
|
|
|
hash_file(up->stat.map, up->stat.mapsz, fmd, sess); |
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz); |
|
up->stat.curst = BLKSTAT_DONE; |
|
return 1; |
|
case BLKSTAT_DONE: |
|
/* |
|
* The data has been written. |
|
* Clear our current send file and allow the block below |
|
* to find another. |
|
*/ |
|
|
|
LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded", |
|
fl[up->cur->idx].path, |
|
(intmax_t)up->stat.total / 1024, |
|
100.0 * up->stat.dirty / up->stat.total); |
|
send_up_reset(up); |
|
return 1; |
|
case BLKSTAT_PHASE: |
|
/* |
|
* This is where we actually stop the algorithm: we're |
|
* already at the second phase. |
|
*/ |
|
|
|
send_up_reset(up); |
|
(*phase)++; |
|
return 1; |
|
case BLKSTAT_NEXT: |
|
/* |
|
* Our last case: we need to find the |
|
* next block (and token) to transmit to |
|
* the receiver. |
|
* These will drive the finite state |
|
* machine in the first few conditional |
|
* blocks of this set. |
|
*/ |
|
|
|
assert(up->stat.fd != -1); |
|
blk_match(sess, up->cur->blks, |
|
fl[up->cur->idx].path, &up->stat); |
|
return 1; |
|
case BLKSTAT_NONE: |
|
break; |
|
} |
|
|
|
assert(BLKSTAT_NONE == up->stat.curst); |
|
|
|
/* |
|
* We've either hit the phase change following the last file (or |
|
* start, or prior phase change), or we need to prime the next |
|
* file for transmission. |
|
* We special-case dry-run mode. |
|
*/ |
|
|
|
if (up->cur->idx < 0) { |
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); |
|
|
|
if (sess->opts->server && sess->rver > 27) { |
|
if (!io_lowbuffer_alloc(sess, |
|
wb, wbsz, wbmax, isz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); |
|
} |
|
up->stat.curst = BLKSTAT_PHASE; |
|
} else if (sess->opts->dry_run) { |
|
if (!sess->opts->server) |
|
LOG1(sess, "%s", fl[up->cur->idx].wpath); |
|
|
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx); |
|
up->stat.curst = BLKSTAT_NEXT; |
|
} else { |
|
assert(up->stat.fd != -1); |
|
|
|
/* |
|
* FIXME: use the nice output of log_file() and so on in |
|
* downloader.c, which means moving this into |
|
* BLKSTAT_DONE instead of having it be here. |
|
*/ |
|
|
|
if (!sess->opts->server) |
|
LOG1(sess, "%s", fl[up->cur->idx].wpath); |
|
|
|
if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
return 0; |
|
} |
|
assert(sizeof(buf) == 20); |
|
blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx); |
|
io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20); |
|
|
|
LOG3(sess, "%s: primed for %jd B total", |
|
fl[up->cur->idx].path, |
|
(intmax_t)up->cur->blks->size); |
|
up->stat.curst = BLKSTAT_NEXT; |
|
} |
|
|
|
return 1; |
|
} |
|
|
|
/* |
|
* Enqueue a download request, getting it off the read channel as |
* Enqueue a download request, getting it off the read channel as |
* quickly a possible. |
* quickly a possible. |
* This frees up the read channel for further incoming requests. |
* This frees up the read channel for further incoming requests. |
|
|
int32_t idx, const struct flist *fl, size_t flsz, int fd) |
int32_t idx, const struct flist *fl, size_t flsz, int fd) |
{ |
{ |
struct send_dl *s; |
struct send_dl *s; |
|
|
/* End-of-phase marker. */ |
/* End-of-phase marker. */ |
|
|
if (idx == -1) { |
if (idx == -1) { |
|
|
} |
} |
|
|
/* Validate the index. */ |
/* Validate the index. */ |
|
|
if (idx < 0 || (uint32_t)idx >= flsz) { |
if (idx < 0 || (uint32_t)idx >= flsz) { |
ERRX(sess, "file index out of bounds: invalid %" |
ERRX(sess, "file index out of bounds: invalid %" |
PRId32 " out of %zu", idx, flsz); |
PRId32 " out of %zu", idx, flsz); |
|
|
struct flist *fl = NULL; |
struct flist *fl = NULL; |
const struct flist *f; |
const struct flist *f; |
size_t i, flsz = 0, phase = 0, excl; |
size_t i, flsz = 0, phase = 0, excl; |
|
off_t sz; |
int rc = 0, c; |
int rc = 0, c; |
int32_t idx; |
int32_t idx; |
struct pollfd pfd[3]; |
struct pollfd pfd[3]; |
|
|
struct send_dl *dl; |
struct send_dl *dl; |
struct send_up up; |
struct send_up up; |
struct stat st; |
struct stat st; |
|
unsigned char filemd[MD4_DIGEST_LENGTH]; |
void *wbuf = NULL; |
void *wbuf = NULL; |
size_t wbufpos = 0, wbufsz = 0, wbufmax = 0; |
size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; |
ssize_t ssz; |
ssize_t ssz; |
|
|
if (pledge("stdio getpw rpath unveil", NULL) == -1) { |
if (pledge("stdio getpw rpath unveil", NULL) == -1) { |
|
|
/* Client sends zero-length exclusions if deleting. */ |
/* Client sends zero-length exclusions if deleting. */ |
|
|
if (!sess->opts->server && sess->opts->del && |
if (!sess->opts->server && sess->opts->del && |
!io_write_int(sess, fdout, 0)) { |
!io_write_int(sess, fdout, 0)) { |
ERRX1(sess, "io_write_int"); |
ERRX1(sess, "io_write_int"); |
goto out; |
goto out; |
} |
} |
|
|
sess->total_write += ssz; |
sess->total_write += ssz; |
} |
} |
|
|
/* |
if (pfd[1].revents & POLLOUT) { |
* Engage the FSM for the current transfer. |
|
* If our phase changes, stop processing. |
|
*/ |
|
|
|
if (pfd[1].revents & POLLOUT && up.cur != NULL) { |
|
assert(pfd[2].fd == -1); |
assert(pfd[2].fd == -1); |
assert(wbufpos == 0 && wbufsz == 0); |
assert(wbufpos == 0 && wbufsz == 0); |
if (!send_up_fsm(sess, &phase, |
|
&up, &wbuf, &wbufsz, &wbufmax, fl)) { |
/* |
ERRX1(sess, "send_up_fsm"); |
* If we have data to write, do it now according |
goto out; |
* to the data finite state machine. |
} else if (phase > 1) |
* If we receive an invalid index (-1), then |
break; |
* we're either promoted to the second phase or |
|
* it's time to exit, depending upon which phase |
|
* we're in. |
|
* Otherwise, we either start a transfer |
|
* sequence (if not primed) or continue one. |
|
*/ |
|
|
|
pos = 0; |
|
if (BLKSTAT_DATA == up.stat.curst) { |
|
/* |
|
* A data segment to be written: buffer |
|
* both the length and the data, then |
|
* put is in the token phase. |
|
*/ |
|
|
|
sz = MINIMUM(MAX_CHUNK, |
|
up.stat.curlen - up.stat.curpos); |
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, sizeof(int32_t))) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_int(sess, |
|
wbuf, &pos, wbufsz, sz); |
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, sz)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, |
|
up.stat.map + up.stat.curpos, sz); |
|
up.stat.curpos += sz; |
|
if (up.stat.curpos == up.stat.curlen) |
|
up.stat.curst = BLKSTAT_TOK; |
|
} else if (BLKSTAT_TOK == up.stat.curst) { |
|
/* |
|
* The data token following (maybe) a |
|
* data segment. |
|
* These can also come standalone if, |
|
* say, the file's being fully written. |
|
* It's followed by a hash or another |
|
* data segment, depending on the token. |
|
*/ |
|
|
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, sizeof(int32_t))) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_int(sess, wbuf, |
|
&pos, wbufsz, up.stat.curtok); |
|
up.stat.curst = up.stat.curtok ? |
|
BLKSTAT_NONE : BLKSTAT_HASH; |
|
} else if (BLKSTAT_HASH == up.stat.curst) { |
|
/* |
|
* The hash following transmission of |
|
* all file contents. |
|
* This is always followed by the state |
|
* that we're finished with the file. |
|
*/ |
|
|
|
hash_file(up.stat.map, |
|
up.stat.mapsz, filemd, sess); |
|
if (!io_lowbuffer_alloc(sess, &wbuf, |
|
&wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { |
|
ERRX1(sess, "io_lowbuffer_alloc"); |
|
goto out; |
|
} |
|
io_lowbuffer_buf(sess, wbuf, &pos, |
|
wbufsz, filemd, MD4_DIGEST_LENGTH); |
|
up.stat.curst = BLKSTAT_DONE; |
|
} else if (BLKSTAT_DONE == up.stat.curst) { |
|
/* |
|
* The data has been written. |
|
* Clear our current send file and allow |
|
* the block below to find another. |
|
*/ |
|
|
|
LOG3(sess, "%s: flushed %jd KB total, " |
|
"%.2f%% uploaded", |
|
fl[up.cur->idx].path, |
|
(intmax_t)up.stat.total / 1024, |
|
100.0 * up.stat.dirty / up.stat.total); |
|
send_up_reset(&up); |
|
} else if (up.cur != NULL && up.cur->idx < 0) { |
|
/* |
|
* We've hit the phase change following |
|
* the last file (or start, or prior |
|
* phase change). |
|
* Simply acknowledge it. |
|
* FIXME: use buffering. |
|
*/ |
|
|
|
if (!io_write_int(sess, fdout, -1)) { |
|
ERRX1(sess, "io_write_int"); |
|
goto out; |
|
} |
|
if (sess->opts->server && sess->rver > 27 && |
|
!io_write_int(sess, fdout, -1)) { |
|
ERRX1(sess, "io_write_int"); |
|
goto out; |
|
} |
|
send_up_reset(&up); |
|
|
|
/* |
|
* This is where we actually stop the |
|
* algorithm: we're already at the |
|
* second phase. |
|
*/ |
|
|
|
if (phase++) |
|
break; |
|
} else if (up.cur != NULL && up.primed == 0) { |
|
/* |
|
* We're getting ready to send the file |
|
* contents to the receiver. |
|
* FIXME: use buffering. |
|
*/ |
|
|
|
if (!sess->opts->server) |
|
LOG1(sess, "%s", fl[up.cur->idx].wpath); |
|
|
|
/* Dry-running does nothing but a response. */ |
|
|
|
if (sess->opts->dry_run && |
|
!io_write_int(sess, fdout, up.cur->idx)) { |
|
ERRX1(sess, "io_write_int"); |
|
goto out; |
|
} |
|
|
|
/* Actually perform the block send. */ |
|
|
|
assert(up.stat.fd != -1); |
|
if (!blk_recv_ack(sess, fdout, |
|
up.cur->blks, up.cur->idx)) { |
|
ERRX1(sess, "blk_recv_ack"); |
|
goto out; |
|
} |
|
LOG3(sess, "%s: primed for %jd B total", |
|
fl[up.cur->idx].path, |
|
(intmax_t)up.cur->blks->size); |
|
up.primed = 1; |
|
} else if (up.cur != NULL) { |
|
/* |
|
* Our last case: we need to find the |
|
* next block (and token) to transmit to |
|
* the receiver. |
|
* These will drive the finite state |
|
* machine in the first few conditional |
|
* blocks of this set. |
|
*/ |
|
|
|
assert(up.stat.fd != -1); |
|
blk_match(sess, up.cur->blks, |
|
fl[up.cur->idx].path, &up.stat); |
|
} |
} |
} |
|
|
/* |
/* |
|
|
pfd[1].fd = fdout; |
pfd[1].fd = fdout; |
continue; |
continue; |
} |
} |
|
|
/* |
/* |
* Non-blocking open of file. |
* Non-blocking open of file. |
* This will be picked up in the state machine |
* This will be picked up in the state machine |
|
|
out: |
out: |
send_up_reset(&up); |
send_up_reset(&up); |
while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { |
while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { |
TAILQ_REMOVE(&sdlq, dl, entries); |
|
free(dl->blks); |
free(dl->blks); |
free(dl); |
free(dl); |
} |
} |