Annotation of src/usr.bin/rsync/io.c, Revision 1.2
1.1 benno 1: /* $Id$ */
2: /*
3: * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4: *
5: * Permission to use, copy, modify, and distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10: * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11: * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12: * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13: * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14: * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15: * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16: */
17: #include <sys/stat.h>
18:
19: #include <assert.h>
20: #include <endian.h>
21: #include <errno.h>
22: #include <poll.h>
23: #include <stdint.h>
24: #include <stdio.h>
25: #include <stdlib.h>
26: #include <string.h>
27: #include <unistd.h>
28:
29: #include "extern.h"
30:
31: int
32: io_read_check(struct sess *sess, int fd)
33: {
34: struct pollfd pfd;
35:
36: pfd.fd = fd;
37: pfd.events = POLLIN;
38:
39: if (poll(&pfd, 1, 0) < 0) {
40: ERR(sess, "poll");
41: return -1;
42: }
43: return pfd.revents & POLLIN;
44: }
45:
46: /*
47: * Write buffer to non-blocking descriptor.
48: * Returns zero on failure, non-zero on success (zero or more bytes).
49: */
50: static int
51: io_write_nonblocking(struct sess *sess,
52: int fd, const void *buf, size_t bsz, size_t *sz)
53: {
54: struct pollfd pfd;
55: ssize_t wsz;
56:
57: *sz = 0;
58:
59: if (0 == bsz)
60: return 1;
61:
62: pfd.fd = fd;
63: pfd.events = POLLOUT;
64:
65: if (poll(&pfd, 1, INFTIM) < 0) {
66: ERR(sess, "poll");
67: return 0;
68: }
69: if ((pfd.revents & (POLLERR|POLLNVAL))) {
70: ERRX(sess, "poll: bad fd");
71: return 0;
72: } else if ((pfd.revents & POLLHUP)) {
73: ERRX(sess, "poll: hangup");
74: return 0;
75: } else if ( ! (pfd.revents & POLLOUT)) {
76: ERRX(sess, "poll: unknown event");
77: return 0;
78: }
79:
80: if ((wsz = write(fd, buf, bsz)) < 0) {
81: ERR(sess, "write");
82: return 0;
83: }
84:
85: *sz = wsz;
86: return 1;
87: }
88:
89: /*
90: * Blocking write of the full size of the buffer.
91: * Returns 0 on failure, non-zero on success (all bytes written).
92: */
93: static int
94: io_write_blocking(struct sess *sess,
95: int fd, const void *buf, size_t sz)
96: {
97: size_t wsz;
98: int c;
99:
100: while (sz > 0) {
101: c = io_write_nonblocking(sess, fd, buf, sz, &wsz);
102: if ( ! c) {
103: ERRX1(sess, "io_write_nonblocking");
104: return 0;
105: } else if (0 == wsz) {
106: ERRX(sess, "io_write_nonblocking: short write");
107: return 0;
108: }
109: buf += wsz;
110: sz -= wsz;
111: }
112:
113: return 1;
114: }
115:
116: /*
117: * Write "buf" of size "sz" to non-blocking descriptor.
118: * Returns zero on failure, non-zero on success (all bytes written to
119: * the descriptor).
120: */
121: int
122: io_write_buf(struct sess *sess, int fd, const void *buf, size_t sz)
123: {
124: int32_t tag, tagbuf;
125: size_t wsz;
126: int c;
127:
128: if ( ! sess->mplex_writes) {
129: c = io_write_blocking(sess, fd, buf, sz);
130: sess->total_write += sz;
131: return c;
132: }
133:
134: while (sz > 0) {
135: wsz = sz & 0xFFFFFF;
136: tag = (7 << 24) + wsz;
137: tagbuf = htole32(tag);
138: if ( ! io_write_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
139: ERRX1(sess, "io_write_blocking");
140: return 0;
141: }
142: if ( ! io_write_blocking(sess, fd, buf, wsz)) {
143: ERRX1(sess, "io_write_blocking");
144: return 0;
145: }
146: sess->total_write += wsz;
147: sz -= wsz;
148: buf += wsz;
149: }
150:
151: return 1;
152: }
153:
154: /*
155: * Write "line" (NUL-terminated) followed by a newline.
156: * Returns zero on failure, non-zero on succcess.
157: */
158: int
159: io_write_line(struct sess *sess, int fd, const char *line)
160: {
161:
162: if ( ! io_write_buf(sess, fd, line, strlen(line)))
163: ERRX1(sess, "io_write_buf");
164: else if ( ! io_write_byte(sess, fd, '\n'))
165: ERRX1(sess, "io_write_byte");
166: else
167: return 1;
168:
169: return 0;
170: }
171:
172: /*
173: * Read buffer from non-blocking descriptor.
174: * Returns zero on failure, non-zero on success (zero or more bytes).
175: */
176: static int
177: io_read_nonblocking(struct sess *sess,
178: int fd, void *buf, size_t bsz, size_t *sz)
179: {
180: struct pollfd pfd;
181: ssize_t rsz;
182:
183: *sz = 0;
184:
185: if (0 == bsz)
186: return 1;
187:
188: pfd.fd = fd;
189: pfd.events = POLLIN;
190:
191: if (poll(&pfd, 1, INFTIM) < 0) {
192: ERR(sess, "poll");
193: return 0;
194: }
195: if ((pfd.revents & (POLLERR|POLLNVAL))) {
196: ERRX(sess, "poll: bad fd");
197: return 0;
198: } else if ( ! (pfd.revents & (POLLIN|POLLHUP))) {
199: ERRX(sess, "poll: unknown event");
200: return 0;
201: }
202:
203: if ((rsz = read(fd, buf, bsz)) < 0) {
204: ERR(sess, "read");
205: return 0;
206: } else if (0 == rsz) {
207: ERRX(sess, "unexpected end of file");
208: return 0;
209: }
210:
211: *sz = rsz;
212: return 1;
213: }
214:
215: /*
216: * Blocking read of the full size of the buffer.
217: * This can be called from either the error type message or a regular
218: * message---or for that matter, multiplexed or not.
219: * Returns 0 on failure, non-zero on success (all bytes read).
220: */
221: static int
222: io_read_blocking(struct sess *sess,
223: int fd, void *buf, size_t sz)
224: {
225: size_t rsz;
226: int c;
227:
228: while (sz > 0) {
229: c = io_read_nonblocking(sess, fd, buf, sz, &rsz);
230: if ( ! c) {
231: ERRX1(sess, "io_read_nonblocking");
232: return 0;
233: } else if (0 == rsz) {
234: ERRX(sess, "io_read_nonblocking: short read");
235: return 0;
236: }
237: buf += rsz;
238: sz -= rsz;
239: }
240:
241: return 1;
242: }
243:
244: /*
245: * When we do a lot of writes in a row (such as when the sender emits
246: * the file list), the server might be sending us multiplexed log
247: * messages.
248: * If it sends too many, it clogs the socket.
249: * This function looks into the read buffer and clears out any log
250: * messages pending.
251: * If called when there are valid data reads available, this function
252: * does nothing.
253: * Returns zero on failure, non-zero on success.
254: */
255: int
256: io_read_flush(struct sess *sess, int fd)
257: {
258: int32_t tagbuf, tag;
259: char mpbuf[1024];
260:
261: if (sess->mplex_read_remain)
262: return 1;
263:
264: /*
265: * First, read the 4-byte multiplex tag.
266: * The first byte is the tag identifier (7 for normal
267: * data, !7 for out-of-band data), the last three are
268: * for the remaining data size.
269: */
270:
271: if ( ! io_read_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
272: ERRX1(sess, "io_read_blocking");
273: return 0;
274: }
275: tag = le32toh(tagbuf);
276: sess->mplex_read_remain = tag & 0xFFFFFF;
277: tag >>= 24;
278: if (7 == tag)
279: return 1;
280:
281: tag -= 7;
282:
283: if (sess->mplex_read_remain > sizeof(mpbuf)) {
284: ERRX(sess, "multiplex buffer overflow");
285: return 0;
286: } else if (0 == sess->mplex_read_remain)
287: return 1;
288:
289: if ( ! io_read_blocking(sess, fd,
290: mpbuf, sess->mplex_read_remain)) {
291: ERRX1(sess, "io_read_blocking");
292: return 0;
293: }
294: if ('\n' == mpbuf[sess->mplex_read_remain - 1])
295: mpbuf[--sess->mplex_read_remain] = '\0';
296:
297: /*
298: * Always print the server's messages, as the server
299: * will control its own log levelling.
300: */
301:
302: LOG0(sess, "%.*s", (int)sess->mplex_read_remain, mpbuf);
303: sess->mplex_read_remain = 0;
304:
1.2 ! benno 305: /*
1.1 benno 306: * I only know that a tag of one means an error.
307: * This means that we should exit.
308: */
309:
310: if (1 == tag) {
311: ERRX1(sess, "error from remote host");
312: return 0;
313: }
314: return 1;
315: }
316:
317: /*
318: * Read buffer from non-blocking descriptor, possibly in multiplex read
319: * mode.
320: * Returns zero on failure, non-zero on success (all bytes read from
321: * the descriptor).
322: */
323: int
324: io_read_buf(struct sess *sess, int fd, void *buf, size_t sz)
325: {
326: size_t rsz;
327: int c;
328:
329: /* If we're not multiplexing, read directly. */
330:
331: if ( ! sess->mplex_reads) {
332: assert(0 == sess->mplex_read_remain);
333: c = io_read_blocking(sess, fd, buf, sz);
334: sess->total_read += sz;
335: return c;
336: }
337:
338: while (sz > 0) {
339: /*
340: * First, check to see if we have any regular data
341: * hanging around waiting to be read.
342: * If so, read the lesser of that data and whatever
343: * amount we currently want.
344: */
345:
346: if (sess->mplex_read_remain) {
347: rsz = sess->mplex_read_remain < sz ?
348: sess->mplex_read_remain : sz;
349: if ( ! io_read_blocking(sess, fd, buf, rsz)) {
350: ERRX1(sess, "io_read_blocking");
351: return 0;
352: }
353: sz -= rsz;
354: sess->mplex_read_remain -= rsz;
355: buf += rsz;
356: sess->total_read += rsz;
357: continue;
358: }
359:
360: assert(0 == sess->mplex_read_remain);
361: if ( ! io_read_flush(sess, fd)) {
362: ERRX1(sess, "io_read_flush");
363: return 0;
364: }
365: }
366:
367: return 1;
368: }
369:
370: int
371: io_write_long(struct sess *sess, int fd, int64_t val)
372: {
373: int64_t nv;
374:
375: /* Short-circuit: send as an integer if possible. */
376:
377: if (val <= INT32_MAX && val >= 0)
378: return io_write_int(sess, fd, (int32_t)val);
379:
380: /* Otherwise, pad with max integer, then send 64-bit. */
381:
382: nv = htole64(val);
383:
384: if ( ! io_write_int(sess, fd, INT32_MAX))
385: ERRX(sess, "io_write_int");
386: else if ( ! io_write_buf(sess, fd, &nv, sizeof(int64_t)))
387: ERRX(sess, "io_write_buf");
388: else
389: return 1;
390:
391: return 0;
392: }
393:
394: int
395: io_write_int(struct sess *sess, int fd, int32_t val)
396: {
397: int32_t nv;
398:
399: nv = htole32(val);
400:
401: if ( ! io_write_buf(sess, fd, &nv, sizeof(int32_t))) {
402: ERRX(sess, "io_write_buf");
403: return 0;
404: }
405: return 1;
406: }
407:
408: /*
409: * A simple assertion-protected memory copy from th einput "val" or size
410: * "valsz" into our buffer "buf", full size "buflen", position "bufpos".
411: * Increases our "bufpos" appropriately.
412: * This has no return value, but will assert() if the size of the buffer
413: * is insufficient for the new data.
414: */
415: void
1.2 ! benno 416: io_buffer_buf(struct sess *sess, void *buf,
1.1 benno 417: size_t *bufpos, size_t buflen, const void *val, size_t valsz)
418: {
419:
420: assert(*bufpos + valsz <= buflen);
421: memcpy(buf + *bufpos, val, valsz);
422: *bufpos += valsz;
423: }
424:
425: /*
426: * Converts "val" to LE prior to io_buffer_buf().
427: */
428: void
1.2 ! benno 429: io_buffer_int(struct sess *sess, void *buf,
1.1 benno 430: size_t *bufpos, size_t buflen, int32_t val)
431: {
432: int32_t nv = htole32(val);
433:
1.2 ! benno 434: io_buffer_buf(sess, buf, bufpos,
1.1 benno 435: buflen, &nv, sizeof(int32_t));
436: }
437:
438: int
439: io_read_ulong(struct sess *sess, int fd, uint64_t *val)
440: {
441: int64_t oval;
442:
443: if ( ! io_read_long(sess, fd, &oval)) {
444: ERRX(sess, "io_read_int");
445: return 0;
446: } else if (oval < 0) {
447: ERRX(sess, "io_read_size: negative value");
448: return 1;
449: }
450:
451: *val = oval;
452: return 1;
453: }
454:
455: int
456: io_read_long(struct sess *sess, int fd, int64_t *val)
457: {
458: int64_t oval;
459: int32_t sval;
460:
461: /* Start with the short-circuit: read as an int. */
462:
463: if ( ! io_read_int(sess, fd, &sval)) {
464: ERRX(sess, "io_read_int");
465: return 0;
466: } else if (INT32_MAX != sval) {
467: *val = sval;
468: return 1;
469: }
470:
471: /* If the int is maximal, read as 64 bits. */
472:
473: if ( ! io_read_buf(sess, fd, &oval, sizeof(int64_t))) {
474: ERRX(sess, "io_read_buf");
475: return 0;
476: }
477:
478: *val = le64toh(oval);
479: return 1;
480: }
481:
482: /*
483: * One thing we often need to do is read a size_t.
484: * These are transmitted as int32_t, so make sure that the value
485: * transmitted is not out of range.
486: * FIXME: I assume that size_t can handle int32_t's max.
487: */
488: int
489: io_read_size(struct sess *sess, int fd, size_t *val)
490: {
491: int32_t oval;
492:
493: if ( ! io_read_int(sess, fd, &oval)) {
494: ERRX(sess, "io_read_int");
495: return 0;
496: } else if (oval < 0) {
497: ERRX(sess, "io_read_size: negative value");
498: return 0;
499: }
500:
501: *val = oval;
502: return 1;
503: }
504:
505: int
506: io_read_int(struct sess *sess, int fd, int32_t *val)
507: {
508: int32_t oval;
509:
510: if ( ! io_read_buf(sess, fd, &oval, sizeof(int32_t))) {
511: ERRX(sess, "io_read_buf");
512: return 0;
513: }
514:
515: *val = le32toh(oval);
516: return 1;
517: }
518:
519: /*
520: * Copies "valsz" from "buf", full size "bufsz" at position" bufpos",
521: * into "val".
522: * Calls assert() if the source doesn't have enough data.
523: * Increases "bufpos" to the new position.
524: */
525: void
1.2 ! benno 526: io_unbuffer_buf(struct sess *sess, const void *buf,
1.1 benno 527: size_t *bufpos, size_t bufsz, void *val, size_t valsz)
528: {
529:
530: assert(*bufpos + valsz <= bufsz);
531: memcpy(val, buf + *bufpos, valsz);
532: *bufpos += valsz;
533: }
534:
535: /*
536: * Calls io_unbuffer_buf() and converts from LE.
537: */
538: void
1.2 ! benno 539: io_unbuffer_int(struct sess *sess, const void *buf,
1.1 benno 540: size_t *bufpos, size_t bufsz, int32_t *val)
541: {
542: int32_t oval;
543:
1.2 ! benno 544: io_unbuffer_buf(sess, buf, bufpos,
1.1 benno 545: bufsz, &oval, sizeof(int32_t));
546: *val = le32toh(oval);
547: }
548:
549: int
1.2 ! benno 550: io_unbuffer_size(struct sess *sess, const void *buf,
1.1 benno 551: size_t *bufpos, size_t bufsz, size_t *val)
552: {
553: int32_t oval;
554:
555: io_unbuffer_int(sess, buf, bufpos, bufsz, &oval);
556: if (oval < 0) {
557: ERRX(sess, "io_unbuffer_size: negative value");
558: return 0;
559: }
560: *val = oval;
561: return 1;
562: }
563:
564: int
565: io_read_byte(struct sess *sess, int fd, uint8_t *val)
566: {
567:
568: if ( ! io_read_buf(sess, fd, val, sizeof(uint8_t))) {
569: ERRX(sess, "io_read_buf");
570: return 0;
571: }
572: return 1;
573: }
574:
575: int
576: io_write_byte(struct sess *sess, int fd, uint8_t val)
577: {
578:
579: if ( ! io_write_buf(sess, fd, &val, sizeof(uint8_t))) {
580: ERRX(sess, "io_write_buf");
581: return 0;
582: }
583: return 1;
584: }