[BACK]Return to rrdp.c CVS log [TXT][DIR] Up to [local] / src / usr.sbin / rpki-client

File: [local] / src / usr.sbin / rpki-client / rrdp.c (download)

Revision 1.33, Fri Feb 16 11:46:57 2024 UTC (3 months, 2 weeks ago) by tb
Branch: MAIN
CVS Tags: OPENBSD_7_5_BASE, OPENBSD_7_5, HEAD
Changes since 1.32: +2 -2 lines

Zap extra ;

/*	$OpenBSD: rrdp.c,v 1.33 2024/02/16 11:46:57 tb Exp $ */
/*
 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
#include <sys/queue.h>
#include <sys/stat.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#include <imsg.h>

#include <expat.h>
#include <openssl/sha.h>

#include "extern.h"
#include "rrdp.h"

#define MAX_SESSIONS	12
#define	READ_BUF_SIZE	(32 * 1024)

static struct msgbuf	msgq;

#define RRDP_STATE_REQ		0x01
#define RRDP_STATE_WAIT		0x02
#define RRDP_STATE_PARSE	0x04
#define RRDP_STATE_PARSE_ERROR	0x08
#define RRDP_STATE_PARSE_DONE	0x10
#define RRDP_STATE_HTTP_DONE	0x20
#define RRDP_STATE_DONE		(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)

struct rrdp {
	TAILQ_ENTRY(rrdp)	 entry;
	unsigned int		 id;
	char			*notifyuri;
	char			*local;
	char			*last_mod;

	struct pollfd		*pfd;
	int			 infd;
	int			 state;
	int			 aborted;
	unsigned int		 file_pending;
	unsigned int		 file_failed;
	enum http_result	 res;
	enum rrdp_task		 task;

	char			 hash[SHA256_DIGEST_LENGTH];
	SHA256_CTX		 ctx;

	struct rrdp_session	*repository;
	struct rrdp_session	*current;
	XML_Parser		 parser;
	struct notification_xml	*nxml;
	struct snapshot_xml	*sxml;
	struct delta_xml	*dxml;
};

static TAILQ_HEAD(, rrdp)	states = TAILQ_HEAD_INITIALIZER(states);

char *
xstrdup(const char *s)
{
	char *r;
	if ((r = strdup(s)) == NULL)
		err(1, "strdup");
	return r;
}

/*
 * Report back that a RRDP request finished.
 * ok should only be set to 1 if the cache is now up-to-date.
 */
static void
rrdp_done(unsigned int id, int ok)
{
	enum rrdp_msg type = RRDP_END;
	struct ibuf *b;

	b = io_new_buffer();
	io_simple_buffer(b, &type, sizeof(type));
	io_simple_buffer(b, &id, sizeof(id));
	io_simple_buffer(b, &ok, sizeof(ok));
	io_close_buffer(&msgq, b);
}

/*
 * Request an URI to be fetched via HTTPS.
 * The main process will respond with a RRDP_HTTP_INI which includes
 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
 * end of the request with the HTTP status code and last modified timestamp.
 * If the request should not set the If-Modified-Since: header then last_mod
 * should be set to NULL, else it should point to a proper date string.
 */
static void
rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
{
	enum rrdp_msg type = RRDP_HTTP_REQ;
	struct ibuf *b;

	b = io_new_buffer();
	io_simple_buffer(b, &type, sizeof(type));
	io_simple_buffer(b, &id, sizeof(id));
	io_str_buffer(b, uri);
	io_str_buffer(b, last_mod);
	io_close_buffer(&msgq, b);
}

/*
 * Send the session state to the main process so it gets stored.
 */
static void
rrdp_state_send(struct rrdp *s)
{
	enum rrdp_msg type = RRDP_SESSION;
	struct ibuf *b;

	b = io_new_buffer();
	io_simple_buffer(b, &type, sizeof(type));
	io_simple_buffer(b, &s->id, sizeof(s->id));
	rrdp_session_buffer(b, s->current);
	io_close_buffer(&msgq, b);
}

/*
 * Inform parent to clear the RRDP repository before start of snapshot.
 */
static void
rrdp_clear_repo(struct rrdp *s)
{
	enum rrdp_msg type = RRDP_CLEAR;
	struct ibuf *b;

	b = io_new_buffer();
	io_simple_buffer(b, &type, sizeof(type));
	io_simple_buffer(b, &s->id, sizeof(s->id));
	io_close_buffer(&msgq, b);
}

