Annotation of src/usr.bin/rsync/io.c, Revision 1.1
1.1 ! benno 1: /* $Id$ */
! 2: /*
! 3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
! 4: *
! 5: * Permission to use, copy, modify, and distribute this software for any
! 6: * purpose with or without fee is hereby granted, provided that the above
! 7: * copyright notice and this permission notice appear in all copies.
! 8: *
! 9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
! 10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
! 11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
! 12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
! 13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
! 14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
! 15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
! 16: */
! 17: #include <sys/stat.h>
! 18:
! 19: #include <assert.h>
! 20: #include <endian.h>
! 21: #include <errno.h>
! 22: #include <poll.h>
! 23: #include <stdint.h>
! 24: #include <stdio.h>
! 25: #include <stdlib.h>
! 26: #include <string.h>
! 27: #include <unistd.h>
! 28:
! 29: #include "extern.h"
! 30:
/*
 * Probe "fd" for pending input without waiting.
 * Returns <0 if poll(2) fails, zero if POLLIN is not currently set, and
 * non-zero (the POLLIN bit itself) if it is.
 */
int
io_read_check(struct sess *sess, int fd)
{
	struct pollfd	 probe;

	probe.fd = fd;
	probe.events = POLLIN;

	/* A zero timeout makes poll(2) report the current state only. */

	if (poll(&probe, 1, 0) >= 0)
		return probe.revents & POLLIN;

	ERR(sess, "poll");
	return -1;
}
! 45:
! 46: /*
! 47: * Write buffer to non-blocking descriptor.
! 48: * Returns zero on failure, non-zero on success (zero or more bytes).
! 49: */
! 50: static int
! 51: io_write_nonblocking(struct sess *sess,
! 52: int fd, const void *buf, size_t bsz, size_t *sz)
! 53: {
! 54: struct pollfd pfd;
! 55: ssize_t wsz;
! 56:
! 57: *sz = 0;
! 58:
! 59: if (0 == bsz)
! 60: return 1;
! 61:
! 62: pfd.fd = fd;
! 63: pfd.events = POLLOUT;
! 64:
! 65: if (poll(&pfd, 1, INFTIM) < 0) {
! 66: ERR(sess, "poll");
! 67: return 0;
! 68: }
! 69: if ((pfd.revents & (POLLERR|POLLNVAL))) {
! 70: ERRX(sess, "poll: bad fd");
! 71: return 0;
! 72: } else if ((pfd.revents & POLLHUP)) {
! 73: ERRX(sess, "poll: hangup");
! 74: return 0;
! 75: } else if ( ! (pfd.revents & POLLOUT)) {
! 76: ERRX(sess, "poll: unknown event");
! 77: return 0;
! 78: }
! 79:
! 80: if ((wsz = write(fd, buf, bsz)) < 0) {
! 81: ERR(sess, "write");
! 82: return 0;
! 83: }
! 84:
! 85: *sz = wsz;
! 86: return 1;
! 87: }
! 88:
/*
 * Keep calling io_write_nonblocking() until all "sz" bytes of "buf"
 * have been written to "fd".
 * Returns 0 on failure, non-zero on success (all bytes written).
 */
