19#ifndef _ACF_UTILS_THREAD_H_
20#define _ACF_UTILS_THREAD_H_
35#if __STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__) && \
36 (!defined(__GNUC__) || __GNUC__ >= 7 || defined(__clang__))
37#define _USE_STDATOMICS
43#include <libkern/OSAtomic.h>
47#include <sys/syscall.h>
172 void (*proc)(
void *);
174 const char *filename;
188#if !APL || !defined(_task_user_)
/*
 * Convenience wrapper around lacf_thread_create(). Forwards the thread
 * handle pointer, the start procedure and its argument, and additionally
 * passes the call site's file name (via log_basename(__FILE__)) and line
 * number (__LINE__). The file/line pair is stored with the thread's
 * bookkeeping info and is what the "Leaked thread, created here" log
 * message prints.
 */
189#define thread_create(thrp, start_proc, arg) \
190 lacf_thread_create((thrp), (start_proc), (arg), \
191 log_basename(__FILE__), __LINE__)
280#ifdef _USE_STDATOMICS
/*
 * Atomic operations back-end used when _USE_STDATOMICS is defined. The
 * types are _Atomic-qualified fixed-width integers and every macro takes
 * a POINTER to the atomic variable as `x'.
 *
 * NOTE(review): atomic_fetch_add() evaluates to the value the variable
 * held *before* the addition, whereas the other back-ends in this file
 * are built on add-and-fetch style primitives (e.g.
 * __sync_add_and_fetch()). Confirm that no caller relies on the result of
 * atomic_inc_*(), atomic_dec_*() or atomic_add_*(), or the result will
 * differ by one step between back-ends.
 */
281#define atomic32_t _Atomic int32_t
282#define atomic_inc_32(x) atomic_fetch_add((x), 1)
283#define atomic_dec_32(x) atomic_fetch_add((x), -1)
284#define atomic_set_32(x, y) atomic_store((x), (y))
285#define atomic_add_32(x, y) atomic_fetch_add((x), (y))
286#define atomic64_t _Atomic int64_t
287#define atomic_inc_64(x) atomic_fetch_add((x), 1)
288#define atomic_dec_64(x) atomic_fetch_add((x), -1)
289#define atomic_set_64(x, y) atomic_store((x), (y))
290#define atomic_add_64(x, y) atomic_fetch_add((x), (y))
/*
 * Atomic operations back-end built on the Interlocked*() family of
 * functions, with `volatile LONG' / `volatile LONG64' as the storage
 * types. `x' is handed straight to the Interlocked call, so it must be a
 * pointer to the atomic variable. atomic_set_*() is implemented with
 * InterlockedExchange*().
 */
292#define atomic32_t volatile LONG
293#define atomic_inc_32(x) InterlockedIncrement((x))
294#define atomic_dec_32(x) InterlockedDecrement((x))
295#define atomic_add_32(x, y) InterlockedAdd((x), (y))
296#define atomic_set_32(x, y) InterlockedExchange((x), (y))
297#define atomic64_t volatile LONG64
298#define atomic_inc_64(x) InterlockedIncrement64((x))
299#define atomic_dec_64(x) InterlockedDecrement64((x))
300#define atomic_add_64(x, y) InterlockedAdd64((x), (y))
301#define atomic_set_64(x, y) InterlockedExchange64((x), (y))
/*
 * Atomic operations back-end built on the OSAtomicAdd32() /
 * OSAtomicAdd64() functions, with `volatile int32_t' / `volatile int64_t'
 * as the storage types.
 *
 * Every macro takes a POINTER to the atomic variable as `x', which is the
 * same calling convention used by the other atomic_* back-ends in this
 * file (they pass `x' directly to functions expecting an address).
 *
 * atomic_set_*() stores through that pointer. It previously expanded to
 * `(x) = (y)', which attempted to assign to the pointer expression itself
 * rather than to the variable it refers to, and was not parenthesized.
 * NOTE(review): the store is a plain volatile write with no explicit
 * barrier - confirm that is sufficient for the callers on this platform.
 */
