libacfutils
A general purpose library of utility functions designed to make it easier to develop addons for the X-Plane flight simulator.
Loading...
Searching...
No Matches
worker.c
1/*
2 * CDDL HEADER START
3 *
4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
7 * 1.0 of the CDDL.
8 *
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
12 *
13 * CDDL HEADER END
14 */
15/*
16 * Copyright 2018 Saso Kiselkov. All rights reserved.
17 */
18
19#include <string.h>
20
21#include <acfutils/assert.h>
22#include <acfutils/helpers.h>
23#include <acfutils/worker.h>
24#include <acfutils/time.h>
25
26#if !IBM
27#include <signal.h>
28#endif
29
34void
36{
37#if !IBM
38 sigset_t set;
39
40 pthread_sigmask(SIG_BLOCK, NULL, &set);
41 if (!sigismember(&set, SIGPIPE)) {
42 sigaddset(&set, SIGPIPE);
43 pthread_sigmask(SIG_BLOCK, &set, NULL);
44 }
45#endif /* !IBM */
46}
47
48static void
49worker(void *ui)
50{
51 worker_t *wk = ui;
52 uint64_t now = microclock();
53#if IBM
54 TIMECAPS tc;
55#endif
56 thread_set_name(wk->name);
57 /*
58 * SIGPIPE is almost never desired on worker threads (typically due
59 * to writing to broken network sockets).
60 */
62
63 if (wk->init_func != NULL) {
64 if (!wk->init_func(wk->userinfo))
65 return;
66 }
67#if IBM
68 /*
69 * On Windows, the sleep interval is constrained to a multiple of
70 * the default clock tick. This is often something really crude,
71 * like 20ms or so. That is way too crude for our needs, so we'll
72 * drop the time tick to the minimum the hardware can support
73 * (usually 1ms).
74 */
75 timeGetDevCaps(&tc, sizeof (tc));
76 timeBeginPeriod(tc.wPeriodMin);
77#endif /* IBM */
78
79 mutex_enter(&wk->lock);
80 while (wk->run) {
81 bool_t result;
82 uint64_t intval_us = wk->intval_us;
83
84 if (intval_us == 0 && !wk->dontstop) {
85 cv_wait(&wk->cv, &wk->lock);
86 if (!wk->run)
87 break;
88 now = microclock();
89 }
90 /*
91 * Avoid holding the worker lock in the user callback, as
92 * that can result in locking inversions if the worker needs
93 * to grab locks, while external threads might be holding
94 * those locks and trying to wake us up.
95 */
96 wk->inside_cb = B_TRUE;
97 mutex_exit(&wk->lock);
98 result = wk->worker_func(wk->userinfo);
99 mutex_enter(&wk->lock);
100 wk->inside_cb = B_FALSE;
101 if (!result)
102 break;
103 /*
104 * If another thread is waiting on us to finish executing,
105 * signal it.
106 */
107 cv_broadcast(&wk->cv);
108
109 if (intval_us != 0 && !wk->dontstop) {
110 uint64_t new_now;
111
112 cv_timedwait(&wk->cv, &wk->lock, now + intval_us);
113 /*
114 * If the timeout expired, we have waited for the
115 * full duration. So jump our idea of current time
116 * forward in increments of the interval. This
117 * maintains a fixed execution schedule, but allows
118 * for skipped intervals in case the callback took
119 * too long to execute.
120 */
121 new_now = microclock();
122 if (new_now >= now + intval_us) {
123 uint64_t d_t = new_now - now;
124 now += (d_t / intval_us) * intval_us;
125 }
126 }
127 wk->dontstop = B_FALSE;
128 }
129 mutex_exit(&wk->lock);
130
131 if (wk->fini_func != NULL)
132 wk->fini_func(wk->userinfo);
133
134#if IBM
135 timeEndPeriod(tc.wPeriodMin);
136#endif /* IBM */
137}
138
139void
140worker_init(worker_t *wk, bool_t (*worker_func)(void *userinfo),
141 uint64_t intval_us, void *userinfo, const char *thread_name)
142{
143 worker_init2(wk, NULL, worker_func, NULL, intval_us, userinfo,
144 thread_name);
145}
146
147API_EXPORT void
148worker_init2(worker_t *wk,
149 bool_t (*init_func)(void *userinfo),
150 bool_t (*worker_func)(void *userinfo),
151 void (*fini_func)(void *userinfo),
152 uint64_t intval_us, void *userinfo, const char *thread_name)
153{
154 ASSERT(worker_func != NULL);
155
156 wk->run = B_TRUE;
157 mutex_init(&wk->lock);
158 cv_init(&wk->cv);
159 wk->init_func = init_func;
160 wk->worker_func = worker_func;
161 wk->fini_func = fini_func;
162 wk->intval_us = intval_us;
163 wk->userinfo = userinfo;
164 if (thread_name != NULL)
165 lacf_strlcpy(wk->name, thread_name, sizeof (wk->name));
166 else
167 memset(wk->name, 0, sizeof (wk->name));
168 VERIFY(thread_create(&wk->thread, worker, wk));
169}
170
171void
172worker_fini(worker_t *wk)
173{
174 if (!wk->run)
175 return;
176
177 /*
178 * Stop the thread before grabbing the lock, to shut it down
179 * while it is executing its callback.
180 */
181 wk->run = B_FALSE;
182
183 mutex_enter(&wk->lock);
184 cv_broadcast(&wk->cv);
185 mutex_exit(&wk->lock);
186
187 thread_join(&wk->thread);
188
189 mutex_destroy(&wk->lock);
190 cv_destroy(&wk->cv);
191}
192
193void
194worker_set_interval(worker_t *wk, uint64_t intval_us)
195{
196 mutex_enter(&wk->lock);
197 /* If the worker is in the callback, wait for it to exit first */
198 while (wk->inside_cb)
199 cv_wait(&wk->cv, &wk->lock);
200 if (wk->intval_us != intval_us) {
201 wk->intval_us = intval_us;
202 cv_broadcast(&wk->cv);
203 }
204 mutex_exit(&wk->lock);
205}
206
207/*
208 * Same as worker_set_interval, but doesn't cause the worker to
209 * immediately wake up and run another loop.
210 */
211void
212worker_set_interval_nowake(worker_t *wk, uint64_t intval_us)
213{
214 mutex_enter(&wk->lock);
215 wk->intval_us = intval_us;
216 mutex_exit(&wk->lock);
217}
218
219void
220worker_wake_up(worker_t *wk)
221{
222 mutex_enter(&wk->lock);
223 wk->dontstop = B_TRUE;
224 cv_broadcast(&wk->cv);
225 mutex_exit(&wk->lock);
226}
227
228void
229worker_wake_up_wait(worker_t *wk)
230{
231 mutex_enter(&wk->lock);
232 /* If the worker is in the callback, wait for it to exit first */
233 while (wk->inside_cb)
234 cv_wait(&wk->cv, &wk->lock);
235 /* Now we are certain the worker is sleeping, wake it up again */
236 cv_broadcast(&wk->cv);
237 /* And then wait for it to finish */
238 cv_wait(&wk->cv, &wk->lock);
239 mutex_exit(&wk->lock);
240}
#define VERIFY(x)
Definition assert.h:78
#define ASSERT(x)
Definition assert.h:208
void lacf_strlcpy(char *dest, const char *src, size_t cap)
Definition helpers.c:1667
static void cv_destroy(condvar_t *cv)
Definition thread.h:936
void lacf_mask_sigpipe(void)
Definition worker.c:35
static void thread_set_name(const char *name)
Definition thread.h:852
static void mutex_destroy(mutex_t *mtx)
Definition thread.h:499
static void thread_join(thread_t *thrp)
Definition thread.h:836
static void mutex_enter(mutex_t *mtx)
Definition thread.h:530
static void cv_init(condvar_t *cv)
Definition thread.h:926
static void mutex_exit(mutex_t *mtx)
Definition thread.h:556
static void cv_wait(condvar_t *cv, mutex_t *mtx)
Definition thread.h:868
static int cv_timedwait(condvar_t *cv, mutex_t *mtx, uint64_t limit)
Definition thread.h:898
static void mutex_init(mutex_t *mtx)
Definition thread.h:488
static void cv_broadcast(condvar_t *cv)
Definition thread.h:960
#define thread_create(thrp, start_proc, arg)
Definition thread.h:189