[BACK]Return to scheduler_ramqueue.c CVS log [TXT][DIR] Up to [local] / src / usr.sbin / smtpd

File: [local] / src / usr.sbin / smtpd / scheduler_ramqueue.c (download)

Revision 1.48, Wed May 31 16:51:46 2023 UTC (12 months ago) by op
Branch: MAIN
CVS Tags: OPENBSD_7_5_BASE, OPENBSD_7_5, OPENBSD_7_4_BASE, OPENBSD_7_4, HEAD
Changes since 1.47: +2 -1 lines

add missing include of time.h

spotted after a report on OpenSMTPD-portable.  While here include
sys/time.h in smtpd.h, as noted in event_init(3), since it includes
event.h.

ok millert@

/*	$OpenBSD: scheduler_ramqueue.c,v 1.48 2023/05/31 16:51:46 op Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight,  schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/* This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/* When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If envelope is inflight, mark it envelope for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);


	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char * name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);