// Monado - The open source OpenXR runtime.
1// Copyright 2022, Collabora, Ltd.
2// Copyright 2024-2025, NVIDIA CORPORATION.
3// SPDX-License-Identifier: BSL-1.0
4/*!
5 * @file
6 * @brief Simple worker pool.
7 * @author Jakob Bornecrantz <jakob@collabora.com>
8 *
9 * @ingroup aux_util
10 */
11
12#include "os/os_threading.h"
13
14#include "util/u_logging.h"
15#include "util/u_worker.h"
16#include "util/u_trace_marker.h"
17
18
19#define MAX_TASK_COUNT (64)
20#define MAX_THREAD_COUNT (16)
21
22struct group;
23struct pool;
24
/*!
 * A single unit of queued work: a function plus its data, tagged with the
 * group it was submitted from so completion can be accounted back to it.
 * An empty slot in pool::tasks is marked by func == NULL.
 */
struct task
{
	//! Group this task was submitted from.
	struct group *g;

	//! Function to call to perform the work (NULL means the slot is free).
	u_worker_group_func_t func;

	//! Function data, passed verbatim to @ref func.
	void *data;
};
36
/*!
 * Per-worker-thread bookkeeping, one entry in pool::threads.
 */
struct thread
{
	//! Pool this thread belongs to.
	struct pool *p;

	//! Native thread.
	struct os_thread thread;

	//! Thread name, set by the worker itself from the pool prefix.
	char name[64];
};
48
/*!
 * Internal implementation of @ref u_worker_thread_pool, shared by all
 * groups created from it. All mutable state is guarded by @ref mutex.
 */
struct pool
{
	//! Base struct has to come first, published reference-counted handle.
	struct u_worker_thread_pool base;

	//! Big contentious mutex, guards all state of the pool and its groups.
	struct os_mutex mutex;

	//! Array of tasks, free slots have task::func == NULL.
	struct task tasks[MAX_TASK_COUNT];

	//! Number of tasks in array.
	size_t tasks_in_array_count;

	struct
	{
		size_t count;
		struct os_cond cond;
	} available; //!< For worker threads waiting for work.

	//! Worker limit given at creation, the floor for @ref worker_limit.
	uint32_t initial_worker_limit;

	//! Currently the number of workers that may work; waiting threads donate themselves and increase this.
	uint32_t worker_limit;

	//! Number of threads working on tasks.
	size_t working_count;

	//! Number of created threads.
	size_t thread_count;

	//! The worker threads.
	struct thread threads[MAX_THREAD_COUNT];

	//! Is the pool up and running? Cleared under the mutex to stop workers.
	bool running;

	//! Prefix to use for thread names.
	char prefix[32];
};
89
/*!
 * Internal implementation of @ref u_worker_group, tracks the tasks one
 * client has submitted to a shared pool. Guarded by the pool's mutex.
 */
struct group
{
	//! Base struct has to come first.
	struct u_worker_group base;

	//! Pointer to pool of threads.
	struct u_worker_thread_pool *uwtp;

	/*!
	 * The number of tasks that are pending execution by a worker.
	 * They reside in the pool::tasks array.
	 */
	uint32_t current_tasks_in_array;

	/*!
	 * Number of tasks that are being worked on.
	 * They live inside of the working thread.
	 */
	uint32_t current_working_tasks;

	/*!
	 * Number of waiting threads that have been released by a worker,
	 * or a thread that has started waiting (see u_worker_group_wait_all).
	 */
	size_t released_count;

