File: [local] / src / usr.sbin / smtpd / queue_backend.c (download)
Revision 1.69, Wed May 31 16:51:46 2023 UTC (12 months ago) by op
Branch: MAIN
CVS Tags: OPENBSD_7_5_BASE, OPENBSD_7_5, OPENBSD_7_4_BASE, OPENBSD_7_4, HEAD Changes since 1.68: +2 -1 lines
add missing include of time.h
spotted after a report on OpenSMTPD-portable. While here include
sys/time.h in smtpd.h, as noted in event_init(3), since it includes
event.h.
ok millert@
|
/* $OpenBSD: queue_backend.c,v 1.69 2023/05/31 16:51:46 op Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <inttypes.h>
#include <pwd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "smtpd.h"
#include "log.h"
/* Sanity-check an envelope; returns NULL if valid, else a reason string. */
static const char* envelope_validate(struct envelope *);
/* Backends that queue_init() can select by name. */
extern struct queue_backend queue_backend_fs;
extern struct queue_backend queue_backend_null;
extern struct queue_backend queue_backend_proc;
extern struct queue_backend queue_backend_ram;
/* Envelope cache helpers, defined further down in this file. */
static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);
TAILQ_HEAD(evplst, envelope);
/*
 * Envelope cache: the tree maps an envelope id to its cached copy, the
 * list keeps the same copies with the most recently added/updated one at
 * the head (eviction takes the tail).
 */
static struct tree evpcache_tree;
static struct evplst evpcache_list;
/* Backend chosen by queue_init(). */
static struct queue_backend *backend;
/*
 * Backend operations.  Each pointer is installed through the matching
 * queue_api_on_*() setter at the bottom of this file.
 */
static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
uint32_t, int *, void **);
#ifdef QUEUE_PROFILING
/* The sample currently being timed: its label and its start time. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/*
 * Start timing the operation labelled `name'.  Does nothing unless
 * PROFILE_QUEUE is set in `profiling'.
 */
static inline void profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Stop timing and log the time elapsed since the last profile_enter().
 * Does nothing unless PROFILE_QUEUE is set in `profiling'.
 */
static inline void profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x) do {} while (0)
#define profile_leave() do {} while (0)
#endif
/*
 * Format into `buf' (of size `len') the path of message `msgid' below
 * PATH_TEMPORARY, the id being printed as 8 hexadecimal digits.
 * Returns the result of bsnprintf().
 */
static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	int	ok;

	ok = bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
	return (ok);
}
/*
 * Select and initialise the queue backend.
 *
 * `name' picks the backend: "fs", "null" or "ram"; any other value
 * selects the proc backend.  When `server' is non-zero the spool,
 * offline, purge and temporary directories are checked with ckdir()
 * first, and the existing temporary directory is handed to mvpurge()
 * before being checked again.
 *
 * Calls fatalx() if the queue user or group is unknown or if one of the
 * directory checks fails.  Otherwise returns the value of the backend's
 * init() hook.
 */
int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		fatalx("unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		fatalx("unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			fatalx("error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			fatalx("error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in purge directory setup");

		/* move whatever the previous run left in the temporary dir */
		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		/* name the directory that actually failed */
		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in temporary directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}
/*
 * Invoke the registered close handler and return its result.
 * Returns 1 when no close handler has been registered.
 */
int
queue_close(void)
{
	if (handler_close == NULL)
		return (1);

	return (handler_close());
}
/*
 * Ask the backend to create a new message; the handler receives `msgid'
 * to store the new id in.  Returns the handler's result.
 */
int
queue_message_create(uint32_t *msgid)
{
	int	ret;

	profile_enter("queue_message_create");
	ret = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    ret, *msgid);

	return (ret);
}
/*
 * Delete message `msgid' through the backend, unlink its file in the
 * temporary directory and drop every cached envelope that belongs to
 * it.  Returns the backend handler's result.
 */
int
queue_message_delete(uint32_t msgid)
{
	char		 path[PATH_MAX];
	uint64_t	 cursor;
	void		*it;
	int		 ret;

	profile_enter("queue_message_delete");
	ret = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, path, sizeof(path));
	unlink(path);

	/*
	 * remove remaining envelopes from the cache if any (on rollback);
	 * the iterator is restarted after every removal
	 */
	cursor = msgid_to_evpid(msgid);
	it = NULL;
	while (tree_iterfrom(&evpcache_tree, &it, cursor, &cursor, NULL) &&
	    evpid_to_msgid(cursor) == msgid) {
		queue_envelope_cache_del(cursor);
		it = NULL;
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, ret);

	return (ret);
}
/*
 * Commit message `msgid' to the backend.
 *
 * If compression and/or encryption are enabled in env->sc_queue_flags,
 * the message file is first rewritten in place: the transformed content
 * goes to a sibling "<path>.comp" / "<path>.enc" file which is then
 * renamed over the original.  The path is then handed to the backend's
 * commit handler, and unlinked afterwards in case the backend did not
 * consume it.
 *
 * Returns the commit handler's result, or 0 if a transformation step
 * failed.  On such a failure the intermediate file is removed.
 */