/*
 * Send a blob of data to the main process to store it in the repository.
 */
void
rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
    unsigned char *data, size_t datasz)
{
	enum rrdp_msg type = RRDP_FILE;
	struct ibuf *b;

	/* only send files if the fetch did not fail already */
	if (s->file_failed == 0) {
		b = io_new_buffer();
		io_simple_buffer(b, &type, sizeof(type));
		io_simple_buffer(b, &s->id, sizeof(s->id));
		io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
		if (pxml->type != PUB_ADD)
			io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
		io_str_buffer(b, pxml->uri);
		io_buf_buffer(b, data, datasz);
		io_close_buffer(&msgq, b);
		s->file_pending++;
	}
}

static void
rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
{
	struct rrdp *s;

	if ((s = calloc(1, sizeof(*s))) == NULL)
		err(1, NULL);

	s->infd = -1;
	s->id = id;
	s->local = local;
	s->notifyuri = notify;
	s->repository = state;
	if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
		err(1, NULL);

	s->state = RRDP_STATE_REQ;
	if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
		err(1, "XML_ParserCreate");

	s->nxml = new_notification_xml(s->parser, s->repository, s->current,
	    notify);

	TAILQ_INSERT_TAIL(&states, s, entry);
}

static void
rrdp_free(struct rrdp *s)
{
	if (s == NULL)
		return;

	TAILQ_REMOVE(&states, s, entry);

	free_notification_xml(s->nxml);
	free_snapshot_xml(s->sxml);
	free_delta_xml(s->dxml);

	if (s->parser)
		XML_ParserFree(s->parser);
	if (s->infd != -1)
		close(s->infd);
	free(s->notifyuri);
	free(s->local);
	free(s->last_mod);
	rrdp_session_free(s->repository);
	rrdp_session_free(s->current);

	free(s);
}

static struct rrdp *
rrdp_get(unsigned int id)
{
	struct rrdp *s;

	TAILQ_FOREACH(s, &states, entry)
		if (s->id == id)
			break;
	return s;
}

static void
rrdp_failed(struct rrdp *s)
{
	unsigned int id = s->id;

	/* reset file state before retrying */
	s->file_failed = 0;

	if (s->task == DELTA && !s->aborted) {
		/* fallback to a snapshot as per RFC8182 */
		free_delta_xml(s->dxml);
		s->dxml = NULL;
		rrdp_clear_repo(s);
		s->sxml = new_snapshot_xml(s->parser, s->current, s);
		s->task = SNAPSHOT;
		s->state = RRDP_STATE_REQ;
		logx("%s: delta sync failed, fallback to snapshot", s->local);
	} else {
		/*
		 * TODO: update state to track recurring failures
		 * and fall back to rsync after a while.
		 * This should probably happen in the main process.
		 */
		rrdp_free(s);
		rrdp_done(id, 0);
	}
}