	struct
	{
		size_t count;
		struct os_cond cond;
	} waiting; //!< For wait_all
};
122
123
124/*
125 *
126 * Helper functions.
127 *
128 */
129
//! Downcast the public group handle to the internal struct.
static inline struct group *
group(struct u_worker_group *uwg)
{
	struct group *g = (struct group *)uwg;
	return g;
}
135
//! Downcast the public thread-pool handle to the internal struct.
static inline struct pool *
pool(struct u_worker_thread_pool *uwtp)
{
	struct pool *p = (struct pool *)uwtp;
	return p;
}
141
142
143/*
144 *
145 * Internal pool functions.
146 *
147 */
148
149static void
150locked_pool_pop_task(struct pool *p, struct task *out_task)
151{
152 assert(p->tasks_in_array_count > 0);
153
154 for (size_t i = 0; i < MAX_TASK_COUNT; i++) {
155 if (p->tasks[i].func == NULL) {
156 continue;
157 }
158
159 struct task task = p->tasks[i];
160 p->tasks[i] = (struct task){NULL, NULL, NULL};
161
162 p->tasks_in_array_count--;
163 task.g->current_tasks_in_array--;
164 task.g->current_working_tasks++;
165
166 *out_task = task;
167
168 return;
169 }
170
171 assert(false);
172}
173
174static void
175locked_pool_push_task(struct pool *p, struct group *g, u_worker_group_func_t func, void *data)
176{
177 assert(p->tasks_in_array_count < MAX_TASK_COUNT);
178
179 for (size_t i = 0; i < MAX_TASK_COUNT; i++) {
180 if (p->tasks[i].func != NULL) {
181 continue;
182 }
183
184 p->tasks[i] = (struct task){g, func, data};
185 p->tasks_in_array_count++;
186 g->current_tasks_in_array++;
187 return;
188 }
189
190 assert(false);
191}
192
193static void
194locked_pool_wake_worker_if_allowed(struct pool *p)
195{
196 // No tasks in array, don't wake any thread.
197 if (p->tasks_in_array_count == 0) {
198 return;
199 }
200
201 // The number of working threads is at the limit.
202 if (p->working_count >= p->worker_limit) {
203 return;
204 }
205
206 // No waiting thread.
207 if (p->available.count == 0) {
208 //! @todo Is this a error?
209 return;
210 }
211
212 os_cond_signal(&p->available.cond);
213}
214
215
216/*
217 *
218 * Thread group functions.
219 *
220 */
221
222static bool
223locked_group_has_tasks_waiting_or_inflight(const struct group *g)
224{
225 if (g->current_tasks_in_array == 0 && g->current_working_tasks == 0) {
226 return false;
227 }
228
229 return true;
230}
231
/*!
 * Decide whether a thread inside u_worker_group_wait_all must keep waiting.
 * Must be called with the pool mutex held.
 *
 * @return true to wait (again) on the group's condition variable,
 *         false to leave the wait loop.
 */
static bool
locked_group_should_wait(struct pool *p, struct group *g)
{
	/*
	 * There are several cases that needs to be covered by this function.
	 *
	 * A thread is entering the wait_all function for the first time, and
	 * work is outstanding what we should do then is increase the worker
	 * limit and wait on the conditional.
	 *
	 * Similar to preceding, we were woken up, there are more work outstanding
	 * on the group and we had been released, remove one released and up the
	 * worker limit, then wait on the conditional.
	 *
	 * A thread (or more) has been woken up and no new tasks has been
	 * submitted, then break out of the loop and decrement the released
	 * count.
	 *
	 * As preceding, but we were one of many woken up but only one thread had
	 * been released and that released count had been taken, then we should
	 * do nothing and wait again.
	 */

	// Tasks available.
	if (locked_group_has_tasks_waiting_or_inflight(g)) {

		// We have been released or newly entered the loop.
		if (g->released_count > 0) {
			// Donate this thread back: it will sleep, so let a worker run instead.
			g->released_count--;
			p->worker_limit++;

			// Wake a worker with the new worker limit.
			locked_pool_wake_worker_if_allowed(p);
		}

		return true;
	}

	// No tasks, and we have been released, party!
	if (g->released_count > 0) {
		g->released_count--;
		return false;
	}

	// We were woken up, but nothing had been released, loop again.
	return true;
}
279
280static void
281locked_group_wake_waiter_if_allowed(struct pool *p, struct group *g)
282{
283 // Are there still outstanding tasks?
284 if (locked_group_has_tasks_waiting_or_inflight(g)) {
285 return;
286 }
287
288 // Is there a thread waiting or not?
289 if (g->waiting.count == 0) {
290 return;
291 }
292
293 // Wake one waiting thread.
294 os_cond_signal(&g->waiting.cond);
295
296 assert(p->worker_limit > p->initial_worker_limit);
297
298 // Remove one waiting threads.
299 p->worker_limit--;
300
301 // We have released one thread.
302 g->released_count++;
303}
304
305static void
306locked_group_wait(struct pool *p, struct group *g)
307{
308 // Update tracking.
309 g->waiting.count++;
310
311 // The wait, also unlocks the mutex.
312 os_cond_wait(&g->waiting.cond, &p->mutex);
313
314 // Update tracking.
315 g->waiting.count--;
316}
317
318
319/*
320 *
321 * Thread internal functions.
322 *
323 */
324
325static bool
326locked_thread_allowed_to_work(struct pool *p)
327{
328 // No work for you!
329 if (p->tasks_in_array_count == 0) {
330 return false;
331 }
332
333 // Reached the limit.
334 if (p->working_count >= p->worker_limit) {
335 return false;
336 }
337
338 return true;
339}
340
341static void
342locked_thread_wait_for_work(struct pool *p)
343{
344 // Update tracking.
345 p->available.count++;
346
347 // The wait, also unlocks the mutex.
348 os_cond_wait(&p->available.cond, &p->mutex);
349
350 // Update tracking.
351 p->available.count--;
352}
353
/*!
 * Main loop of every worker thread: sleep until allowed to work, pop a
 * task, run it with the pool unlocked, then account completion and wake
 * any wait_all waiter. Exits when pool::running is cleared.
 */