int
queue_message_commit(uint32_t msgid)
{
	int	 r;
	char	 msgpath[PATH_MAX];
	char	 tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	/* empty tmppath means "no intermediate file to clean up" */
	tmppath[0] = '\0';

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		ifp = NULL;
		/* a failed flush would leave a truncated file: do not use it */
		r = fclose(ofp);
		ofp = NULL;
		if (r == EOF)
			goto err;

		if (rename(tmppath, msgpath) == -1) {
			/* log before any cleanup can overwrite errno */
			if (errno != ENOSPC)
				log_warn("rename");
			goto err;
		}
		tmppath[0] = '\0';
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		ifp = NULL;
		r = fclose(ofp);
		ofp = NULL;
		if (r == EOF)
			goto err;

		if (rename(tmppath, msgpath) == -1) {
			if (errno != ENOSPC)
				log_warn("rename");
			goto err;
		}
		tmppath[0] = '\0';
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	/* do not leave a stale intermediate file behind */
	if (tmppath[0] != '\0')
		unlink(tmppath);
	return (0);
}
/*
 * Open message `msgid' for reading.
 *
 * The descriptor comes from the backend's fd_r handler.  If encryption
 * and/or compression are enabled in env->sc_queue_flags, the content is
 * decrypted and then uncompressed, each step writing into a file
 * obtained from mktmpfile(); the returned descriptor then refers to the
 * last such file, rewound to its beginning.
 *
 * Returns a descriptor, or -1 on failure.
 */
int
queue_message_fd_r(uint32_t msgid)
{
	int	 fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		/* the old fdin now belongs to ifp; keep the duplicate */
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;
		/* fdout now belongs to ofp: it must not be closed twice */
		fdout = -1;
		if (!crypto_decrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		ifp = NULL;
		/* a failed flush means the output is incomplete */
		if (fclose(ofp) == EOF) {
			ofp = NULL;
			goto err;
		}
		ofp = NULL;
		if (lseek(fdin, 0, SEEK_SET) == -1)
			goto err;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;
		fdout = -1;
		if (!uncompress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		ifp = NULL;
		if (fclose(ofp) == EOF) {
			ofp = NULL;
			goto err;
		}
		ofp = NULL;
		if (lseek(fdin, 0, SEEK_SET) == -1)
			goto err;
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return (-1);
}
/*
 * Create the file of message `msgid' in the temporary directory and
 * open it read-write with mode 0600.  Because of O_EXCL the call fails
 * if the file already exists.  Returns the result of open().
 */
int
queue_message_fd_rw(uint32_t msgid)
{
	char	path[PATH_MAX];
	int	fd;

	queue_message_path(msgid, path, sizeof(path));
	fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600);

	return fd;
}
/*
 * Serialise envelope `ep' into `evpbuf' (capacity `evpbufsize').
 *
 * The envelope is dumped with envelope_dump_buffer(), then compressed
 * and/or encrypted according to env->sc_queue_flags; the final bytes
 * are copied back to the start of `evpbuf'.
 *
 * Returns the number of bytes stored in `evpbuf', or 0 on failure
 * (including when the result does not fit in `evpbufsize').
 */
static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char	*evp;
	size_t	 evplen;
	size_t	 complen;
	char	 compbuf[sizeof(struct envelope)];
	size_t	 enclen;
	char	 encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	/*
	 * The scratch buffers are sized independently of the caller's
	 * buffer: refuse to copy back more than the caller can hold.
	 */
	if (evplen > evpbufsize)
		return (0);

	/* evp may be evpbuf itself when no transformation was applied */
	memmove(evpbuf, evp, evplen);

	return (evplen);
}
/*
 * Rebuild envelope `ep' from the `evpbufsize' bytes stored in `evpbuf'.
 *
 * The bytes are decrypted and/or uncompressed first, depending on
 * env->sc_queue_flags, then parsed with envelope_load_buffer().
 *
 * Returns 0 if decryption or decompression yields nothing, otherwise
 * the result of envelope_load_buffer().
 */
static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char	 plain[sizeof(struct envelope)];
	char	 inflated[sizeof(struct envelope)];
	char	*cur = evpbuf;
	size_t	 curlen = evpbufsize;
	size_t	 n;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		n = crypto_decrypt_buffer(cur, curlen, plain, sizeof plain);
		if (n == 0)
			return (0);
		cur = plain;
		curlen = n;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		n = uncompress_chunk(cur, curlen, inflated, sizeof inflated);
		if (n == 0)
			return (0);
		cur = inflated;
		curlen = n;
	}

	return (envelope_load_buffer(ep, cur, curlen));
}
/*
 * Insert a copy of envelope `e' at the head of the cache.
 * Entries are first evicted from the tail of the list for as long as
 * the cache holds env->sc_queue_evpcache_size entries or more.
 */