static void
rrdp_finished(struct rrdp *s)
{
	unsigned int id = s->id;

	/* check if all parts of the process have finished */
	if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
		return;

	/* still some files pending */
	if (s->file_pending > 0)
		return;

	if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
		rrdp_failed(s);
		return;
	}

	if (s->res == HTTP_OK) {
		XML_Parser p = s->parser;

		/*
		 * Finalize parsing on success to be sure that
		 * all of the XML is correct. Needs to be done here
		 * since the call would most probably fail for non
		 * successful data fetches.
		 */
		if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
			warnx("%s: XML error at line %llu: %s", s->local,
			    (unsigned long long)XML_GetCurrentLineNumber(p),
			    XML_ErrorString(XML_GetErrorCode(p)));
			rrdp_failed(s);
			return;
		}

		/* If a file caused an error fail the update */
		if (s->file_failed > 0) {
			rrdp_failed(s);
			return;
		}

		switch (s->task) {
		case NOTIFICATION:
			s->task = notification_done(s->nxml, s->last_mod);
			s->last_mod = NULL;
			switch (s->task) {
			case NOTIFICATION:
				logx("%s: repository not modified (%s#%lld)",
				    s->local, s->repository->session_id,
				    s->repository->serial);
				rrdp_state_send(s);
				rrdp_free(s);
				rrdp_done(id, 1);
				break;
			case SNAPSHOT:
				logx("%s: downloading snapshot (%s#%lld)",
				    s->local, s->current->session_id,
				    s->current->serial);
				rrdp_clear_repo(s);
				s->sxml = new_snapshot_xml(p, s->current, s);
				s->state = RRDP_STATE_REQ;
				break;
			case DELTA:
				logx("%s: downloading %lld deltas (%s#%lld)",
				    s->local,
				    s->repository->serial - s->current->serial,
				    s->current->session_id, s->current->serial);
				s->dxml = new_delta_xml(p, s->current, s);
				s->state = RRDP_STATE_REQ;
				break;
			}
			break;
		case SNAPSHOT:
			rrdp_state_send(s);
			rrdp_free(s);
			rrdp_done(id, 1);
			break;
		case DELTA:
			if (notification_delta_done(s->nxml)) {
				/* finished */
				rrdp_state_send(s);
				rrdp_free(s);
				rrdp_done(id, 1);
			} else {
				/* reset delta parser for next delta */
				free_delta_xml(s->dxml);
				s->dxml = new_delta_xml(p, s->current, s);
				s->state = RRDP_STATE_REQ;
			}
			break;
		}
	} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
		logx("%s: notification file not modified (%s#%lld)", s->local,
		    s->repository->session_id, s->repository->serial);
		/* no need to update state file */
		rrdp_free(s);
		rrdp_done(id, 1);
	} else {
		rrdp_failed(s);
	}
}

static void
rrdp_abort_req(struct rrdp *s)
{
	unsigned int id = s->id;

	s->aborted = 1;
	if (s->state == RRDP_STATE_REQ) {
		/* nothing is pending, just abort */
		rrdp_free(s);
		rrdp_done(id, 1);
		return;
	}
	if (s->state == RRDP_STATE_WAIT)
		/* wait for HTTP_INI which will progress the state */
		return;

	/*
	 * RRDP_STATE_PARSE or later, close infd, abort parser but
	 * wait for HTTP_FIN and file_pending to drop to 0.
	 */
	if (s->infd != -1) {
		close(s->infd);
		s->infd = -1;
		s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
	}
	rrdp_finished(s);
}

static void
rrdp_input_handler(int fd)
{
	static struct ibuf *inbuf;
	struct rrdp_session *state;
	char *local, *notify, *last_mod;
	struct ibuf *b;
	struct rrdp *s;
	enum rrdp_msg type;
	enum http_result res;
	unsigned int id;
	int ok;

	b = io_buf_recvfd(fd, &inbuf);
	if (b == NULL)
		return;

	io_read_buf(b, &type, sizeof(type));
	io_read_buf(b, &id, sizeof(id));

	switch (type) {
	case RRDP_START:
		if (ibuf_fd_avail(b))
			errx(1, "received unexpected fd");
		io_read_str(b, &local);
		io_read_str(b, &notify);
		state = rrdp_session_read(b);
		rrdp_new(id, local, notify, state);
		break;
	case RRDP_HTTP_INI:
		s = rrdp_get(id);
		if (s == NULL)
			errx(1, "http ini, rrdp session %u does not exist", id);
		if (s->state != RRDP_STATE_WAIT)
			errx(1, "%s: bad internal state", s->local);
		s->infd = ibuf_fd_get(b);
		if (s->infd == -1)
			errx(1, "expected fd not received");
		s->state = RRDP_STATE_PARSE;
		if (s->aborted) {
			rrdp_abort_req(s);
			break;
		}
		break;
	case RRDP_HTTP_FIN:
		io_read_buf(b, &res, sizeof(res));
		io_read_str(b, &last_mod);
		if (ibuf_fd_avail(b))
			errx(1, "received unexpected fd");

		s = rrdp_get(id);
		if (s == NULL)
			errx(1, "http fin, rrdp session %u does not exist", id);
		if (!(s->state & RRDP_STATE_PARSE))
			errx(1, "%s: bad internal state", s->local);
		s->state |= RRDP_STATE_HTTP_DONE;
		s->res = res;
		free(s->last_mod);
		s->last_mod = last_mod;
		rrdp_finished(s);
		break;
	case RRDP_FILE:
		s = rrdp_get(id);
		if (s == NULL)
			errx(1, "file, rrdp session %u does not exist", id);
		if (ibuf_fd_avail(b))
			errx(1, "received unexpected fd");
		io_read_buf(b, &ok, sizeof(ok));
		if (ok != 1)
			s->file_failed++;
		s->file_pending--;
		if (s->file_pending == 0)
			rrdp_finished(s);
		break;
	case RRDP_ABORT:
		if (ibuf_fd_avail(b))
			errx(1, "received unexpected fd");
		s = rrdp_get(id);
		if (s != NULL)
			rrdp_abort_req(s);
		break;
	default:
		errx(1, "unexpected message %d", type);
	}
	ibuf_free(b);
}

