Annotation of src/usr.bin/rsync/io.c, Revision 1.5
1.5 ! deraadt 1: /* $Id: io.c,v 1.4 2019/02/11 21:41:22 deraadt Exp $ */
1.1 benno 2: /*
3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4: *
5: * Permission to use, copy, modify, and distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16: */
17: #include <sys/stat.h>
18:
19: #include <assert.h>
20: #include <endian.h>
21: #include <errno.h>
22: #include <poll.h>
23: #include <stdint.h>
24: #include <stdio.h>
25: #include <stdlib.h>
26: #include <string.h>
27: #include <unistd.h>
28:
29: #include "extern.h"
30:
/*
 * Poll "fd" with a zero timeout to see whether input is pending.
 * Returns -1 if poll(2) fails, otherwise the POLLIN bit of the
 * returned events (zero when not set, non-zero when set).
 */
int
io_read_check(struct sess *sess, int fd)
{
	struct pollfd	 p;
	int		 rc;

	p.events = POLLIN;
	p.fd = fd;

	rc = poll(&p, 1, 0);
	if (rc < 0) {
		ERR(sess, "poll");
		return -1;
	}

	return p.revents & POLLIN;
}
45:
46: /*
47: * Write buffer to non-blocking descriptor.
48: * Returns zero on failure, non-zero on success (zero or more bytes).
49: */
50: static int
1.5 ! deraadt 51: io_write_nonblocking(struct sess *sess, int fd, const void *buf, size_t bsz,
! 52: size_t *sz)
1.1 benno 53: {
54: struct pollfd pfd;
55: ssize_t wsz;
56:
57: *sz = 0;
58:
1.4 deraadt 59: if (bsz == 0)
1.1 benno 60: return 1;
61:
62: pfd.fd = fd;
63: pfd.events = POLLOUT;
64:
65: if (poll(&pfd, 1, INFTIM) < 0) {
66: ERR(sess, "poll");
67: return 0;
68: }
69: if ((pfd.revents & (POLLERR|POLLNVAL))) {
70: ERRX(sess, "poll: bad fd");
71: return 0;
72: } else if ((pfd.revents & POLLHUP)) {
73: ERRX(sess, "poll: hangup");
74: return 0;
1.3 deraadt 75: } else if (!(pfd.revents & POLLOUT)) {
1.1 benno 76: ERRX(sess, "poll: unknown event");
77: return 0;
78: }
79:
80: if ((wsz = write(fd, buf, bsz)) < 0) {
81: ERR(sess, "write");
82: return 0;
83: }
84:
85: *sz = wsz;
86: return 1;
87: }
88:
/*
 * Blocking write of the full size of the buffer.
 * Repeatedly calls io_write_nonblocking() until all "sz" bytes of
 * "buf" have been written.
 * A successful call that writes zero bytes is treated as a short
 * write and fails.
 * Returns 0 on failure, non-zero on success (all bytes written).
 */
static int
io_write_blocking(struct sess *sess, int fd, const void *buf, size_t sz)
{
	/* Byte cursor: arithmetic on "void *" is not standard C. */
	const char	*p = buf;
	size_t		 wsz;

	while (sz > 0) {
		if (!io_write_nonblocking(sess, fd, p, sz, &wsz)) {
			ERRX1(sess, "io_write_nonblocking");
			return 0;
		}
		if (wsz == 0) {
			ERRX(sess, "io_write_nonblocking: short write");
			return 0;
		}
		p += wsz;
		sz -= wsz;
	}

	return 1;
}
114:
115: /*
116: * Write "buf" of size "sz" to non-blocking descriptor.
117: * Returns zero on failure, non-zero on success (all bytes written to
118: * the descriptor).
119: */
120: int
121: io_write_buf(struct sess *sess, int fd, const void *buf, size_t sz)
122: {
123: int32_t tag, tagbuf;
124: size_t wsz;
125: int c;
126:
1.3 deraadt 127: if (!sess->mplex_writes) {
1.1 benno 128: c = io_write_blocking(sess, fd, buf, sz);
129: sess->total_write += sz;
130: return c;
131: }
132:
133: while (sz > 0) {
134: wsz = sz & 0xFFFFFF;
135: tag = (7 << 24) + wsz;
136: tagbuf = htole32(tag);
1.3 deraadt 137: if (!io_write_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
1.1 benno 138: ERRX1(sess, "io_write_blocking");
139: return 0;
140: }
1.3 deraadt 141: if (!io_write_blocking(sess, fd, buf, wsz)) {
1.1 benno 142: ERRX1(sess, "io_write_blocking");
143: return 0;
144: }
145: sess->total_write += wsz;
146: sz -= wsz;
147: buf += wsz;
148: }
149:
150: return 1;
151: }
152:
/*
 * Write "line" (NUL-terminated, terminator not sent) followed by a
 * single newline byte.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_line(struct sess *sess, int fd, const char *line)
{

	if (!io_write_buf(sess, fd, line, strlen(line))) {
		ERRX1(sess, "io_write_buf");
		return 0;
	}
	if (!io_write_byte(sess, fd, '\n')) {
		ERRX1(sess, "io_write_byte");
		return 0;
	}

	return 1;
}
170:
171: /*
172: * Read buffer from non-blocking descriptor.
173: * Returns zero on failure, non-zero on success (zero or more bytes).
174: */
175: static int
176: io_read_nonblocking(struct sess *sess,
177: int fd, void *buf, size_t bsz, size_t *sz)
178: {
179: struct pollfd pfd;
180: ssize_t rsz;
181:
182: *sz = 0;
183:
1.4 deraadt 184: if (bsz == 0)
1.1 benno 185: return 1;
186:
187: pfd.fd = fd;
188: pfd.events = POLLIN;
189:
190: if (poll(&pfd, 1, INFTIM) < 0) {
191: ERR(sess, "poll");
192: return 0;
193: }
194: if ((pfd.revents & (POLLERR|POLLNVAL))) {
195: ERRX(sess, "poll: bad fd");
196: return 0;
1.3 deraadt 197: } else if (!(pfd.revents & (POLLIN|POLLHUP))) {
1.1 benno 198: ERRX(sess, "poll: unknown event");
199: return 0;
200: }
201:
202: if ((rsz = read(fd, buf, bsz)) < 0) {
203: ERR(sess, "read");
204: return 0;
1.4 deraadt 205: } else if (rsz == 0) {
1.1 benno 206: ERRX(sess, "unexpected end of file");
207: return 0;
208: }
209:
210: *sz = rsz;
211: return 1;
212: }
213:
/*
 * Blocking read of the full size of the buffer.
 * Repeatedly calls io_read_nonblocking() until exactly "sz" bytes
 * have been stored in "buf".
 * This can be called from either the error type message or a regular
 * message---or for that matter, multiplexed or not.
 * Returns 0 on failure, non-zero on success (all bytes read).
 */
static int
io_read_blocking(struct sess *sess,
    int fd, void *buf, size_t sz)
{
	/* Byte cursor: arithmetic on "void *" is not standard C. */
	char	*p = buf;
	size_t	 rsz;

	while (sz > 0) {
		if (!io_read_nonblocking(sess, fd, p, sz, &rsz)) {
			ERRX1(sess, "io_read_nonblocking");
			return 0;
		}
		if (rsz == 0) {
			ERRX(sess, "io_read_nonblocking: short read");
			return 0;
		}
		p += rsz;
		sz -= rsz;
	}

	return 1;
}
242:
243: /*
244: * When we do a lot of writes in a row (such as when the sender emits
245: * the file list), the server might be sending us multiplexed log
246: * messages.
247: * If it sends too many, it clogs the socket.
248: * This function looks into the read buffer and clears out any log
249: * messages pending.
250: * If called when there are valid data reads available, this function
251: * does nothing.
252: * Returns zero on failure, non-zero on success.
253: */
254: int
255: io_read_flush(struct sess *sess, int fd)
256: {
257: int32_t tagbuf, tag;
258: char mpbuf[1024];
259:
260: if (sess->mplex_read_remain)
261: return 1;
262:
263: /*
264: * First, read the 4-byte multiplex tag.
265: * The first byte is the tag identifier (7 for normal
266: * data, !7 for out-of-band data), the last three are
267: * for the remaining data size.
268: */
269:
1.3 deraadt 270: if (!io_read_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
1.1 benno 271: ERRX1(sess, "io_read_blocking");
272: return 0;
273: }
274: tag = le32toh(tagbuf);
275: sess->mplex_read_remain = tag & 0xFFFFFF;
276: tag >>= 24;
1.4 deraadt 277: if (tag == 7)
1.1 benno 278: return 1;
279:
280: tag -= 7;
281:
282: if (sess->mplex_read_remain > sizeof(mpbuf)) {
283: ERRX(sess, "multiplex buffer overflow");
284: return 0;
1.4 deraadt 285: } else if (sess->mplex_read_remain == 0)
1.1 benno 286: return 1;
287:
1.3 deraadt 288: if (!io_read_blocking(sess, fd,
1.1 benno 289: mpbuf, sess->mplex_read_remain)) {
290: ERRX1(sess, "io_read_blocking");
291: return 0;
292: }
1.4 deraadt 293: if (mpbuf[sess->mplex_read_remain - 1] == '\n')
1.1 benno 294: mpbuf[--sess->mplex_read_remain] = '\0';
295:
296: /*
297: * Always print the server's messages, as the server
298: * will control its own log levelling.
299: */
300:
301: LOG0(sess, "%.*s", (int)sess->mplex_read_remain, mpbuf);
302: sess->mplex_read_remain = 0;
303:
1.2 benno 304: /*
1.1 benno 305: * I only know that a tag of one means an error.
306: * This means that we should exit.
307: */
308:
1.4 deraadt 309: if (tag == 1) {
1.1 benno 310: ERRX1(sess, "error from remote host");
311: return 0;
312: }
313: return 1;
314: }
315:
316: /*
317: * Read buffer from non-blocking descriptor, possibly in multiplex read
318: * mode.
319: * Returns zero on failure, non-zero on success (all bytes read from
320: * the descriptor).
321: */
322: int
323: io_read_buf(struct sess *sess, int fd, void *buf, size_t sz)
324: {
325: size_t rsz;
326: int c;
327:
328: /* If we're not multiplexing, read directly. */
329:
1.3 deraadt 330: if (!sess->mplex_reads) {
1.4 deraadt 331: assert(sess->mplex_read_remain == 0);
1.1 benno 332: c = io_read_blocking(sess, fd, buf, sz);
333: sess->total_read += sz;
334: return c;
335: }
336:
337: while (sz > 0) {
338: /*
339: * First, check to see if we have any regular data
340: * hanging around waiting to be read.
341: * If so, read the lesser of that data and whatever
342: * amount we currently want.
343: */
344:
345: if (sess->mplex_read_remain) {
346: rsz = sess->mplex_read_remain < sz ?
347: sess->mplex_read_remain : sz;
1.3 deraadt 348: if (!io_read_blocking(sess, fd, buf, rsz)) {
1.1 benno 349: ERRX1(sess, "io_read_blocking");
350: return 0;
351: }
352: sz -= rsz;
353: sess->mplex_read_remain -= rsz;
354: buf += rsz;
355: sess->total_read += rsz;
356: continue;
357: }
358:
1.4 deraadt 359: assert(sess->mplex_read_remain == 0);
1.3 deraadt 360: if (!io_read_flush(sess, fd)) {
1.1 benno 361: ERRX1(sess, "io_read_flush");
362: return 0;
363: }
364: }
365:
366: return 1;
367: }
368:
/*
 * Write a 64-bit integer.
 * Values in the range [0, INT32_MAX] are sent as a single 32-bit
 * integer; anything else is sent as the 32-bit value INT32_MAX
 * followed by the full value as 64 bits, little-endian.
 * NOTE(review): a value of exactly INT32_MAX takes the short path,
 * yet io_read_long() treats a leading INT32_MAX as the 64-bit
 * marker --- confirm the intended encoding against the protocol.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_long(struct sess *sess, int fd, int64_t val)
{
	int64_t	 le;

	/* Short-circuit: send as an integer if possible. */

	if (val >= 0 && val <= INT32_MAX)
		return io_write_int(sess, fd, (int32_t)val);

	/* Otherwise, pad with max integer, then send 64-bit. */

	le = htole64(val);

	if (!io_write_int(sess, fd, INT32_MAX)) {
		ERRX(sess, "io_write_int");
		return 0;
	}
	if (!io_write_buf(sess, fd, &le, sizeof(int64_t))) {
		ERRX(sess, "io_write_buf");
		return 0;
	}

	return 1;
}
392:
/*
 * Write a 32-bit integer in little-endian byte order.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_int(struct sess *sess, int fd, int32_t val)
{
	int32_t	 le = htole32(val);

	if (io_write_buf(sess, fd, &le, sizeof(int32_t)))
		return 1;

	ERRX(sess, "io_write_buf");
	return 0;
}
406:
/*
 * A simple assertion-protected memory copy from the input "val" of
 * size "valsz" into our buffer "buf", full size "buflen", position
 * "bufpos".
 * Increases our "bufpos" appropriately.
 * This has no return value, but will assert() if the size of the buffer
 * is insufficient for the new data.
 * The "sess" argument is not used.
 */