static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope	*copy;

	for (;;) {
		if (tree_count(&evpcache_tree) < env->sc_queue_evpcache_size)
			break;
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
	}

	copy = xcalloc(1, sizeof *copy);
	*copy = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, copy, entry);
	tree_xset(&evpcache_tree, e->id, copy);
	stat_increment("queue.evpcache.size", 1);
}
/*
 * Refresh the cached copy of envelope `e' and move it to the head of
 * the list.  If the envelope is not cached yet it is added instead.
 * The matching hit/missed statistic is incremented.
 */
static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope	*hit;

	hit = tree_get(&evpcache_tree, e->id);
	if (hit != NULL) {
		/* unlink first: the copy below overwrites the list entry */
		TAILQ_REMOVE(&evpcache_list, hit, entry);
		*hit = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, hit, entry);
		stat_increment("queue.evpcache.update.hit", 1);
		return;
	}

	queue_envelope_cache_add(e);
	stat_increment("queue.evpcache.update.missed", 1);
}
/*
 * Drop envelope `evpid' from the cache and free its copy.
 * Does nothing if the envelope is not cached.
 */
static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope	*victim;

	victim = tree_pop(&evpcache_tree, evpid);
	if (victim != NULL) {
		TAILQ_REMOVE(&evpcache_list, victim, entry);
		free(victim);
		stat_decrement("queue.evpcache.size", 1);
	}
}
/*
 * Stamp envelope `ep' with the current time, serialise it and hand it
 * to the backend's create handler, which receives &ep->id.
 *
 * On handler failure ep->creation and ep->id are reset to 0.  On
 * success the envelope is added to the cache when QUEUE_EVPCACHE is
 * set.  Returns 0 if serialisation fails, else the handler's result.
 */
int
queue_envelope_create(struct envelope *ep)
{
	char		 buf[sizeof(struct envelope)];
	size_t		 buflen;
	uint64_t	 reqid;
	int		 ok;

	ep->creation = time(NULL);
	buflen = queue_envelope_dump_buffer(ep, buf, sizeof buf);
	if (buflen == 0)
		return (0);

	/* remember the id we came in with, for the trace below */
	reqid = ep->id;

	profile_enter("queue_envelope_create");
	ok = handler_envelope_create(evpid_to_msgid(reqid), buf, buflen,
	    &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    reqid, buflen, ok, ep->id);

	if (!ok) {
		ep->creation = 0;
		ep->id = 0;
		return (ok);
	}

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (ok);
}
/*
 * Delete envelope `evpid': drop it from the cache when QUEUE_EVPCACHE
 * is set, then call the backend's delete handler.
 * Returns the handler's result.
 */
int
queue_envelope_delete(uint64_t evpid)
{
	int	ret;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	ret = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, ret);

	return (ret);
}
/*
 * Load envelope `evpid' into `ep'.
 *
 * With QUEUE_EVPCACHE set, a cached copy is returned directly.
 * Otherwise the envelope is fetched through the backend's load handler,
 * decoded and validated; a valid envelope is added to the cache when
 * caching is enabled.
 *
 * Returns 1 on success, 0 if the envelope cannot be loaded, decoded or
 * validated (a warning is logged for an invalid envelope).
 */
int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	char		 buf[sizeof(struct envelope)];
	size_t		 buflen;
	const char	*why;
	struct envelope	*hit;

	if (env->sc_queue_flags & QUEUE_EVPCACHE) {
		hit = tree_get(&evpcache_tree, evpid);
		if (hit) {
			*ep = *hit;
			stat_increment("queue.evpcache.load.hit", 1);
			return (1);
		}
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	buflen = handler_envelope_load(ep->id, buf, sizeof buf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, buflen);

	if (buflen == 0)
		return (0);

	if (!queue_envelope_load_buffer(ep, buf, buflen))
		return (0);

	why = envelope_validate(ep);
	if (why != NULL) {
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, why);
		return (0);
	}

	/* set the id again once the envelope has been decoded */
	ep->id = evpid;
	if (env->sc_queue_flags & QUEUE_EVPCACHE) {
		queue_envelope_cache_add(ep);
		stat_increment("queue.evpcache.load.missed", 1);
	}

	return (1);
}
/*
 * Serialise envelope `ep' and pass it to the backend's update handler.
 * When the handler succeeds and QUEUE_EVPCACHE is set, the cached copy
 * is refreshed too.  Returns 0 if serialisation fails, else the
 * handler's result.
 */