static void
rrdp_data_handler(struct rrdp *s)
{
	char buf[READ_BUF_SIZE];
	XML_Parser p = s->parser;
	ssize_t len;

	len = read(s->infd, buf, sizeof(buf));
	if (len == -1) {
		warn("%s: read failure", s->local);
		rrdp_abort_req(s);
		return;
	}
	if ((s->state & RRDP_STATE_PARSE) == 0)
		errx(1, "%s: bad parser state", s->local);
	if (len == 0) {
		/* parser stage finished */
		close(s->infd);
		s->infd = -1;

		if (s->task != NOTIFICATION) {
			char h[SHA256_DIGEST_LENGTH];

			SHA256_Final(h, &s->ctx);
			if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
				s->state |= RRDP_STATE_PARSE_ERROR;
				warnx("%s: bad message digest", s->local);
			}
		}

		s->state |= RRDP_STATE_PARSE_DONE;
		rrdp_finished(s);
		return;
	}

	/* parse and maybe hash the bytes just read */
	if (s->task != NOTIFICATION)
		SHA256_Update(&s->ctx, buf, len);
	if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
	    XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
		warnx("%s: parse error at line %llu: %s", s->local,
		    (unsigned long long)XML_GetCurrentLineNumber(p),
		    XML_ErrorString(XML_GetErrorCode(p)));
		s->state |= RRDP_STATE_PARSE_ERROR;
	}
}

void
proc_rrdp(int fd)
{
	struct pollfd pfds[MAX_SESSIONS + 1];
	struct rrdp *s, *ns;
	size_t i;

	if (pledge("stdio recvfd", NULL) == -1)
		err(1, "pledge");

	msgbuf_init(&msgq);
	msgq.fd = fd;

	for (;;) {
		i = 1;
		memset(&pfds, 0, sizeof(pfds));
		TAILQ_FOREACH(s, &states, entry) {
			if (i >= MAX_SESSIONS + 1) {
				/* not enough sessions, wait for better times */
				s->pfd = NULL;
				continue;
			}
			/* request new assets when there are free sessions */
			if (s->state == RRDP_STATE_REQ) {
				const char *uri;
				switch (s->task) {
				case NOTIFICATION:
					rrdp_http_req(s->id, s->notifyuri,
					    s->repository->last_mod);
					break;
				case SNAPSHOT:
				case DELTA:
					uri = notification_get_next(s->nxml,
					    s->hash, sizeof(s->hash),
					    s->task);
					SHA256_Init(&s->ctx);
					rrdp_http_req(s->id, uri, NULL);
					break;
				}
				s->state = RRDP_STATE_WAIT;
			}
			s->pfd = pfds + i++;
			s->pfd->fd = s->infd;
			s->pfd->events = POLLIN;
		}

		/*
		 * Update main fd last.
		 * The previous loop may have enqueue messages.
		 */
		pfds[0].fd = fd;
		pfds[0].events = POLLIN;
		if (msgq.queued)
			pfds[0].events |= POLLOUT;

		if (poll(pfds, i, INFTIM) == -1) {
			if (errno == EINTR)
				continue;
			err(1, "poll");
		}

		if (pfds[0].revents & POLLHUP)
			break;
		if (pfds[0].revents & POLLOUT) {
			switch (msgbuf_write(&msgq)) {
			case 0:
				errx(1, "write: connection closed");
			case -1:
				err(1, "write");
			}
		}
		if (pfds[0].revents & POLLIN)
			rrdp_input_handler(fd);

		TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
			if (s->pfd == NULL)
				continue;
			if (s->pfd->revents != 0)
				rrdp_data_handler(s);
		}
	}

	exit(0);
}