void
io_buffer_buf(struct sess *sess, void *buf,
    size_t *bufpos, size_t buflen, const void *val, size_t valsz)
{

	/*
	 * Check the remaining room by subtraction so that a huge
	 * "valsz" cannot wrap the sum around and slip past the check.
	 */
	assert(*bufpos <= buflen);
	assert(valsz <= buflen - *bufpos);

	/* Cast to a byte pointer: arithmetic on "void *" is not ISO C. */
	memcpy((char *)buf + *bufpos, val, valsz);
	*bufpos += valsz;
}
423:
/*
 * Appends the 32-bit integer "val" to "buf" in little-endian byte
 * order by way of io_buffer_buf(), which advances "bufpos".
 */
void
io_buffer_int(struct sess *sess, void *buf,
    size_t *bufpos, size_t buflen, int32_t val)
{
	int32_t	 le;

	le = htole32(val);
	io_buffer_buf(sess, buf, bufpos, buflen, &le, sizeof(int32_t));
}
435:
/*
 * Read a 64-bit integer with io_read_long() and store it in "val" as
 * an unsigned value.
 * A negative value on the wire is an error; "val" is left untouched
 * in that case.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_ulong(struct sess *sess, int fd, uint64_t *val)
{
	int64_t	 oval;

	if (!io_read_long(sess, fd, &oval)) {
		ERRX(sess, "io_read_long");
		return 0;
	} else if (oval < 0) {
		/*
		 * This must fail: reporting success here would hand
		 * the caller a "val" that was never assigned.
		 */
		ERRX(sess, "io_read_ulong: negative value");
		return 0;
	}

	*val = oval;
	return 1;
}
452:
/*
 * Read a 64-bit integer.
 * A 32-bit integer is read first; unless it equals INT32_MAX, it is
 * the value.
 * Otherwise a further 64 bits (little-endian) are read and used as
 * the value.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_long(struct sess *sess, int fd, int64_t *val)
{
	int32_t	 first;
	int64_t	 wide;

	/* Start with the short-circuit: read as an int. */

	if (!io_read_int(sess, fd, &first)) {
		ERRX(sess, "io_read_int");
		return 0;
	}
	if (first != INT32_MAX) {
		*val = first;
		return 1;
	}

	/* If the int is maximal, read as 64 bits. */

	if (!io_read_buf(sess, fd, &wide, sizeof(int64_t))) {
		ERRX(sess, "io_read_buf");
		return 0;
	}

	*val = le64toh(wide);
	return 1;
}
479:
/*
 * One thing we often need to do is read a size_t.
 * These are transmitted as int32_t, so a negative value on the wire
 * is rejected as out of range.
 * FIXME: I assume that size_t can handle int32_t's max.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_size(struct sess *sess, int fd, size_t *val)
{
	int32_t	 raw;

	if (!io_read_int(sess, fd, &raw)) {
		ERRX(sess, "io_read_int");
		return 0;
	}
	if (raw < 0) {
		ERRX(sess, "io_read_size: negative value");
		return 0;
	}

	*val = raw;
	return 1;
}
502:
/*
 * Read a 32-bit little-endian integer and store it, converted to
 * host byte order, in "val".
 * Returns zero on failure, non-zero on success.
 */
