[BACK]Return to queue_proc.c CVS log [TXT][DIR] Up to [local] / src / usr.sbin / smtpd

File: [local] / src / usr.sbin / smtpd / queue_proc.c (download)

Revision 1.11, Tue May 7 12:10:06 2024 UTC (4 weeks, 4 days ago) by op
Branch: MAIN
CVS Tags: HEAD
Changes since 1.10: +2 -2 lines

change the smtpd table protocol

Using imsg for the "proc" table (external programs) has proven quite
painful in practice since a lot of smtpd internals (structs, enums,
etc.) have to be kept in sync with the various table implementations.

Instead, a filter-like protocol for tables decouples the implementations
and makes it easy to write and test tables.

The new text-based transport protocol is documented in the (added)
smtpd-tables(7) manpage.

The old imsg protocol is no longer supported and existing tables have to
be converted.  In particular, users of opensmtpd-extras tables will need
to install the new opensmtpd-table-* packages.

With lots of suggestions and improvements from gilles and a tweak
from Philipp (philipp+openbsd [at] bureaucracy [dot] de), thanks!

ok gilles

/*	$OpenBSD: queue_proc.c,v 1.11 2024/05/07 12:10:06 op Exp $	*/

/*
 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <string.h>

#include "smtpd.h"
#include "log.h"

/* imsg channel to the backend; set up in queue_proc_init(). */
static struct imsgbuf	 ibuf;
/* Reply most recently fetched by queue_proc_call(). */
static struct imsg	 imsg;
/* Number of reply payload bytes not yet consumed by queue_proc_read(). */
static size_t		 rlen;
/* Read cursor into the payload of the current reply. */
static char		*rdata;

/*
 * Flush the pending request to the backend and wait for one reply.
 *
 * On success the reply is left in `imsg', with `rdata'/`rlen' describing
 * its payload, and the function returns.  Any transport error, a closed
 * pipe, or a reply whose type is not PROC_QUEUE_OK is logged and then
 * handed to fatalx().
 */
static void
queue_proc_call(void)
{
	ssize_t	got;

	if (imsg_flush(&ibuf) == -1) {
		log_warn("warn: queue-proc: imsg_flush");
		goto fail;
	}

	for (;;) {
		/* First try to extract a message from what is buffered. */
		got = imsg_get(&ibuf, &imsg);
		if (got == -1) {
			log_warn("warn: queue-proc: imsg_get");
			goto fail;
		}
		if (got != 0) {
			rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
			rdata = imsg.data;

			if (imsg.hdr.type == PROC_QUEUE_OK)
				return;
			log_warnx("warn: queue-proc: bad response");
			goto fail;
		}

		/* Nothing complete yet: pull more bytes from the pipe. */
		got = imsg_read(&ibuf);
		if (got == -1) {
			if (errno != EAGAIN) {
				log_warn("warn: queue-proc: imsg_read");
				goto fail;
			}
			/* EAGAIN: simply try again. */
			continue;
		}
		if (got == 0) {
			log_warnx("warn: queue-proc: pipe closed");
			goto fail;
		}
	}

fail:
	fatalx("queue-proc: exiting");
}

/*
 * Copy the next `len' bytes of the current reply payload into `dst' and
 * advance the read cursor past them.  Asking for more bytes than remain
 * is logged and handed to fatalx().
 */
static void
queue_proc_read(void *dst, size_t len)
{
	if (rlen < len) {
		log_warnx("warn: queue-proc: bad msg len");
		fatalx("queue-proc: exiting");
	}

	memmove(dst, rdata, len);
	rdata += len;
	rlen -= len;
}

/*
 * Finish processing the current reply: any payload bytes left unread
 * are logged and handed to fatalx(); otherwise the message is released
 * with imsg_free().
 */
static void
queue_proc_end(void)
{
	if (rlen != 0) {
		log_warnx("warn: queue-proc: bogus data");
		fatalx("queue-proc: exiting");
	}

	imsg_free(&imsg);
}

/*
 * API
 */

/*
 * Send PROC_QUEUE_CLOSE to the backend and return the int status
 * carried in its reply.
 */
static int
queue_proc_close(void)
{
	int	status;

	imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0);
	queue_proc_call();

	/* The reply payload is exactly one int. */
	queue_proc_read(&status, sizeof(status));
	queue_proc_end();

	return (status);
}

