libacfutils
A general purpose library of utility functions designed to make it easier to develop addons for the X-Plane flight simulator.
Loading...
Searching...
No Matches
taskq.c
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license in the file COPYING
10 * or http://www.opensource.org/licenses/CDDL-1.0.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file COPYING.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22/*
23 * Copyright 2020 Saso Kiselkov. All rights reserved.
24 */
25
26#include <stddef.h>
27
28#include "acfutils/assert.h"
29#include "acfutils/list.h"
30#include "acfutils/safe_alloc.h"
31#include "acfutils/taskq.h"
32#include "acfutils/thread.h"
33#include "acfutils/time.h"
34
35typedef struct {
36 void *task;
37 list_node_t node;
39
40typedef struct {
41 taskq_t *tq;
42 thread_t thr;
43 void *thr_info;
44 list_node_t node;
46
47struct taskq_s {
48 /* immutable */
49 taskq_init_thr_t init_func;
50 taskq_fini_thr_t fini_func;
51 taskq_proc_task_t proc_func;
52 taskq_discard_task_t discard_func;
53 void *userinfo;
54
55 mutex_t lock;
56 /* protected by lock */
57 unsigned num_threads_min;
58 unsigned num_threads_max;
59 uint64_t thr_stop_delay_us;
60 condvar_t cv;
61 bool shutdown;
62 list_t tasks;
63 list_t threads;
64 unsigned num_thr_ready;
65};
66
67static bool
68task_wait_for_work(taskq_t *tq)
69{
70 ASSERT(tq != NULL);
71
72 if (tq->thr_stop_delay_us != 0) {
73 uint64_t limit = microclock() + tq->thr_stop_delay_us;
74 return (cv_timedwait(&tq->cv, &tq->lock, limit) != ETIMEDOUT);
75 } else {
76 cv_wait(&tq->cv, &tq->lock);
77 return (true);
78 }
79}
80
81static void
82taskq_worker(void *info)
83{
84 taskq_thr_t *thr;
85 taskq_t *tq;
86
87 ASSERT(info != NULL);
88 thr = info;
89 ASSERT(thr->tq != NULL);
90 tq = thr->tq;
91 ASSERT(tq->proc_func != NULL);
92
93 if (tq->init_func != NULL)
94 thr->thr_info = tq->init_func(tq->userinfo);
95
96 mutex_enter(&tq->lock);
97 tq->num_thr_ready++;
98 while (!tq->shutdown) {
99 taskq_task_t *task;
100
101 /* Too many threads spawned? Stop. */
102 if (list_count(&tq->threads) > tq->num_threads_max)
103 break;
104 task = list_remove_head(&tq->tasks);
105 if (task == NULL) {
106 /* No work to be done */
107 if (task_wait_for_work(tq) ||
108 list_count(&tq->threads) <= tq->num_threads_min) {
109 continue;
110 } else {
111 break;
112 }
113 }
114 tq->num_thr_ready--;
115 mutex_exit(&tq->lock);
116
117 /* Process the task */
118 tq->proc_func(tq->userinfo, thr->thr_info, task->task);
119 free(task);
120
121 mutex_enter(&tq->lock);
122 tq->num_thr_ready++;
123 }
124 ASSERT(tq->num_thr_ready != 0);
125 tq->num_thr_ready--;
126 /*
127 * Cannot relinquish the lock here until we are completely removed
128 * from the tq->threads list, otherwise taskq_submit might thing we
129 * were just busy and we might still process work. But we are
130 * definitely on our way out.
131 */
132 if (tq->fini_func != NULL)
133 tq->fini_func(tq->userinfo, thr->thr_info);
134
135 ASSERT(list_link_active(&thr->node));
136 list_remove(&tq->threads, thr);
137 if (list_count(&tq->threads) == 0)
138 cv_broadcast(&tq->cv);
139 /*
140 * Mustn't touch `tq' after this, as on a taskq_free, it can become
141 * freed after releasing this lock
142 */
143 mutex_exit(&tq->lock);
144
145 memset(thr, 0, sizeof (*thr));
146 free(thr);
147}
148
149taskq_t *
150taskq_alloc(unsigned num_threads_min, unsigned num_threads_max,
151 uint64_t thr_stop_delay_us, taskq_init_thr_t init_func,
152 taskq_fini_thr_t fini_func, taskq_proc_task_t proc_func,
153 taskq_discard_task_t discard_func, void *userinfo)
154{
155 taskq_t *tq = safe_calloc(1, sizeof (*tq));
156
157 ASSERT3U(num_threads_min, <=, num_threads_max);
158 ASSERT(num_threads_max != 0);
159 ASSERT(proc_func != NULL);
160 ASSERT(discard_func != NULL);
161
162 mutex_init(&tq->lock);
163 cv_init(&tq->cv);
164 list_create(&tq->tasks, sizeof (taskq_task_t),
165 offsetof(taskq_task_t, node));
166 list_create(&tq->threads, sizeof (taskq_thr_t),
167 offsetof(taskq_thr_t, node));
168
169 tq->num_threads_min = num_threads_min;
170 tq->num_threads_max = num_threads_max;
171 tq->thr_stop_delay_us = thr_stop_delay_us;
172 tq->init_func = init_func;
173 tq->fini_func = fini_func;
174 tq->proc_func = proc_func;
175 tq->discard_func = discard_func;
176 tq->userinfo = userinfo;
177
178 return (tq);
179}
180
181void
182taskq_free(taskq_t *tq)
183{
184 taskq_task_t *task;
185
186 ASSERT(tq != NULL);
187 ASSERT(tq->discard_func != NULL);
188 /*
189 * Notify all parked workers to stop.
190 */
191 mutex_enter(&tq->lock);
192 tq->shutdown = true;
193 /* The worker threads will empty out the `tq->threads' list */
194 while (list_count(&tq->threads) != 0) {
195 cv_broadcast(&tq->cv);
196 cv_wait(&tq->cv, &tq->lock);
197 }
198 mutex_exit(&tq->lock);
199 ASSERT0(list_count(&tq->threads));
200 list_destroy(&tq->threads);
201 /*
202 * Discard incomplete work.
203 */
204 while ((task = list_remove_head(&tq->tasks)) != NULL) {
205 tq->discard_func(tq->userinfo, task->task);
206 free(task);
207 }
208 list_destroy(&tq->tasks);
209 /*
210 * Destroy threading primitives.
211 */
212 cv_destroy(&tq->cv);
213 mutex_destroy(&tq->lock);
214
215 free(tq);
216}
217
218void
219taskq_submit(taskq_t *tq, void *task)
220{
221 taskq_task_t *t = safe_calloc(1, sizeof (*t));
222
223 ASSERT(tq != NULL);
224 t->task = task;
225
226 mutex_enter(&tq->lock);
227 list_insert_tail(&tq->tasks, t);
228 if (tq->num_thr_ready != 0) {
229 /* Only wake up a single worker */
230 cv_signal(&tq->cv);
231 } else if (list_count(&tq->threads) < tq->num_threads_max) {
232 /* No worker ready and we can still add more, spawn a new one */
233 taskq_thr_t *thr = safe_calloc(1, sizeof (*thr));
234 thr->tq = tq;
235 list_insert_tail(&tq->threads, thr);
236 VERIFY(thread_create(&thr->thr, taskq_worker, thr));
237 }
238 mutex_exit(&tq->lock);
239}
240
241bool
242taskq_wants_shutdown(taskq_t *tq)
243{
244 bool shutdown;
245
246 ASSERT(tq != NULL);
247 mutex_enter(&tq->lock);
248 shutdown = tq->shutdown;
249 mutex_exit(&tq->lock);
250
251 return (shutdown);
252}
253
254void
255taskq_set_num_threads_min(taskq_t *tq, unsigned num_threads_min)
256{
257 ASSERT(tq != NULL);
258 if (tq->num_threads_min != num_threads_min) {
259 mutex_enter(&tq->lock);
260 tq->num_threads_min = num_threads_min;
261 cv_broadcast(&tq->cv); /* wake up all workers to re-adjust */
262 mutex_exit(&tq->lock);
263 }
264}
265
266unsigned
267taskq_get_num_threads_min(const taskq_t *tq)
268{
269 ASSERT(tq != NULL);
270 return (tq->num_threads_min);
271}
272
273void
274taskq_set_num_threads_max(taskq_t *tq, unsigned num_threads_max)
275{
276 ASSERT(tq != NULL);
277 if (tq->num_threads_max != num_threads_max) {
278 mutex_enter(&tq->lock);
279 tq->num_threads_max = num_threads_max;
280 cv_broadcast(&tq->cv); /* wake up all workers to re-adjust */
281 mutex_exit(&tq->lock);
282 }
283}
284
285unsigned
286taskq_get_num_threads_max(const taskq_t *tq)
287{
288 ASSERT(tq != NULL);
289 return (tq->num_threads_max);
290}
291
292void
293taskq_set_thr_stop_delay(taskq_t *tq, uint64_t thr_stop_delay_us)
294{
295 ASSERT(tq != NULL);
296 if (tq->thr_stop_delay_us != thr_stop_delay_us) {
297 mutex_enter(&tq->lock);
298 tq->thr_stop_delay_us = thr_stop_delay_us;
299 cv_broadcast(&tq->cv); /* wake up all workers to re-adjust */
300 mutex_exit(&tq->lock);
301 }
302}
303
304uint64_t
305taskq_get_thr_stop_delay(const taskq_t *tq)
306{
307 ASSERT(tq != NULL);
308 return (tq->thr_stop_delay_us);
309}
#define VERIFY(x)
Definition assert.h:78
#define ASSERT3U(x, op, y)
Definition assert.h:210
#define ASSERT(x)
Definition assert.h:208
#define ASSERT0(x)
Definition assert.h:213
int list_link_active(const list_node_t *)
Definition list.c:525
void list_destroy(list_t *)
Definition list.c:136
void list_create(list_t *, size_t, size_t)
Definition list.c:113
size_t list_count(const list_t *)
Definition list.c:543
void list_remove(list_t *, void *)
Definition list.c:226
void * list_remove_head(list_t *)
Definition list.c:251
void list_insert_tail(list_t *, void *)
Definition list.c:213
static void * safe_calloc(size_t nmemb, size_t size)
Definition safe_alloc.h:71
CONDITION_VARIABLE condvar_t
Definition thread.h:465
static void cv_destroy(condvar_t *cv)
Definition thread.h:936
static void mutex_destroy(mutex_t *mtx)
Definition thread.h:499
static void mutex_enter(mutex_t *mtx)
Definition thread.h:530
static void cv_init(condvar_t *cv)
Definition thread.h:926
static void mutex_exit(mutex_t *mtx)
Definition thread.h:556
HANDLE thread_t
Definition thread.h:393
static void cv_wait(condvar_t *cv, mutex_t *mtx)
Definition thread.h:868
static void cv_signal(condvar_t *cv)
Definition thread.h:948
static int cv_timedwait(condvar_t *cv, mutex_t *mtx, uint64_t limit)
Definition thread.h:898
static void mutex_init(mutex_t *mtx)
Definition thread.h:488
static void cv_broadcast(condvar_t *cv)
Definition thread.h:960
#define thread_create(thrp, start_proc, arg)
Definition thread.h:189