static int
io_write_blocking(struct sess *sess,
	int fd, const void *buf, size_t sz)
{
	const char	*cur = buf;
	size_t		 done;

	for ( ; sz > 0; cur += done, sz -= done) {
		if (!io_write_nonblocking(sess, fd, cur, sz, &done)) {
			ERRX1(sess, "io_write_nonblocking");
			return 0;
		}

		/* A successful write of nothing would spin forever. */

		if (done == 0) {
			ERRX(sess, "io_write_nonblocking: short write");
			return 0;
		}
	}

	return 1;
}
! 115:
! 116: /*
! 117: * Write "buf" of size "sz" to non-blocking descriptor.
! 118: * Returns zero on failure, non-zero on success (all bytes written to
! 119: * the descriptor).
! 120: */
! 121: int
! 122: io_write_buf(struct sess *sess, int fd, const void *buf, size_t sz)
! 123: {
! 124: int32_t tag, tagbuf;
! 125: size_t wsz;
! 126: int c;
! 127:
! 128: if ( ! sess->mplex_writes) {
! 129: c = io_write_blocking(sess, fd, buf, sz);
! 130: sess->total_write += sz;
! 131: return c;
! 132: }
! 133:
! 134: while (sz > 0) {
! 135: wsz = sz & 0xFFFFFF;
! 136: tag = (7 << 24) + wsz;
! 137: tagbuf = htole32(tag);
! 138: if ( ! io_write_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
! 139: ERRX1(sess, "io_write_blocking");
! 140: return 0;
! 141: }
! 142: if ( ! io_write_blocking(sess, fd, buf, wsz)) {
! 143: ERRX1(sess, "io_write_blocking");
! 144: return 0;
! 145: }
! 146: sess->total_write += wsz;
! 147: sz -= wsz;
! 148: buf += wsz;
! 149: }
! 150:
! 151: return 1;
! 152: }
! 153:
/*
 * Write the NUL-terminated string "line" (without its terminator),
 * followed by a single newline byte.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_line(struct sess *sess, int fd, const char *line)
{

	if (!io_write_buf(sess, fd, line, strlen(line))) {
		ERRX1(sess, "io_write_buf");
		return 0;
	}
	if (!io_write_byte(sess, fd, '\n')) {
		ERRX1(sess, "io_write_byte");
		return 0;
	}
	return 1;
}
! 171:
! 172: /*
! 173: * Read buffer from non-blocking descriptor.
! 174: * Returns zero on failure, non-zero on success (zero or more bytes).
! 175: */
! 176: static int
! 177: io_read_nonblocking(struct sess *sess,
! 178: int fd, void *buf, size_t bsz, size_t *sz)
! 179: {
! 180: struct pollfd pfd;
! 181: ssize_t rsz;
! 182:
! 183: *sz = 0;
! 184:
! 185: if (0 == bsz)
! 186: return 1;
! 187:
! 188: pfd.fd = fd;
! 189: pfd.events = POLLIN;
! 190:
! 191: if (poll(&pfd, 1, INFTIM) < 0) {
! 192: ERR(sess, "poll");
! 193: return 0;
! 194: }
! 195: if ((pfd.revents & (POLLERR|POLLNVAL))) {
! 196: ERRX(sess, "poll: bad fd");
! 197: return 0;
! 198: } else if ( ! (pfd.revents & (POLLIN|POLLHUP))) {
! 199: ERRX(sess, "poll: unknown event");
! 200: return 0;
! 201: }
! 202:
! 203: if ((rsz = read(fd, buf, bsz)) < 0) {
! 204: ERR(sess, "read");
! 205: return 0;
! 206: } else if (0 == rsz) {
! 207: ERRX(sess, "unexpected end of file");
! 208: return 0;
! 209: }
! 210:
! 211: *sz = rsz;
! 212: return 1;
! 213: }
! 214:
/*
 * Keep calling io_read_nonblocking() until exactly "sz" bytes have been
 * stored in "buf".
 * This works on the raw descriptor, so it may be used for both
 * multiplex framing and payload, and for non-multiplexed streams.
 * Returns 0 on failure, non-zero on success (all bytes read).
 */
