[BACK]Return to kern_task.c CVS log [TXT][DIR] Up to [local] / src / sys / kern

File: [local] / src / sys / kern / kern_task.c (download)

Revision 1.35, Tue May 14 08:26:13 2024 UTC (2 weeks, 5 days ago) by jsg
Branch: MAIN
CVS Tags: HEAD
Changes since 1.34: +1 -3 lines

remove prototypes with no matching function

/*	$OpenBSD: kern_task.c,v 1.35 2024/05/14 08:26:13 jsg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#include "kcov.h"
#if NKCOV > 0
#include <sys/kcov.h>
#endif

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

struct taskq_thread {
	SLIST_ENTRY(taskq_thread)
				 tt_entry;
	struct proc		*tt_thread;
};
SLIST_HEAD(taskq_threads, taskq_thread);

struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;

	struct taskq_threads	 tq_threads;
	unsigned int		 tq_barriers;
	unsigned int		 tq_bgen;
	unsigned int		 tq_bthreads;

#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= 0,
	.tq_name	= taskq_sys_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object	= {
		.lo_name	= taskq_sys_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= TASKQ_MPSAFE,
	.tq_name	= taskq_sys_mp_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_mp_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object = {
		.lo_name	= taskq_sys_mp_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	SLIST_INIT(&tq->tq_threads);
	tq->tq_barriers = 0;
	tq->tq_bgen = 0;
	tq->tq_bthreads = 0;

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
		    INFSLP);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier_task(void *p)
{
	struct taskq *tq = p;
	unsigned int gen;

	mtx_enter(&tq->tq_mtx);
	tq->tq_bthreads++;
	wakeup(&tq->tq_bthreads);

	gen = tq->tq_bgen;
	do {
		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
		    PWAIT, "tqbarend", INFSLP);
	} while (gen == tq->tq_bgen);
	mtx_leave(&tq->tq_mtx);
}

static void
taskq_do_barrier(struct taskq *tq)
{
	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
	struct proc *thread = curproc;
	struct taskq_thread *tt;

	mtx_enter(&tq->tq_mtx);
	tq->tq_barriers++;

	/* is the barrier being run from a task inside the taskq? */
	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
		if (tt->tt_thread == thread) {
			tq->tq_bthreads++;
			wakeup(&tq->tq_bthreads);
			break;
		}
	}

	while (tq->tq_bthreads < tq->tq_nthreads) {
		/* shove the task into the queue for a worker to pick up */
		SET(t.t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
		wakeup_one(tq);

		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
		    PWAIT, "tqbar", INFSLP);

		/*
		 * another thread running a barrier might have
		 * done this work for us.
		 */
		if (ISSET(t.t_flags, TASK_ONQUEUE))
			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
	}

	if (--tq->tq_barriers == 0) {
		/* we're the last one out */
		tq->tq_bgen++;
		wakeup(&tq->tq_bgen);
		tq->tq_bthreads = 0;
	} else {
		unsigned int gen = tq->tq_bgen;
		do {
			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
			    PWAIT, "tqbarwait", INFSLP);
		} while (gen == tq->tq_bgen);
	}
	mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier(struct taskq *tq)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	taskq_do_barrier(tq);
}

void
taskq_del_barrier(struct taskq *tq, struct task *t)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, t))
		return;

	taskq_do_barrier(tq);
}

void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
#if NKCOV > 0
		if (!kcov_cold)
			w->t_process = curproc->p_p;
#endif
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

void
taskq_thread(void *xtq)
{
	struct taskq_thread self = { .tt_thread = curproc };
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	mtx_enter(&tq->tq_mtx);
	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
	mtx_leave(&tq->tq_mtx);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
#if NKCOV > 0
		kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
#endif
		(*work.t_func)(work.t_arg);
#if NKCOV > 0
		kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
#endif
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}