int
io_read_int(struct sess *sess, int fd, int32_t *val)
{
	int32_t	 le;

	if (io_read_buf(sess, fd, &le, sizeof(int32_t))) {
		*val = le32toh(le);
		return 1;
	}

	ERRX(sess, "io_read_buf");
	return 0;
}
516:
/*
 * Copies "valsz" bytes from "buf", full size "bufsz", at position
 * "bufpos", into "val".
 * Calls assert() if the source doesn't have enough data.
 * Increases "bufpos" to the new position.
 * The "sess" argument is not used.
 */
void
io_unbuffer_buf(struct sess *sess, const void *buf,
    size_t *bufpos, size_t bufsz, void *val, size_t valsz)
{

	/*
	 * Check the remaining data by subtraction so that a huge
	 * "valsz" cannot wrap the sum around and slip past the check.
	 */
	assert(*bufpos <= bufsz);
	assert(valsz <= bufsz - *bufpos);

	/* Cast to a byte pointer: arithmetic on "void *" is not ISO C. */
	memcpy(val, (const char *)buf + *bufpos, valsz);
	*bufpos += valsz;
}
532:
/*
 * Extracts a 32-bit integer from "buf" with io_unbuffer_buf() (which
 * advances "bufpos") and stores it in "val" after converting from
 * little-endian to host byte order.
 */
void
io_unbuffer_int(struct sess *sess, const void *buf,
    size_t *bufpos, size_t bufsz, int32_t *val)
{
	int32_t	 le;

	io_unbuffer_buf(sess, buf, bufpos, bufsz, &le, sizeof(int32_t));

	*val = le32toh(le);
}
545:
/*
 * Extracts a 32-bit integer from "buf" with io_unbuffer_int() and
 * stores it in "val" as a size_t.
 * A negative value is rejected; "val" is left untouched in that case,
 * although "bufpos" has already been advanced.
 * Returns zero on failure, non-zero on success.
 */
int
io_unbuffer_size(struct sess *sess, const void *buf,
    size_t *bufpos, size_t bufsz, size_t *val)
{
	int32_t	 raw;

	io_unbuffer_int(sess, buf, bufpos, bufsz, &raw);

	if (raw >= 0) {
		*val = raw;
		return 1;
	}

	ERRX(sess, "io_unbuffer_size: negative value");
	return 0;
}
560:
/*
 * Read a single byte into "val".
 * Returns zero on failure, non-zero on success.
 */
int
io_read_byte(struct sess *sess, int fd, uint8_t *val)
{

	if (io_read_buf(sess, fd, val, sizeof(uint8_t)))
		return 1;

	ERRX(sess, "io_read_buf");
	return 0;
}
571:
/*
 * Write the single byte "val".
 * Returns zero on failure, non-zero on success.
 */
int
io_write_byte(struct sess *sess, int fd, uint8_t val)
{

	if (io_write_buf(sess, fd, &val, sizeof(uint8_t)))
		return 1;

	ERRX(sess, "io_write_buf");
	return 0;
}