static int
io_read_blocking(struct sess *sess,
	int fd, void *buf, size_t sz)
{
	char	*cur = buf;
	size_t	 got;

	for ( ; sz > 0; cur += got, sz -= got) {
		if (!io_read_nonblocking(sess, fd, cur, sz, &got)) {
			ERRX1(sess, "io_read_nonblocking");
			return 0;
		}

		/* A successful read of nothing would spin forever. */

		if (got == 0) {
			ERRX(sess, "io_read_nonblocking: short read");
			return 0;
		}
	}

	return 1;
}
! 243:
! 244: /*
! 245: * When we do a lot of writes in a row (such as when the sender emits
! 246: * the file list), the server might be sending us multiplexed log
! 247: * messages.
! 248: * If it sends too many, it clogs the socket.
! 249: * This function looks into the read buffer and clears out any log
! 250: * messages pending.
! 251: * If called when there are valid data reads available, this function
! 252: * does nothing.
! 253: * Returns zero on failure, non-zero on success.
! 254: */
! 255: int
! 256: io_read_flush(struct sess *sess, int fd)
! 257: {
! 258: int32_t tagbuf, tag;
! 259: char mpbuf[1024];
! 260:
! 261: if (sess->mplex_read_remain)
! 262: return 1;
! 263:
! 264: /*
! 265: * First, read the 4-byte multiplex tag.
! 266: * The first byte is the tag identifier (7 for normal
! 267: * data, !7 for out-of-band data), the last three are
! 268: * for the remaining data size.
! 269: */
! 270:
! 271: if ( ! io_read_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
! 272: ERRX1(sess, "io_read_blocking");
! 273: return 0;
! 274: }
! 275: tag = le32toh(tagbuf);
! 276: sess->mplex_read_remain = tag & 0xFFFFFF;
! 277: tag >>= 24;
! 278: if (7 == tag)
! 279: return 1;
! 280:
! 281: tag -= 7;
! 282:
! 283: if (sess->mplex_read_remain > sizeof(mpbuf)) {
! 284: ERRX(sess, "multiplex buffer overflow");
! 285: return 0;
! 286: } else if (0 == sess->mplex_read_remain)
! 287: return 1;
! 288:
! 289: if ( ! io_read_blocking(sess, fd,
! 290: mpbuf, sess->mplex_read_remain)) {
! 291: ERRX1(sess, "io_read_blocking");
! 292: return 0;
! 293: }
! 294: if ('\n' == mpbuf[sess->mplex_read_remain - 1])
! 295: mpbuf[--sess->mplex_read_remain] = '\0';
! 296:
! 297: /*
! 298: * Always print the server's messages, as the server
! 299: * will control its own log levelling.
! 300: */
! 301:
! 302: LOG0(sess, "%.*s", (int)sess->mplex_read_remain, mpbuf);
! 303: sess->mplex_read_remain = 0;
! 304:
! 305: /*
! 306: * I only know that a tag of one means an error.
! 307: * This means that we should exit.
! 308: */
! 309:
! 310: if (1 == tag) {
! 311: ERRX1(sess, "error from remote host");
! 312: return 0;
! 313: }
! 314: return 1;
! 315: }
! 316:
! 317: /*
! 318: * Read buffer from non-blocking descriptor, possibly in multiplex read
! 319: * mode.
! 320: * Returns zero on failure, non-zero on success (all bytes read from
! 321: * the descriptor).
! 322: */
! 323: int
! 324: io_read_buf(struct sess *sess, int fd, void *buf, size_t sz)
! 325: {
! 326: size_t rsz;
! 327: int c;
! 328:
! 329: /* If we're not multiplexing, read directly. */
! 330:
! 331: if ( ! sess->mplex_reads) {
! 332: assert(0 == sess->mplex_read_remain);
! 333: c = io_read_blocking(sess, fd, buf, sz);
! 334: sess->total_read += sz;
! 335: return c;
! 336: }
! 337:
! 338: while (sz > 0) {
! 339: /*
! 340: * First, check to see if we have any regular data
! 341: * hanging around waiting to be read.
! 342: * If so, read the lesser of that data and whatever
! 343: * amount we currently want.
! 344: */
! 345:
! 346: if (sess->mplex_read_remain) {
! 347: rsz = sess->mplex_read_remain < sz ?
! 348: sess->mplex_read_remain : sz;
! 349: if ( ! io_read_blocking(sess, fd, buf, rsz)) {
! 350: ERRX1(sess, "io_read_blocking");
! 351: return 0;
! 352: }
! 353: sz -= rsz;
! 354: sess->mplex_read_remain -= rsz;
! 355: buf += rsz;
! 356: sess->total_read += rsz;
! 357: continue;
! 358: }
! 359:
! 360: assert(0 == sess->mplex_read_remain);
! 361: if ( ! io_read_flush(sess, fd)) {
! 362: ERRX1(sess, "io_read_flush");
! 363: return 0;
! 364: }
! 365: }
! 366:
! 367: return 1;
! 368: }
! 369:
/*
 * Write a 64-bit integer.
 * Values in [0, INT32_MAX] are sent as a single 32-bit integer; any
 * other value is sent as the 32-bit marker INT32_MAX followed by the
 * full value as eight little-endian bytes.
 * NOTE(review): INT32_MAX itself goes out in the short form, while
 * io_read_long() treats a leading INT32_MAX as the marker for the long
 * form -- confirm the intended encoding of that one value.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_long(struct sess *sess, int fd, int64_t val)
{
	int64_t	 wire;

	/* Short form whenever the value fits. */

	if (val >= 0 && val <= INT32_MAX)
		return io_write_int(sess, fd, (int32_t)val);

	/* Long form: marker first, then all 64 bits. */

	wire = htole64(val);

	if (!io_write_int(sess, fd, INT32_MAX)) {
		ERRX(sess, "io_write_int");
		return 0;
	}
	if (!io_write_buf(sess, fd, &wire, sizeof(int64_t))) {
		ERRX(sess, "io_write_buf");
		return 0;
	}
	return 1;
}
! 393:
/*
 * Write "val" as four little-endian bytes.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_int(struct sess *sess, int fd, int32_t val)
{
	int32_t	 wire = htole32(val);

	if (io_write_buf(sess, fd, &wire, sizeof(int32_t)))
		return 1;

	ERRX(sess, "io_write_buf");
	return 0;
}
! 407:
/*
 * A simple assertion-protected memory copy from the input "val" of size
 * "valsz" into our buffer "buf", full size "buflen", position "bufpos".
 * Increases our "bufpos" appropriately.
 * This has no return value, but will assert() if the size of the buffer
 * is insufficient for the new data.
 * The "sess" argument is not used.
 */