/*
 * Send PROC_QUEUE_MESSAGE_CREATE to the backend.
 *
 * The reply starts with an int status; when that status is 1 it is
 * followed by the message id, which is stored in *msgid.  Returns the
 * status.
 */
static int
queue_proc_message_create(uint32_t *msgid)
{
	int	status;

	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
	queue_proc_call();

	queue_proc_read(&status, sizeof(status));
	if (status == 1) {
		/* Only a status of 1 is followed by an id. */
		queue_proc_read(msgid, sizeof(*msgid));
	}
	queue_proc_end();

	return (status);
}

/*
 * Open `path' read-only and send the descriptor, together with `msgid',
 * to the backend as a PROC_QUEUE_MESSAGE_COMMIT request.
 *
 * Returns 0 if the file cannot be opened, otherwise the int status from
 * the backend's reply.
 */
static int
queue_proc_message_commit(uint32_t msgid, const char *path)
{
	int	msgfd, status;

	if ((msgfd = open(path, O_RDONLY)) == -1) {
		log_warn("queue-proc: open: %s", path);
		return (0);
	}

	/* The descriptor travels with the request. */
	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, msgfd,
	    &msgid, sizeof(msgid));
	queue_proc_call();

	queue_proc_read(&status, sizeof(status));
	queue_proc_end();

	return (status);
}

/*
 * Send PROC_QUEUE_MESSAGE_DELETE for `msgid' to the backend and return
 * the int status carried in its reply.
 */
static int
queue_proc_message_delete(uint32_t msgid)
{
	int	status;

	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1,
	    &msgid, sizeof(msgid));
	queue_proc_call();

	queue_proc_read(&status, sizeof(status));
	queue_proc_end();

	return (status);
}

/*
 * Send PROC_QUEUE_MESSAGE_FD_R for `msgid' to the backend and return the
 * file descriptor attached to its reply, as reported by imsg_get_fd().
 *
 * The reply is expected to carry no payload; queue_proc_end() treats
 * leftover bytes as fatal.
 */
static int
queue_proc_message_fd_r(uint32_t msgid)
{
	int	fd;

	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
	    sizeof(msgid));

	queue_proc_call();

	/*
	 * Take the descriptor out of the message before queue_proc_end()
	 * runs: it calls imsg_free(), after which the message must no
	 * longer be inspected.
	 */
	fd = imsg_get_fd(&imsg);
	queue_proc_end();

	return (fd);
}

/*
 * Send a PROC_QUEUE_ENVELOPE_CREATE request to the backend.
 *
 * The message id put on the wire is derived from *evpid through
 * evpid_to_msgid(); the incoming value of the `msgid' argument is
 * overwritten and not used.  The request payload is that id followed by
 * the `len' bytes at `buf'.  The reply starts with an int status and,
 * when the status is 1, is followed by the envelope id, which is stored
 * in *evpid.
 *
 * Returns the status from the reply, or 0 if the request could not be
 * assembled.
 */
static int
queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
    uint64_t *evpid)
{
	struct ibuf	*b;
	int		 r;

	msgid = evpid_to_msgid(*evpid);
	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
	    sizeof(msgid) + len);
	/* imsg_create() yields NULL on failure; do not hand that on. */
	if (b == NULL) {
		log_warn("warn: queue-proc: imsg_create");
		return (0);
	}
	if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
	    imsg_add(b, buf, len) == -1)
		return (0);
	imsg_close(&ibuf, b);

	queue_proc_call();
	queue_proc_read(&r, sizeof(r));
	if (r == 1)
		queue_proc_read(evpid, sizeof(*evpid));
	queue_proc_end();

	return (r);
}

/*
 * Send PROC_QUEUE_ENVELOPE_DELETE for `evpid' to the backend and return
 * the int status carried in its reply.
 */
static int
queue_proc_envelope_delete(uint64_t evpid)
{
	int	status;

	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1,
	    &evpid, sizeof(evpid));
	queue_proc_call();

	queue_proc_read(&status, sizeof(status));
	queue_proc_end();

	return (status);
}

/*
 * Send a PROC_QUEUE_ENVELOPE_UPDATE request to the backend.
 *
 * The request payload is `evpid' followed by the `len' bytes at `buf'.
 *
 * Returns the int status from the reply, or 0 if the request could not
 * be assembled.
 */
static int
queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
{
	struct ibuf	*b;
	int		 r;

	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
	    len + sizeof(evpid));
	/* imsg_create() yields NULL on failure; do not hand that on. */
	if (b == NULL) {
		log_warn("warn: queue-proc: imsg_create");
		return (0);
	}
	if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
	    imsg_add(b, buf, len) == -1)
		return (0);
	imsg_close(&ibuf, b);

	queue_proc_call();
	queue_proc_read(&r, sizeof(r));
	queue_proc_end();

	return (r);
}

/*
 * Send PROC_QUEUE_ENVELOPE_LOAD for `evpid' to the backend and copy the
 * whole reply payload into `buf'.
 *
 * A payload larger than `len' is logged and handed to fatalx().
 * Returns the number of bytes copied.
 */
static int
queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
{
	int	copied;

	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1,
	    &evpid, sizeof(evpid));
	queue_proc_call();

	if (len < rlen) {
		log_warnx("warn: queue-proc: buf too small");
		fatalx("queue-proc: exiting");
	}

	/* Remember the size first: queue_proc_read() consumes rlen. */
	copied = rlen;
	queue_proc_read(buf, rlen);
	queue_proc_end();

	return (copied);
}

/*
 * Send PROC_QUEUE_ENVELOPE_WALK to the backend.
 *
 * The reply starts with an int status.  When the status is positive it
 * is followed by an envelope id, stored in *evpid, and then by data
 * whose length must equal the status and fit in `len'; that data is
 * copied into `buf'.  A size violation is logged and handed to fatalx().
 *
 * Returns the status from the reply.
 */
static int
queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
{
	int	status;

	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
	queue_proc_call();

	queue_proc_read(&status, sizeof(status));
	if (status <= 0) {
		/* Nothing else is expected in the reply. */
		queue_proc_end();
		return (status);
	}

	queue_proc_read(evpid, sizeof(*evpid));

	/* What remains is the envelope data itself. */
	if (len < rlen) {
		log_warnx("warn: queue-proc: buf too small");
		fatalx("queue-proc: exiting");
	}
	if ((int)rlen != status) {
		log_warnx("warn: queue-proc: len mismatch");
		fatalx("queue-proc: exiting");
	}
	queue_proc_read(buf, rlen);
	queue_proc_end();

	return (status);
}

/*
 * Start the external queue backend and wire it into the queue API.
 *
 * The backend is started with fork_proc_backend(); the returned
 * descriptor becomes the imsg channel.  A PROC_QUEUE_INIT request
 * carrying PROC_QUEUE_API_VERSION is queued, the queue_proc_* handlers
 * are registered, and the request is then flushed and its (empty)
 * reply awaited.
 *
 * `pw' and `server' are not used here.  Returns 1; failures go through
 * fatalx().
 */
static int
queue_proc_init(struct passwd *pw, int server, const char *conf)
{
	uint32_t	api = PROC_QUEUE_API_VERSION;
	int		sock;

	if ((sock = fork_proc_backend("queue", conf, "queue-proc", 0)) == -1)
		fatalx("queue-proc: exiting");

	imsg_init(&ibuf, sock);
	imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, &api, sizeof(api));

	/* Message-level handlers. */
	queue_api_on_close(queue_proc_close);
	queue_api_on_message_create(queue_proc_message_create);
	queue_api_on_message_commit(queue_proc_message_commit);
	queue_api_on_message_delete(queue_proc_message_delete);
	queue_api_on_message_fd_r(queue_proc_message_fd_r);

	/* Envelope-level handlers. */
	queue_api_on_envelope_create(queue_proc_envelope_create);
	queue_api_on_envelope_delete(queue_proc_envelope_delete);
	queue_api_on_envelope_update(queue_proc_envelope_update);
	queue_api_on_envelope_load(queue_proc_envelope_load);
	queue_api_on_envelope_walk(queue_proc_envelope_walk);

	/* Send the queued PROC_QUEUE_INIT and wait for the reply. */
	queue_proc_call();
	queue_proc_end();

	return (1);
}

/*
 * Backend descriptor for the external-process queue; its only
 * initializer is queue_proc_init().
 */
struct queue_backend	queue_backend_proc = {
	queue_proc_init,
};