static void *
run_func(void *ptr)
{
	struct thread *t = (struct thread *)ptr;
	struct pool *p = t->p;

	// Name this thread after the pool's prefix, for tracing and the OS.
	snprintf(t->name, sizeof(t->name), "%s: Worker", p->prefix);
	U_TRACE_SET_THREAD_NAME(t->name);
	os_thread_name(&t->thread, t->name);

	os_mutex_lock(&p->mutex);

	while (p->running) {

		if (!locked_thread_allowed_to_work(p)) {
			locked_thread_wait_for_work(p);

			// Check running first when woken up.
			continue;
		}

		// Pop a task from the pool.
		struct task task = {NULL, NULL, NULL};
		locked_pool_pop_task(p, &task);

		// We are now counting as working, needed for wake below.
		p->working_count++;

		// Signal another thread if conditions are met.
		locked_pool_wake_worker_if_allowed(p);

		// Do the actual work here, with the pool mutex dropped so others can run.
		os_mutex_unlock(&p->mutex);
		task.func(task.data);
		os_mutex_lock(&p->mutex);

		// No longer working.
		p->working_count--;

		// We are no longer working on the task.
		task.g->current_working_tasks--;

		// This must hold true.
		assert(task.g->current_tasks_in_array <= p->tasks_in_array_count);

		// Wake up any waiter.
		locked_group_wake_waiter_if_allowed(p, task.g);
	}

	// Daisy-chain the shutdown: each exiting worker wakes the next sleeper.
	os_cond_signal(&p->available.cond);

	os_mutex_unlock(&p->mutex);

	return NULL;
}
410
411
412/*
413 *
414 * 'Exported' thread pool functions.
415 *
416 */
417
418struct u_worker_thread_pool *
419u_worker_thread_pool_create(uint32_t starting_worker_count, uint32_t thread_count, const char *prefix)
420{
421 XRT_TRACE_MARKER();
422 int ret;
423
424 assert(starting_worker_count <= thread_count);
425 if (starting_worker_count > thread_count) {
426 return NULL;
427 }
428
429 assert(thread_count <= MAX_THREAD_COUNT);
430 if (thread_count > MAX_THREAD_COUNT) {
431 return NULL;
432 }
433
434 struct pool *p = U_TYPED_CALLOC(struct pool);
435 p->base.reference.count = 1;
436 p->initial_worker_limit = starting_worker_count;
437 p->worker_limit = starting_worker_count;
438 p->thread_count = thread_count;
439 p->running = true;
440 snprintf(p->prefix, sizeof(p->prefix), "%s", prefix);
441
442 ret = os_mutex_init(&p->mutex);
443 if (ret != 0) {
444 goto err_alloc;
445 }
446
447 ret = os_cond_init(&p->available.cond);
448 if (ret != 0) {
449 goto err_mutex;
450 }
451
452 for (size_t i = 0; i < thread_count; i++) {
453 p->threads[i].p = p;
454 os_thread_init(&p->threads[i].thread);
455 os_thread_start(&p->threads[i].thread, run_func, &p->threads[i]);
456 }
457
458 return (struct u_worker_thread_pool *)p;
459
460
461err_mutex:
462 os_mutex_destroy(&p->mutex);
463
464err_alloc:
465 free(p);
466
467 return NULL;
468}
469
/*!
 * Stop and destroy a thread pool: clears the running flag under the lock,
 * wakes one sleeping worker (the exit path in run_func daisy-chains the
 * signal to the rest), joins all threads, then frees resources.
 */
void
u_worker_thread_pool_destroy(struct u_worker_thread_pool *uwtp)
{
	XRT_TRACE_MARKER();

	struct pool *p = pool(uwtp);

	os_mutex_lock(&p->mutex);

	// Stop the workers; one signal suffices, exiting workers re-signal.
	p->running = false;
	os_cond_signal(&p->available.cond);
	os_mutex_unlock(&p->mutex);

	// Wait for all threads.
	for (size_t i = 0; i < p->thread_count; i++) {
		os_thread_join(&p->threads[i].thread);
		os_thread_destroy(&p->threads[i].thread);
	}

	// Safe to destroy now that no thread can touch them.
	os_mutex_destroy(&p->mutex);
	os_cond_destroy(&p->available.cond);

	free(p);
}
494
495
496/*
497 *
498 * 'Exported' group functions.
499 *
500 */
501
502struct u_worker_group *
503u_worker_group_create(struct u_worker_thread_pool *uwtp)
504{
505 XRT_TRACE_MARKER();
506
507 struct group *g = U_TYPED_CALLOC(struct group);
508 g->base.reference.count = 1;
509 u_worker_thread_pool_reference(&g->uwtp, uwtp);
510
511 os_cond_init(&g->waiting.cond);
512
513 return (struct u_worker_group *)g;
514}
515
/*!
 * Queue a task on the group's pool. If the pool's task array is full this
 * blocks (via wait_all) until space frees up, then pushes and wakes an
 * idle worker if one is available.
 *
 * @param uwg  Group to account the task against.
 * @param f    Function to run on a worker thread.
 * @param data Opaque pointer handed to @p f.
 */
void
u_worker_group_push(struct u_worker_group *uwg, u_worker_group_func_t f, void *data)
{
	XRT_TRACE_MARKER();

	struct group *g = group(uwg);
	struct pool *p = pool(g->uwtp);

	os_mutex_lock(&p->mutex);
	while (p->tasks_in_array_count >= MAX_TASK_COUNT) {
		// Must drop the lock: wait_all takes it itself.
		os_mutex_unlock(&p->mutex);

		//! @todo Don't wait all, wait one.
		u_worker_group_wait_all(uwg);

		// Re-check the count under the lock; others may have pushed.
		os_mutex_lock(&p->mutex);
	}

	locked_pool_push_task(p, g, f, data);

	// There are worker threads available, wake one up.
	if (p->available.count > 0) {
		os_cond_signal(&p->available.cond);
	}

	os_mutex_unlock(&p->mutex);
}
543
/*!
 * Block until every task pushed on this group has been started and
 * completed. While waiting, the calling thread "donates" its slot by
 * raising the pool's worker limit, letting an extra worker run.
 */
void
u_worker_group_wait_all(struct u_worker_group *uwg)
{
	XRT_TRACE_MARKER();

	struct group *g = group(uwg);
	struct pool *p = pool(g->uwtp);

	os_mutex_lock(&p->mutex);

	// Can we early out?
	if (!locked_group_has_tasks_waiting_or_inflight(g)) {
		os_mutex_unlock(&p->mutex);
		return;
	}

	/*
	 * The released_count is tied to the decrement of worker_limit, that is
	 * when a waiting thread is woken up the worker_limit is decreased, and
	 * released_count is increased. The waiting thread will then double
	 * check that it can be released or not, if it can not be released it
	 * will once again donate this thread and increase the worker_limit.
	 *
	 * If it can be released it will decrement released_count and exit the
	 * loop below.
	 *
	 * So if we increment it here, the loop will increase worker_limit
	 * which is what we want.
	 */
	g->released_count++;

	// Wait here until all work been started and completed.
	while (locked_group_should_wait(p, g)) {
		// Do the wait.
		locked_group_wait(p, g);
	}

	os_mutex_unlock(&p->mutex);
}
583
/*!
 * Destroy a group once its last reference has been dropped: drains any
 * remaining tasks, releases the pool reference, then frees the group.
 * Must only be called when base.reference.count has reached zero.
 */
void
u_worker_group_destroy(struct u_worker_group *uwg)
{
	XRT_TRACE_MARKER();

	struct group *g = group(uwg);
	assert(g->base.reference.count == 0);

	// Drain tasks still queued or in flight before tearing anything down.
	u_worker_group_wait_all(uwg);

	// Drop our reference to the shared thread pool.
	u_worker_thread_pool_reference(&g->uwtp, NULL);

	os_cond_destroy(&g->waiting.cond);

	free(uwg);
}