int
queue_envelope_update(struct envelope *ep)
{
	char	buf[sizeof(struct envelope)];
	size_t	buflen;
	int	ret;

	buflen = queue_envelope_dump_buffer(ep, buf, sizeof buf);
	if (buflen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	ret = handler_envelope_update(ep->id, buf, buflen);
	profile_leave();

	if (ret) {
		if (env->sc_queue_flags & QUEUE_EVPCACHE)
			queue_envelope_cache_update(ep);
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, ret);

	return (ret);
}
/*
 * Fetch the next envelope of message `msgid' from the backend's
 * message-walk handler into `ep'.  `done' and `data' are passed to the
 * handler untouched.
 *
 * Returns -1 when the handler returns -1, 1 when an envelope was
 * decoded and validated (ep->id is then set), and 0 otherwise; an
 * envelope that fails validation is reported with a warning.
 */
int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	/* initialised: the trace below reads it even if the handler
	 * returned without storing an id */
	uint64_t	 evpid = 0;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * do not cache the envelope here, while discovering
			 * envelopes one could re-run discover on already
			 * scheduled envelopes which leads to triggering of
			 * strict checks in caching. Envelopes could anyway
			 * be loaded from backend if it isn't cached.
			 */
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}
/*
 * Fetch the next envelope from the backend's envelope-walk handler
 * into `ep'.
 *
 * Returns -1 when the handler returns -1, 1 when an envelope was
 * decoded and validated (ep->id is then set and, with QUEUE_EVPCACHE,
 * the envelope is cached), and 0 otherwise; an envelope that fails
 * validation is reported with a warning.
 */
int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	/* initialised: the trace below reads it even if the handler
	 * returned without storing an id */
	uint64_t	 evpid = 0;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}
uint32_t
queue_generate_msgid(void)
{
uint32_t msgid;
while ((msgid = arc4random()) == 0)
;
return msgid;
}
uint64_t
queue_generate_evpid(uint32_t msgid)
{
uint32_t rnd;
uint64_t evpid;
while ((rnd = arc4random()) == 0)
;
evpid = msgid;
evpid <<= 32;
evpid |= rnd;
return evpid;
}
/*
 * Check that envelope `ep' is usable: expected version, helo and
 * hostname both NUL-terminated and non-empty, errorline NUL-terminated,
 * and a dispatcher present in env->sc_dispatchers.
 *
 * Returns NULL when every check passes, otherwise a static string
 * naming the first check that failed.
 */
static const char*
envelope_validate(struct envelope *ep)
{
	const char	*why = NULL;

	if (ep->version != SMTPD_ENVELOPE_VERSION)
		why = "version mismatch";
	else if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		why = "invalid helo";
	else if (ep->helo[0] == '\0')
		why = "empty helo";
	else if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		why = "invalid hostname";
	else if (ep->hostname[0] == '\0')
		why = "empty hostname";
	else if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		why = "invalid error line";
	else if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
		why = "unknown dispatcher";

	return why;
}
/* Register `cb' as the handler that queue_close() invokes. */
void
queue_api_on_close(int (*cb)(void))
{
	handler_close = cb;
}
/* Register `cb' as the handler that queue_message_create() invokes. */
void
queue_api_on_message_create(int (*cb)(uint32_t *))
{
	handler_message_create = cb;
}
/* Register `cb' as the handler that queue_message_commit() invokes. */
void
queue_api_on_message_commit(int (*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}
/* Register `cb' as the handler that queue_message_delete() invokes. */
void
queue_api_on_message_delete(int (*cb)(uint32_t))
{
	handler_message_delete = cb;
}
/* Register `cb' as the handler that queue_message_fd_r() invokes. */
void
queue_api_on_message_fd_r(int (*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}
/* Register `cb' as the handler that queue_envelope_create() invokes. */
void
queue_api_on_envelope_create(int (*cb)(uint32_t, const char *, size_t,
    uint64_t *))
{
	handler_envelope_create = cb;
}
/* Register `cb' as the handler that queue_envelope_delete() invokes. */
void
queue_api_on_envelope_delete(int (*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}
/* Register `cb' as the handler that queue_envelope_update() invokes. */
void
queue_api_on_envelope_update(int (*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}
/* Register `cb' as the handler that queue_envelope_load() invokes. */
void
queue_api_on_envelope_load(int (*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}
/* Register `cb' as the handler that queue_envelope_walk() invokes. */
void
queue_api_on_envelope_walk(int (*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}
/* Register `cb' as the handler that queue_message_walk() invokes. */
void
queue_api_on_message_walk(int (*cb)(uint64_t *, char *, size_t, uint32_t,
    int *, void **))
{
	handler_message_walk = cb;
}