#define atomic32_t volatile int32_t
#define atomic_inc_32(x) OSAtomicAdd32(1, (x))
#define atomic_dec_32(x) OSAtomicAdd32(-1, (x))
#define atomic_add_32(x, y) OSAtomicAdd32((y), (x))
#define atomic_set_32(x, y) (*(x) = (y))
#define atomic64_t volatile int64_t
#define atomic_inc_64(x) OSAtomicAdd64(1, (x))
#define atomic_dec_64(x) OSAtomicAdd64(-1, (x))
#define atomic_add_64(x, y) OSAtomicAdd64((y), (x))
#define atomic_set_64(x, y) (*(x) = (y))
/*
 * Atomic operations back-end using compiler builtins, with
 * `volatile int32_t' / `volatile int64_t' as the storage types. The
 * arithmetic macros use __sync_add_and_fetch(); the atomic_set_*() macros
 * use __atomic_store_n() with __ATOMIC_RELAXED ordering. `x' is passed
 * directly to the builtin, so it must be a pointer to the atomic
 * variable.
 */
314#define atomic32_t volatile int32_t
315#define atomic_inc_32(x) __sync_add_and_fetch((x), 1)
316#define atomic_dec_32(x) __sync_add_and_fetch((x), -1)
317#define atomic_add_32(x, y) __sync_add_and_fetch((x), (y))
318#define atomic_set_32(x, y) __atomic_store_n((x), (y), __ATOMIC_RELAXED)
319#define atomic64_t volatile int64_t
320#define atomic_inc_64(x) __sync_add_and_fetch((x), 1)
321#define atomic_dec_64(x) __sync_add_and_fetch((x), -1)
322#define atomic_add_64(x, y) __sync_add_and_fetch((x), (y))
323#define atomic_set_64(x, y) __atomic_store_n((x), (y), __ATOMIC_RELAXED)
329#define thread_t pthread_t
330#define thread_id_t pthread_t
331#define mutex_t pthread_mutex_t
332#define condvar_t pthread_cond_t
336typedef pthread_mutex_t
mutex_t;
339#define curthread_id pthread_self()
340#define curthread pthread_self()
345 return (pthread_equal(t1, t2));
351 pthread_mutexattr_t attr;
352 pthread_mutexattr_init(&attr);
353 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
354 pthread_mutex_init(mtx, &attr);
360 pthread_mutex_destroy(mtx);
366 pthread_mutex_lock(mtx);
372 pthread_mutex_unlock(mtx);
/*
 * Mutex ownership checks. This variant compares the `__data.__owner'
 * field of the mutex against the value returned by syscall(SYS_gettid)
 * and passes the comparison to VERIFY().
 * NOTE(review): `__data.__owner' looks like an implementation detail of
 * pthread_mutex_t rather than a public interface - confirm this variant
 * is only compiled where that layout is guaranteed.
 */
376#define VERIFY_MUTEX_HELD(mtx) \
377 VERIFY((mtx)->__data.__owner == syscall(SYS_gettid))
378#define VERIFY_MUTEX_NOT_HELD(mtx) \
379 VERIFY((mtx)->__data.__owner != syscall(SYS_gettid))
/*
 * No-op variant: both macros expand to `(void)1', i.e. no ownership check
 * is performed and `mtx' is not evaluated.
 */
381#define VERIFY_MUTEX_HELD(mtx) (void)1
382#define VERIFY_MUTEX_NOT_HELD(mtx) (void)1
467#define curthread_id GetCurrentThreadId()
468#define curthread GetCurrentThread()
479 return (tid1 == tid2);
490 mtx->inited = B_TRUE;
491 InitializeCriticalSection(&mtx->cs);
502 DeleteCriticalSection(&mtx->cs);
503 mtx->inited = B_FALSE;
533 EnterCriticalSection(&mtx->cs);
559 LeaveCriticalSection(&mtx->cs);
562#define VERIFY_MUTEX_HELD(mtx) (void)1
563#define VERIFY_MUTEX_NOT_HELD(mtx) (void)1
570API_EXPORT
extern bool_t lacf_thread_list_inited;
571API_EXPORT
extern mutex_t lacf_thread_list_lock;
572API_EXPORT
extern list_t lacf_thread_list;
580 if (!lacf_thread_list_inited) {
584 lacf_thread_list_inited = B_TRUE;
594 if (lacf_thread_list_inited) {
600 lacf_thread_list_inited = B_FALSE;
614 ASSERT(lacf_thread_list_inited);
627 ASSERT(lacf_thread_list_inited);
644 if (!lacf_thread_list_inited)
651 logMsg(
"Leaked thread, created here %s:%d", ti->filename,
666 const char *filename,
int linenum)
672 ti->filename = filename;
673 ti->linenum = linenum;
690 pthread_join(*thrp, NULL);
699 pthread_setname_np(pthread_self(), name);
707 pthread_setname_np(name);
715 pthread_cond_wait(cv, mtx);
721 struct timespec ts = { .tv_sec = (time_t)(limit / 1000000),
722 .tv_nsec = (long)((limit % 1000000) * 1000) };
723 return (pthread_cond_timedwait(cv, mtx, &ts));
729 pthread_cond_init(cv, NULL);
735 pthread_cond_destroy(cv);
741 pthread_cond_signal(cv);
747 pthread_cond_broadcast(cv);
/*
 * Thread priority levels for the pthread-based implementation.
 *
 * THREAD_PRIO_NORM is 0 and the LOW / HIGH levels are expressed as small
 * offsets from it. The two extremes are queried from the scheduler at the
 * point of use.
 *
 * sched_get_priority_min() and sched_get_priority_max() take the
 * scheduling policy as their only argument; the previous definitions
 * called them with no argument at all. SCHED_OTHER is passed because that
 * is the policy the pthread_setschedparam() call in this file applies the
 * priority under.
 */
#define THREAD_PRIO_IDLE sched_get_priority_min(SCHED_OTHER)
#define THREAD_PRIO_VERY_LOW (THREAD_PRIO_NORM - 2)
#define THREAD_PRIO_LOW (THREAD_PRIO_NORM - 1)
#define THREAD_PRIO_NORM 0
#define THREAD_PRIO_HIGH (THREAD_PRIO_NORM + 1)
#define THREAD_PRIO_VERY_HIGH (THREAD_PRIO_NORM + 2)
#define THREAD_PRIO_RT sched_get_priority_max(SCHED_OTHER)
761 struct sched_param param = {0};
762 param.sched_priority = (prio);
763 pthread_setschedparam(thr, SCHED_OTHER, &param);
/*
 * Variant in which every THREAD_PRIO_* level is defined as 0, so all
 * priority constants compare equal here.
 */
774#define THREAD_PRIO_IDLE 0
775#define THREAD_PRIO_VERY_LOW 0
776#define THREAD_PRIO_LOW 0
777#define THREAD_PRIO_NORM 0
778#define THREAD_PRIO_HIGH 0
779#define THREAD_PRIO_VERY_HIGH 0
780#define THREAD_PRIO_RT 0
804 const char *filename,
int linenum)
810 ti->filename = filename;
811 ti->linenum = linenum;
838 VERIFY3S(WaitForSingleObject(*thrp, INFINITE), ==, WAIT_OBJECT_0);
870 VERIFY(SleepConditionVariableCS(cv, &mtx->cs, INFINITE));
900 uint64_t now = microclock();
907 if (SleepConditionVariableCS(cv, &mtx->cs,
908 ceil((limit - now) / 1000.0)) != 0) {
911 if (GetLastError() == ERROR_TIMEOUT)
928 InitializeConditionVariable(cv);
950 WakeConditionVariable(cv);
962 WakeAllConditionVariable(cv);
/*
 * Variant in which each THREAD_PRIO_* level is an alias for the matching
 * THREAD_PRIORITY_* constant, from THREAD_PRIORITY_IDLE (lowest) up to
 * THREAD_PRIORITY_TIME_CRITICAL (THREAD_PRIO_RT).
 * NOTE(review): presumably these are the values handed to the
 * SetThreadPriority() call in this file - confirm against
 * thread_set_prio().
 */
970#define THREAD_PRIO_IDLE THREAD_PRIORITY_IDLE
975#define THREAD_PRIO_VERY_LOW THREAD_PRIORITY_LOWEST
980#define THREAD_PRIO_LOW THREAD_PRIORITY_BELOW_NORMAL
986#define THREAD_PRIO_NORM THREAD_PRIORITY_NORMAL
991#define THREAD_PRIO_HIGH THREAD_PRIORITY_ABOVE_NORMAL
996#define THREAD_PRIO_VERY_HIGH THREAD_PRIORITY_HIGHEST
1004#define THREAD_PRIO_RT THREAD_PRIORITY_TIME_CRITICAL
1019 SetThreadPriority(thr, prio);
1044 bool_t write_locked;
1063 memset(rw, 0,
sizeof (*rw));
1093 return (rw->write_locked && rw->writer ==
curthread_id);
1154 "acquire an rwmutex_t. This is NOT supported!");
1167 while (rw->refcount != 0 ||
1168 list_head(&rw->waiters) != (
void *)&wt) {
1176 rw->write_locked = B_TRUE;
1182 while (rw->write_locked ||
1203 ASSERT(rw->refcount != 0);
1205 if (rw->refcount == 0 && rw->write_locked) {
1207 rw->write_locked = B_FALSE;
/*
 * ASSERT_* counterparts of the VERIFY_MUTEX_* checks. In this variant
 * they simply forward to VERIFY_MUTEX_HELD() / VERIFY_MUTEX_NOT_HELD().
 * NOTE(review): the condition selecting between the two variants is not
 * visible here - presumably a debug-build switch; confirm.
 */
1225#define ASSERT_MUTEX_HELD(mtx) VERIFY_MUTEX_HELD(mtx)
1226#define ASSERT_MUTEX_NOT_HELD(mtx) VERIFY_MUTEX_NOT_HELD(mtx)
/* In this variant both macros expand to nothing, so no check is made. */
1228#define ASSERT_MUTEX_HELD(mtx)
1229#define ASSERT_MUTEX_NOT_HELD(mtx)
#define ASSERT_MSG(x, fmt,...)
#define ASSERT3U(x, op, y)
#define VERIFY3S(x, op, y)
#define WARN_UNUSED_RES_ATTR
void list_destroy(list_t *)
void * list_head(const list_t *)
void list_create(list_t *, size_t, size_t)
void * list_next(const list_t *, const void *)
size_t list_count(const list_t *)
void list_remove(list_t *, void *)
void list_insert_tail(list_t *, void *)
static void * safe_calloc(size_t nmemb, size_t size)
CONDITION_VARIABLE condvar_t
static void rwmutex_enter(rwmutex_t *rw, bool_t write)
static void rwmutex_init(rwmutex_t *rw)
static bool_t lacf_thread_create(thread_t *thrp, void(*proc)(void *), void *arg, const char *filename, int linenum)
static void _lacf_thread_list_fini(void)
DWORD lacf_thread_start_routine(void *arg)
static void rwmutex_destroy(rwmutex_t *rw)
static void cv_destroy(condvar_t *cv)
void lacf_mask_sigpipe(void)
static void thread_set_name(const char *name)
static void thread_set_prio(thread_t thr, int prio)
static void mutex_destroy(mutex_t *mtx)
static void thread_join(thread_t *thrp)
static void mutex_enter(mutex_t *mtx)
static void cv_init(condvar_t *cv)
static void mutex_exit(mutex_t *mtx)
static bool_t rwmutex_can_enter_impl(const rwmutex_t *rw, const rwlock_waiter_t *wt_self)
static bool_t rwmutex_held_write(rwmutex_t *rw)
static void cv_wait(condvar_t *cv, mutex_t *mtx)
static void _lacf_thread_list_init(void)
static void cv_signal(condvar_t *cv)
static void _lacf_thread_list_add(lacf_thread_info_t *ti)
static void rwmutex_exit(rwmutex_t *rw)
static int cv_timedwait(condvar_t *cv, mutex_t *mtx, uint64_t limit)
static void mutex_init(mutex_t *mtx)
static void cv_broadcast(condvar_t *cv)
static void lacf_threads_fini(void)
static void _lacf_thread_list_remove(lacf_thread_info_t *ti)
static void rwmutex_upgrade(rwmutex_t *rw)