31#include "acfutils/taskq.h"
33#include "acfutils/time.h"
49 taskq_init_thr_t init_func;
50 taskq_fini_thr_t fini_func;
51 taskq_proc_task_t proc_func;
52 taskq_discard_task_t discard_func;
57 unsigned num_threads_min;
58 unsigned num_threads_max;
59 uint64_t thr_stop_delay_us;
64 unsigned num_thr_ready;
68task_wait_for_work(taskq_t *tq)
72 if (tq->thr_stop_delay_us != 0) {
73 uint64_t limit = microclock() + tq->thr_stop_delay_us;
74 return (
cv_timedwait(&tq->cv, &tq->lock, limit) != ETIMEDOUT);
82taskq_worker(
void *info)
91 ASSERT(tq->proc_func != NULL);
93 if (tq->init_func != NULL)
94 thr->thr_info = tq->init_func(tq->userinfo);
98 while (!tq->shutdown) {
102 if (
list_count(&tq->threads) > tq->num_threads_max)
107 if (task_wait_for_work(tq) ||
108 list_count(&tq->threads) <= tq->num_threads_min) {
118 tq->proc_func(tq->userinfo, thr->thr_info, task->task);
124 ASSERT(tq->num_thr_ready != 0);
132 if (tq->fini_func != NULL)
133 tq->fini_func(tq->userinfo, thr->thr_info);
145 memset(thr, 0,
sizeof (*thr));
150taskq_alloc(
unsigned num_threads_min,
unsigned num_threads_max,
151 uint64_t thr_stop_delay_us, taskq_init_thr_t init_func,
152 taskq_fini_thr_t fini_func, taskq_proc_task_t proc_func,
153 taskq_discard_task_t discard_func,
void *userinfo)
157 ASSERT3U(num_threads_min, <=, num_threads_max);
158 ASSERT(num_threads_max != 0);
159 ASSERT(proc_func != NULL);
160 ASSERT(discard_func != NULL);
169 tq->num_threads_min = num_threads_min;
170 tq->num_threads_max = num_threads_max;
171 tq->thr_stop_delay_us = thr_stop_delay_us;
172 tq->init_func = init_func;
173 tq->fini_func = fini_func;
174 tq->proc_func = proc_func;
175 tq->discard_func = discard_func;
176 tq->userinfo = userinfo;
182taskq_free(taskq_t *tq)
187 ASSERT(tq->discard_func != NULL);
205 tq->discard_func(tq->userinfo, task->task);
219taskq_submit(taskq_t *tq,
void *task)
228 if (tq->num_thr_ready != 0) {
231 }
else if (
list_count(&tq->threads) < tq->num_threads_max) {
242taskq_wants_shutdown(taskq_t *tq)
248 shutdown = tq->shutdown;
255taskq_set_num_threads_min(taskq_t *tq,
unsigned num_threads_min)
258 if (tq->num_threads_min != num_threads_min) {
260 tq->num_threads_min = num_threads_min;
267taskq_get_num_threads_min(
const taskq_t *tq)
270 return (tq->num_threads_min);
274taskq_set_num_threads_max(taskq_t *tq,
unsigned num_threads_max)
277 if (tq->num_threads_max != num_threads_max) {
279 tq->num_threads_max = num_threads_max;
286taskq_get_num_threads_max(
const taskq_t *tq)
289 return (tq->num_threads_max);
293taskq_set_thr_stop_delay(taskq_t *tq, uint64_t thr_stop_delay_us)
296 if (tq->thr_stop_delay_us != thr_stop_delay_us) {
298 tq->thr_stop_delay_us = thr_stop_delay_us;
305taskq_get_thr_stop_delay(
const taskq_t *tq)
308 return (tq->thr_stop_delay_us);
#define ASSERT3U(x, op, y)
int list_link_active(const list_node_t *)
void list_destroy(list_t *)
void list_create(list_t *, size_t, size_t)
size_t list_count(const list_t *)
void list_remove(list_t *, void *)
void * list_remove_head(list_t *)
void list_insert_tail(list_t *, void *)
static void * safe_calloc(size_t nmemb, size_t size)
CONDITION_VARIABLE condvar_t
static void cv_destroy(condvar_t *cv)
static void mutex_destroy(mutex_t *mtx)
static void mutex_enter(mutex_t *mtx)
static void cv_init(condvar_t *cv)
static void mutex_exit(mutex_t *mtx)
static void cv_wait(condvar_t *cv, mutex_t *mtx)
static void cv_signal(condvar_t *cv)
static int cv_timedwait(condvar_t *cv, mutex_t *mtx, uint64_t limit)
static void mutex_init(mutex_t *mtx)
static void cv_broadcast(condvar_t *cv)
#define thread_create(thrp, start_proc, arg)