void
io_buffer_buf(struct sess *sess, void *buf,
	size_t *bufpos, size_t buflen, const void *val, size_t valsz)
{

	(void)sess;

	/*
	 * Phrase the bounds check as subtractions so that a huge
	 * "valsz" cannot wrap the sum around and slip through.
	 */

	assert(*bufpos <= buflen);
	assert(valsz <= buflen - *bufpos);

	memcpy((char *)buf + *bufpos, val, valsz);
	*bufpos += valsz;
}
! 424:
/*
 * Append "val" to "buf" as four little-endian bytes by way of
 * io_buffer_buf(), which advances "bufpos".
 */
void
io_buffer_int(struct sess *sess, void *buf,
	size_t *bufpos, size_t buflen, int32_t val)
{
	int32_t	 wire;

	wire = htole32(val);
	io_buffer_buf(sess, buf, bufpos, buflen, &wire, sizeof(int32_t));
}
! 437:
/*
 * Read a 64-bit integer with io_read_long() and store it in "val" as an
 * unsigned quantity.
 * A negative value is rejected as a failure and "val" is left
 * untouched.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_ulong(struct sess *sess, int fd, uint64_t *val)
{
	int64_t	 oval;

	if ( ! io_read_long(sess, fd, &oval)) {
		ERRX(sess, "io_read_long");
		return 0;
	} else if (oval < 0) {
		/* Must fail: "val" has not been assigned. */
		ERRX(sess, "io_read_ulong: negative value");
		return 0;
	}

	*val = (uint64_t)oval;
	return 1;
}
! 454:
/*
 * Read a 64-bit integer.
 * A 32-bit integer is read first; unless it equals INT32_MAX it is the
 * value itself.
 * Otherwise the value follows as eight little-endian bytes.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_long(struct sess *sess, int fd, int64_t *val)
{
	int32_t	 head;
	int64_t	 wide;

	if (!io_read_int(sess, fd, &head)) {
		ERRX(sess, "io_read_int");
		return 0;
	}

	/* Anything but the marker is the short form. */

	if (head != INT32_MAX) {
		*val = head;
		return 1;
	}

	/* Marker seen: the real value is the next 64 bits. */

	if (!io_read_buf(sess, fd, &wide, sizeof(int64_t))) {
		ERRX(sess, "io_read_buf");
		return 0;
	}

	*val = le64toh(wide);
	return 1;
}
! 481:
/*
 * Read a size_t, which is transmitted as an int32_t.
 * Negative values are rejected as out of range.
 * FIXME: I assume that size_t can handle int32_t's max.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_size(struct sess *sess, int fd, size_t *val)
{
	int32_t	 raw;

	if (!io_read_int(sess, fd, &raw)) {
		ERRX(sess, "io_read_int");
		return 0;
	}
	if (raw < 0) {
		ERRX(sess, "io_read_size: negative value");
		return 0;
	}

	*val = raw;
	return 1;
}
! 504:
/*
 * Read four little-endian bytes and store them in "val" in host byte
 * order.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_int(struct sess *sess, int fd, int32_t *val)
{
	int32_t	 wire;

	if (io_read_buf(sess, fd, &wire, sizeof(int32_t))) {
		*val = le32toh(wire);
		return 1;
	}

	ERRX(sess, "io_read_buf");
	return 0;
}
! 518:
/*
 * Copies "valsz" bytes from "buf", full size "bufsz", at position
 * "bufpos", into "val".
 * Calls assert() if the source doesn't have enough data.
 * Increases "bufpos" to the new position.
 * The "sess" argument is not used.
 */
