File: [local] / src / usr.sbin / smtpd / queue_proc.c (download)
Revision 1.11, Tue May 7 12:10:06 2024 UTC (4 weeks, 4 days ago) by op
Branch: MAIN
CVS Tags: HEAD Changes since 1.10: +2 -2 lines
change the smtpd table protocol
Using imsg for the "proc" table (external programs) has proven quite
painful in practice since a lot of smtpd internals (structs, enums,
etc..) have to be kept in sync with the various tables implementations.
Instead, a filter-like protocol for tables decouples the implementations
and allows to write and test tables easily.
The new text-based transport protocol is documented in the (added)
smtpd-tables(7) manpage.
The old imsg protocol is no longer supported and existing tables have to
be converted. In particular, users of opensmtpd-extras tables will need
install the new opensmtpd-table-* packages.
With lots of suggestions and improvements from gilles and a tweak
from Philipp (philipp+openbsd [at] bureaucracy [dot] de), thanks!
ok gilles
|
/* $OpenBSD: queue_proc.c,v 1.11 2024/05/07 12:10:06 op Exp $ */
/*
* Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include "smtpd.h"
#include "log.h"
static struct imsgbuf ibuf;
static struct imsg imsg;
static size_t rlen;
static char *rdata;
static void
queue_proc_call(void)
{
ssize_t n;
if (imsg_flush(&ibuf) == -1) {
log_warn("warn: queue-proc: imsg_flush");
fatalx("queue-proc: exiting");
}
while (1) {
if ((n = imsg_get(&ibuf, &imsg)) == -1) {
log_warn("warn: queue-proc: imsg_get");
break;
}
if (n) {
rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
rdata = imsg.data;
if (imsg.hdr.type != PROC_QUEUE_OK) {
log_warnx("warn: queue-proc: bad response");
break;
}
return;
}
if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) {
log_warn("warn: queue-proc: imsg_read");
break;
}
if (n == 0) {
log_warnx("warn: queue-proc: pipe closed");
break;
}
}
fatalx("queue-proc: exiting");
}
static void
queue_proc_read(void *dst, size_t len)
{
if (len > rlen) {
log_warnx("warn: queue-proc: bad msg len");
fatalx("queue-proc: exiting");
}
memmove(dst, rdata, len);
rlen -= len;
rdata += len;
}
static void
queue_proc_end(void)
{
if (rlen) {
log_warnx("warn: queue-proc: bogus data");
fatalx("queue-proc: exiting");
}
imsg_free(&imsg);
}
/*
* API
*/
static int
queue_proc_close(void)
{
int r;
imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0);
queue_proc_call();
queue_proc_read(&r, sizeof(r));
queue_proc_end();
return (r);
}
static int
queue_proc_message_create(uint32_t *msgid)
{
int r;
imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
queue_proc_call();
queue_proc_read(&r, sizeof(r));
if (r == 1)
queue_proc_read(msgid, sizeof(*msgid));
queue_proc_end();
return (r);
}
static int
queue_proc_message_commit(uint32_t msgid, const char *path)
{
int r, fd;
fd = open(path, O_RDONLY);
if (fd == -1) {
log_warn("queue-proc: open: %s", path);
return (0);
}
imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
sizeof(msgid));
queue_proc_call();
queue_proc_read(&r, sizeof(r));
queue_proc_end();
return (r);
}
static int
queue_proc_message_delete(uint32_t msgid)
{
int r;
imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
sizeof(msgid));
queue_proc_call();
queue_proc_read(&r, sizeof(r));
queue_proc_end();
return (r);
}
static int
queue_proc_message_fd_r(uint32_t msgid)
{
imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
sizeof(msgid));
queue_proc_call();
queue_proc_end();
return (imsg_get_fd(&imsg));
}
static int
queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
uint64_t *evpid)
{
struct ibuf *b;
int r;
msgid = evpid_to_msgid(*evpid);
b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
sizeof(msgid) + len);
if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
imsg_add(b, buf, len) == -1)
return (0);
imsg_close(&ibuf, b);
queue_proc_call();
queue_proc_read(&r, sizeof(r));
if (r == 1)
queue_proc_read(evpid, sizeof(*evpid));
queue_proc_end();
return (r);
}
static int
queue_proc_envelope_delete(uint64_t evpid)
{
int r;
imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
sizeof(evpid));
queue_proc_call();
queue_proc_read(&r, sizeof(r));
queue_proc_end();
return (r);
}
static int
queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
{
struct ibuf *b;
int r;
b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
len + sizeof(evpid));
if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
imsg_add(b, buf, len) == -1)
return (0);
imsg_close(&ibuf, b);
queue_proc_call();
queue_proc_read(&r, sizeof(r));
queue_proc_end();
return (r);
}
static int
queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
{
int r;
imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
sizeof(evpid));
queue_proc_call();
if (rlen > len) {
log_warnx("warn: queue-proc: buf too small");
fatalx("queue-proc: exiting");
}
r = rlen;
queue_proc_read(buf, rlen);
queue_proc_end();
return (r);
}
static int
queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
{
int r;
imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
queue_proc_call();
queue_proc_read(&r, sizeof(r));
if (r > 0) {
queue_proc_read(evpid, sizeof(*evpid));
if (rlen > len) {
log_warnx("warn: queue-proc: buf too small");
fatalx("queue-proc: exiting");
}
if (r != (int)rlen) {
log_warnx("warn: queue-proc: len mismatch");
fatalx("queue-proc: exiting");
}
queue_proc_read(buf, rlen);
}
queue_proc_end();
return (r);
}
static int
queue_proc_init(struct passwd *pw, int server, const char *conf)
{
uint32_t version;
int fd;
fd = fork_proc_backend("queue", conf, "queue-proc", 0);
if (fd == -1)
fatalx("queue-proc: exiting");
imsg_init(&ibuf, fd);
version = PROC_QUEUE_API_VERSION;
imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
&version, sizeof(version));
queue_api_on_close(queue_proc_close);
queue_api_on_message_create(queue_proc_message_create);
queue_api_on_message_commit(queue_proc_message_commit);
queue_api_on_message_delete(queue_proc_message_delete);
queue_api_on_message_fd_r(queue_proc_message_fd_r);
queue_api_on_envelope_create(queue_proc_envelope_create);
queue_api_on_envelope_delete(queue_proc_envelope_delete);
queue_api_on_envelope_update(queue_proc_envelope_update);
queue_api_on_envelope_load(queue_proc_envelope_load);
queue_api_on_envelope_walk(queue_proc_envelope_walk);
queue_proc_call();
queue_proc_end();
return (1);
}
struct queue_backend queue_backend_proc = {
queue_proc_init,
};