[BACK]Return to queue_backend.c CVS log [TXT][DIR] Up to [local] / src / usr.sbin / smtpd

File: [local] / src / usr.sbin / smtpd / queue_backend.c (download)

Revision 1.69, Wed May 31 16:51:46 2023 UTC (12 months ago) by op
Branch: MAIN
CVS Tags: OPENBSD_7_5_BASE, OPENBSD_7_5, OPENBSD_7_4_BASE, OPENBSD_7_4, HEAD
Changes since 1.68: +2 -1 lines

add missing include of time.h

spotted after a report on OpenSMTPD-portable.  While here include
sys/time.h in smtpd.h, as noted in event_init(3), since it includes
event.h.

ok millert@

/*	$OpenBSD: queue_backend.c,v 1.69 2023/05/31 16:51:46 op Exp $	*/

/*
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <inttypes.h>
#include <pwd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static const char* envelope_validate(struct envelope *);

extern struct queue_backend	queue_backend_fs;
extern struct queue_backend	queue_backend_null;
extern struct queue_backend	queue_backend_proc;
extern struct queue_backend	queue_backend_ram;

static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

TAILQ_HEAD(evplst, envelope);

static struct tree		evpcache_tree;
static struct evplst		evpcache_list;
static struct queue_backend	*backend;

static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);

#ifdef QUEUE_PROFILING

static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

static inline void profile_enter(const char *name)
{
	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	profile.name = name;
	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
}

static inline void profile_leave(void)
{
	struct timespec	 t1, dt;

	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	clock_gettime(CLOCK_MONOTONIC, &t1);
	timespecsub(&t1, &profile.t0, &dt);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)dt.tv_sec, dt.tv_nsec);
}
#else
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif

static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}

int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		fatalx("unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		fatalx("unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			fatalx("error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			fatalx("error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in purge directory setup");

		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in purge directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}

int
queue_close(void)
{
	if (handler_close)
		return (handler_close());

	return (1);
}

int
queue_message_create(uint32_t *msgid)
{
	int	r;

	profile_enter("queue_message_create");
	r = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    r, *msgid);

	return (r);
}

int
queue_message_delete(uint32_t msgid)
{
	char	msgpath[PATH_MAX];
	uint64_t evpid;
	void   *iter;
	int	r;

	profile_enter("queue_message_delete");
	r = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, msgpath, sizeof(msgpath));
	unlink(msgpath);

	/* remove remaining envelopes from the cache if any (on rollback) */
	evpid = msgid_to_evpid(msgid);
	for (;;) {
		iter = NULL;
		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
			break;
		if (evpid_to_msgid(evpid) != msgid)
			break;
		queue_envelope_cache_del(evpid);
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);

	return (r);
}

int
queue_message_commit(uint32_t msgid)
{
	int	r;
	char	msgpath[PATH_MAX];
	char	tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return 0;
}

int
queue_message_fd_r(uint32_t msgid)
{
	int	fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!crypto_decrypt_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, SEEK_SET, 0);
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!uncompress_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, SEEK_SET, 0);
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return -1;
}

int
queue_message_fd_rw(uint32_t msgid)
{
	char buf[PATH_MAX];

	queue_message_path(msgid, buf, sizeof(buf));

	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}

static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char   *evp;
	size_t	evplen;
	size_t	complen;
	char	compbuf[sizeof(struct envelope)];
	size_t	enclen;
	char	encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	memmove(evpbuf, evp, evplen);

	return (evplen);
}

static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char		*evp;
	size_t		 evplen;
	char		 compbuf[sizeof(struct envelope)];
	size_t		 complen;
	char		 encbuf[sizeof(struct envelope)];
	size_t		 enclen;

	evp = evpbuf;
	evplen = evpbufsize;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	return (envelope_load_buffer(ep, evp, evplen));
}

static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope *cached;

	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);

	cached = xcalloc(1, sizeof *cached);
	*cached = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
	tree_xset(&evpcache_tree, e->id, cached);
	stat_increment("queue.evpcache.size", 1);
}

static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope *cached;

	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
		queue_envelope_cache_add(e);
		stat_increment("queue.evpcache.update.missed", 1);
	} else {
		TAILQ_REMOVE(&evpcache_list, cached, entry);
		*cached = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
		stat_increment("queue.evpcache.update.hit", 1);
	}
}

static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope *cached;

	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
		return;

	TAILQ_REMOVE(&evpcache_list, cached, entry);
	free(cached);
	stat_decrement("queue.evpcache.size", 1);
}

int
queue_envelope_create(struct envelope *ep)
{
	int		 r;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	uint64_t	 evpid;
	uint32_t	 msgid;

	ep->creation = time(NULL);
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	evpid = ep->id;
	msgid = evpid_to_msgid(evpid);

	profile_enter("queue_envelope_create");
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    evpid, evplen, r, ep->id);

	if (!r) {
		ep->creation = 0;
		ep->id = 0;
	}

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (r);
}

int
queue_envelope_delete(uint64_t evpid)
{
	int	r;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	r = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, r);

	return (r);
}

int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	const char	*e;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	struct envelope	*cached;

	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
	    (cached = tree_get(&evpcache_tree, evpid))) {
		*ep = *cached;
		stat_increment("queue.evpcache.load.hit", 1);
		return (1);
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, evplen);

	if (evplen == 0)
		return (0);

	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
				queue_envelope_cache_add(ep);
				stat_increment("queue.evpcache.load.missed", 1);
			}
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_update(struct envelope *ep)
{
	char	evpbuf[sizeof(struct envelope)];
	size_t	evplen;
	int	r;

	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	r = handler_envelope_update(ep->id, evpbuf, evplen);
	profile_leave();

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_update(ep);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, r);

	return (r);
}

int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	uint64_t	 evpid;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * do not cache the envelope here, while discovering
			 * envelopes one could re-run discover on already
			 * scheduled envelopes which leads to triggering of
			 * strict checks in caching. Envelopes could anyway
			 * be loaded from backend if it isn't cached.
			 */
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	uint64_t	 evpid;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

uint32_t
queue_generate_msgid(void)
{
	uint32_t msgid;

	while ((msgid = arc4random()) == 0)
		;

	return msgid;
}

uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t rnd;
	uint64_t evpid;

	while ((rnd = arc4random()) == 0)
		;

	evpid = msgid;
	evpid <<= 32;
	evpid |= rnd;

	return evpid;
}

static const char*
envelope_validate(struct envelope *ep)
{
	if (ep->version != SMTPD_ENVELOPE_VERSION)
		return "version mismatch";

	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		return "invalid helo";
	if (ep->helo[0] == '\0')
		return "empty helo";

	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		return "invalid hostname";
	if (ep->hostname[0] == '\0')
		return "empty hostname";

	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		return "invalid error line";

	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
		return "unknown dispatcher";

	return NULL;
}

void
queue_api_on_close(int(*cb)(void))
{
	handler_close = cb;
}

void
queue_api_on_message_create(int(*cb)(uint32_t *))
{
	handler_message_create = cb;
}

void
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}

void
queue_api_on_message_delete(int(*cb)(uint32_t))
{
	handler_message_delete = cb;
}

void
queue_api_on_message_fd_r(int(*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}

void
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
{
	handler_envelope_create = cb;
}

void
queue_api_on_envelope_delete(int(*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}

void
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}

void
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}

void
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}

void
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
    uint32_t, int *, void **))
{
	handler_message_walk = cb;
}