void
io_unbuffer_buf(struct sess *sess, const void *buf,
	size_t *bufpos, size_t bufsz, void *val, size_t valsz)
{

	(void)sess;

	/*
	 * Phrase the bounds check as subtractions so that a huge
	 * "valsz" cannot wrap the sum around and slip through.
	 */

	assert(*bufpos <= bufsz);
	assert(valsz <= bufsz - *bufpos);

	memcpy(val, (const char *)buf + *bufpos, valsz);
	*bufpos += valsz;
}
! 534:
/*
 * Extract four little-endian bytes from "buf" by way of
 * io_unbuffer_buf(), which advances "bufpos", and store them in "val"
 * in host byte order.
 */
void
io_unbuffer_int(struct sess *sess, const void *buf,
	size_t *bufpos, size_t bufsz, int32_t *val)
{
	int32_t	 wire;

	io_unbuffer_buf(sess, buf, bufpos, bufsz, &wire, sizeof(int32_t));
	*val = le32toh(wire);
}
! 548:
/*
 * Extract a size_t, stored as a little-endian int32_t, from "buf".
 * "bufpos" is advanced past the integer even if the value is then
 * rejected for being negative.
 * Returns zero on failure (negative value), non-zero on success.
 */
int
io_unbuffer_size(struct sess *sess, const void *buf,
	size_t *bufpos, size_t bufsz, size_t *val)
{
	int32_t	 raw;

	io_unbuffer_int(sess, buf, bufpos, bufsz, &raw);

	if (raw >= 0) {
		*val = raw;
		return 1;
	}

	ERRX(sess, "io_unbuffer_size: negative value");
	return 0;
}
! 563:
/*
 * Read a single byte into "val".
 * Returns zero on failure, non-zero on success.
 */
int
io_read_byte(struct sess *sess, int fd, uint8_t *val)
{

	if (io_read_buf(sess, fd, val, sizeof(uint8_t)))
		return 1;

	ERRX(sess, "io_read_buf");
	return 0;
}
! 574:
/*
 * Write the single byte "val".
 * Returns zero on failure, non-zero on success.
 */
int
io_write_byte(struct sess *sess, int fd, uint8_t val)
{

	if (io_write_buf(sess, fd, &val, sizeof(uint8_t)))
		return 1;

	ERRX(sess, "io_write_buf");
	return 0;
}
! 585: