// Copyright 2022, Collabora, Ltd.
// SPDX-License-Identifier: BSL-1.0
/*!
 * @file
 * @brief Simple worker pool.
 * @author Jakob Bornecrantz <jakob@collabora.com>
 *
 * @ingroup aux_util
 */
#include "os/os_threading.h"

#include "util/u_logging.h"
#include "util/u_worker.h"
#include "util/u_trace_marker.h"


#define MAX_TASK_COUNT (64)
#define MAX_THREAD_COUNT (16)

struct group;
struct pool;
/*!
 * A single queued unit of work, stored in the pool's fixed-size task array.
 * A slot is considered empty when @ref func is NULL.
 */
struct task
{
	//! Group this task was submitted from.
	struct group *g;

	//! Function to call for this task.
	u_worker_group_func_t func;

	//! User data passed unchanged to @ref func.
	void *data;
};
/*!
 * Per-worker bookkeeping, pairing the native thread with its owning pool.
 */
struct thread
{
	//! Pool this thread belongs to.
	struct pool *p;

	//! Native thread handle.
	struct os_thread thread;
};
/*!
 * Internal implementation of @ref u_worker_thread_pool.  All mutable state is
 * protected by the single @ref mutex; helpers prefixed `locked_` must be
 * called with it held.
 */
struct pool
{
	//! Base struct has to come first; the exported handle aliases it.
	struct u_worker_thread_pool base;

	//! Single big (contended) mutex protecting all pool and group state.
	struct os_mutex mutex;

	//! Fixed-size array of tasks; empty slots have func == NULL.
	struct task tasks[MAX_TASK_COUNT];

	//! Number of occupied slots in @ref tasks.
	size_t tasks_in_array_count;

	struct
	{
		size_t count;           //!< Threads currently parked on @ref cond.
		struct os_cond cond;    //!< Signaled when work may be available.
	} available; //!< For worker threads.

	//! Worker limit given at creation.
	uint32_t initial_worker_limit;

	//! Current number of workers allowed to work; a thread waiting in
	//! wait_all temporarily increases this to keep throughput up.
	uint32_t worker_limit;

	//! Number of threads currently executing tasks.
	size_t working_count;

	//! Number of created threads.
	size_t thread_count;

	//! The worker threads.
	struct thread threads[MAX_THREAD_COUNT];

	//! Is the pool up and running? Cleared under the mutex on destroy.
	bool running;
};
/*!
 * Internal implementation of @ref u_worker_group, tracking the tasks a user
 * has submitted and the threads waiting on their completion.  Protected by
 * the owning pool's mutex.
 */
struct group
{
	//! Base struct has to come first.
	struct u_worker_group base;

	//! Reference to the pool of threads this group submits work to.
	struct u_worker_thread_pool *uwtp;

	//! Number of tasks that are pending or being worked on in this group.
	size_t current_submitted_tasks_count;

	//! Number of threads that have been released or newly entered wait.
	size_t released_count;

	struct
	{
		size_t count;           //!< Threads currently parked on @ref cond.
		struct os_cond cond;    //!< Signaled when the group may be drained.
	} waiting; //!< For wait_all.
};


/*
 *
 * Helper functions.
 *
 */
//! Downcast the exported group handle to the internal representation.
static inline struct group *
group(struct u_worker_group *uwp)
{
	struct group *g = (struct group *)uwp;
	return g;
}
//! Downcast the exported pool handle to the internal representation.
static inline struct pool *
pool(struct u_worker_thread_pool *uwtp)
{
	struct pool *p = (struct pool *)uwtp;
	return p;
}


/*
 *
 * Internal pool functions.
 *
 */
129129+130130+static void
131131+locked_pool_pop_task(struct pool *p, struct task *out_task)
132132+{
133133+ assert(p->tasks_in_array_count > 0);
134134+135135+ for (size_t i = 0; i < MAX_TASK_COUNT; i++) {
136136+ if (p->tasks[i].func == NULL) {
137137+ continue;
138138+ }
139139+140140+ *out_task = p->tasks[i];
141141+ p->tasks[i] = (struct task){NULL, NULL, NULL};
142142+ p->tasks_in_array_count--;
143143+ return;
144144+ }
145145+146146+ assert(false);
147147+}
148148+149149+static void
150150+locked_pool_push_task(struct pool *p, struct group *g, u_worker_group_func_t func, void *data)
151151+{
152152+ assert(p->tasks_in_array_count < MAX_TASK_COUNT);
153153+154154+ for (size_t i = 0; i < MAX_TASK_COUNT; i++) {
155155+ if (p->tasks[i].func != NULL) {
156156+ continue;
157157+ }
158158+159159+ p->tasks[i] = (struct task){g, func, data};
160160+ p->tasks_in_array_count++;
161161+ g->current_submitted_tasks_count++;
162162+ return;
163163+ }
164164+165165+ assert(false);
166166+}
167167+168168+static void
169169+locked_pool_wake_worker_if_allowed(struct pool *p)
170170+{
171171+ // No tasks in array, don't wake any thread.
172172+ if (p->tasks_in_array_count == 0) {
173173+ return;
174174+ }
175175+176176+ // The number of working threads is at the limit.
177177+ if (p->working_count >= p->worker_limit) {
178178+ return;
179179+ }
180180+181181+ // No waiting thread.
182182+ if (p->available.count == 0) {
183183+ //! @todo Is this a error?
184184+ return;
185185+ }
186186+187187+ os_cond_signal(&p->available.cond);
188188+}


/*
 *
 * Thread group functions.
 *
 */
196196+197197+static bool
198198+locked_group_should_enter_wait_loop(struct pool *p, struct group *g)
199199+{
200200+ if (g->current_submitted_tasks_count == 0) {
201201+ return false;
202202+ }
203203+204204+ // Enter the loop as a released thread.
205205+ g->released_count++;
206206+207207+ return true;
208208+}
/*!
 * Decide whether a thread inside wait_all's loop should go (back) to sleep.
 * Requires the pool mutex to be held.  Returns true to keep waiting, false
 * when the caller may leave the loop.
 */
static bool
locked_group_should_wait(struct pool *p, struct group *g)
{
	/*
	 * There are several cases that need to be covered by this function.
	 *
	 * A thread is entering the wait_all function for the first time, and
	 * work is outstanding: what we should do then is increase the worker
	 * limit and wait on the conditional.
	 *
	 * Similar to above, we were woken up, there is more work outstanding
	 * on the group and we had been released: remove one released and up
	 * the worker limit, then wait on the conditional.
	 *
	 * A thread (or more) has been woken up and no new tasks have been
	 * submitted: then break out of the loop and decrement the released
	 * count.
	 *
	 * As above, but we were one of many woken up while only one thread had
	 * been released and that released count had been taken: then we should
	 * do nothing and wait again.
	 */

	// Tasks available.
	if (g->current_submitted_tasks_count > 0) {

		// We have been released or newly entered the loop.
		if (g->released_count > 0) {
			g->released_count--;
			// While we sleep, one extra worker may run on our behalf.
			p->worker_limit++;

			// Wake a worker with the new worker limit.
			locked_pool_wake_worker_if_allowed(p);
		}

		return true;
	}

	// No tasks, and we have been released, party!
	if (g->released_count > 0) {
		g->released_count--;
		return false;
	}

	// We were woken up, but nothing had been released, loop again.
	return true;
}
257257+258258+static void
259259+locked_group_wake_waiter_if_allowed(struct pool *p, struct group *g)
260260+{
261261+ // Are there still outstanding tasks?
262262+ if (g->current_submitted_tasks_count > 0) {
263263+ return;
264264+ }
265265+266266+ // Is there a thread waiting or not?
267267+ if (g->waiting.count == 0) {
268268+ return;
269269+ }
270270+271271+ // Wake one waiting thread.
272272+ os_cond_signal(&g->waiting.cond);
273273+274274+ assert(p->worker_limit > p->initial_worker_limit);
275275+276276+ // Remove one waiting threads.
277277+ p->worker_limit--;
278278+279279+ // We have released one thread.
280280+ g->released_count++;
281281+}
282282+283283+static void
284284+locked_group_wait(struct pool *p, struct group *g)
285285+{
286286+ // Update tracking.
287287+ g->waiting.count++;
288288+289289+ // The wait, also unlocks the mutex.
290290+ os_cond_wait(&g->waiting.cond, &p->mutex);
291291+292292+ // Update tracking.
293293+ g->waiting.count--;
294294+}


/*
 *
 * Thread internal functions.
 *
 */
302302+303303+static bool
304304+locked_thread_allowed_to_work(struct pool *p)
305305+{
306306+ // No work for you!
307307+ if (p->tasks_in_array_count == 0) {
308308+ return false;
309309+ }
310310+311311+ // Reached the limit.
312312+ if (p->working_count >= p->worker_limit) {
313313+ return false;
314314+ }
315315+316316+ return true;
317317+}
318318+319319+static void
320320+locked_thread_wait_for_work(struct pool *p)
321321+{
322322+ // Update tracking.
323323+ p->available.count++;
324324+325325+ // The wait, also unlocks the mutex.
326326+ os_cond_wait(&p->available.cond, &p->mutex);
327327+328328+ // Update tracking.
329329+ p->available.count--;
330330+}
/*!
 * Main loop of every worker thread: sleep until allowed to work, pop a task,
 * run it outside the lock, then update group accounting.  The pool mutex is
 * held everywhere except around the actual task invocation.
 */
static void *
run_func(void *ptr)
{
	struct thread *t = (struct thread *)ptr;
	struct pool *p = t->p;

	os_mutex_lock(&p->mutex);

	while (p->running) {

		if (!locked_thread_allowed_to_work(p)) {
			locked_thread_wait_for_work(p);

			// Check running first when woken up.
			continue;
		}

		// Pop a task from the pool.
		struct task task = {NULL, NULL, NULL};
		locked_pool_pop_task(p, &task);

		// We are now counting as working, needed for wake below.
		p->working_count++;

		// Signal another thread if conditions are met.
		locked_pool_wake_worker_if_allowed(p);

		// Do the actual work here, outside the big lock.
		os_mutex_unlock(&p->mutex);
		task.func(task.data);
		os_mutex_lock(&p->mutex);

		// No longer working.
		p->working_count--;

		// Only now decrement the task count on the owning group,
		// so wait_all cannot return while the task is still running.
		task.g->current_submitted_tasks_count--;

		// Wake up any waiter.
		locked_group_wake_waiter_if_allowed(p, task.g);
	}

	// Chain wakeup: destroy signals one thread, each exiting thread
	// signals the next so all threads are woken up.
	os_cond_signal(&p->available.cond);

	os_mutex_unlock(&p->mutex);

	return NULL;
}


/*
 *
 * 'Exported' thread pool functions.
 *
 */
388388+389389+struct u_worker_thread_pool *
390390+u_worker_thread_pool_create(uint32_t starting_worker_count, uint32_t thread_count)
391391+{
392392+ XRT_TRACE_MARKER();
393393+394394+ assert(starting_worker_count < thread_count);
395395+ if (starting_worker_count >= thread_count) {
396396+ return NULL;
397397+ }
398398+399399+ assert(thread_count <= MAX_THREAD_COUNT);
400400+ if (thread_count > MAX_THREAD_COUNT) {
401401+ return NULL;
402402+ }
403403+404404+ struct pool *p = U_TYPED_CALLOC(struct pool);
405405+ p->base.reference.count = 1;
406406+ p->initial_worker_limit = starting_worker_count;
407407+ p->worker_limit = starting_worker_count;
408408+ p->thread_count = thread_count;
409409+ p->running = true;
410410+411411+ for (size_t i = 0; i < thread_count; i++) {
412412+ p->threads[i].p = p;
413413+ os_thread_init(&p->threads[i].thread);
414414+ os_thread_start(&p->threads[i].thread, run_func, &p->threads[i]);
415415+ }
416416+417417+ return (struct u_worker_thread_pool *)p;
418418+}
/*!
 * Tear down the pool: stop the loop, join every thread, then destroy the
 * synchronization primitives and free the memory.  Must only be called once
 * no references remain.
 */
void
u_worker_thread_pool_destroy(struct u_worker_thread_pool *uwtp)
{
	XRT_TRACE_MARKER();

	struct pool *p = pool(uwtp);

	os_mutex_lock(&p->mutex);

	// Set under the lock so no worker misses it before sleeping.
	p->running = false;

	// One signal is enough: each exiting worker re-signals the condition
	// in run_func, chaining the wakeup through all waiting threads.
	os_cond_signal(&p->available.cond);
	os_mutex_unlock(&p->mutex);

	// Wait for all threads.
	for (size_t i = 0; i < p->thread_count; i++) {
		os_thread_join(&p->threads[i].thread);
		os_thread_destroy(&p->threads[i].thread);
	}

	os_mutex_destroy(&p->mutex);
	os_cond_destroy(&p->available.cond);

	free(p);
}


/*
 *
 * 'Exported' group functions.
 *
 */
451451+452452+struct u_worker_group *
453453+u_worker_group_create(struct u_worker_thread_pool *uwtp)
454454+{
455455+ XRT_TRACE_MARKER();
456456+457457+ struct group *g = U_TYPED_CALLOC(struct group);
458458+ g->base.reference.count = 1;
459459+ u_worker_thread_pool_reference(&g->uwtp, uwtp);
460460+461461+ os_cond_init(&g->waiting.cond);
462462+463463+ return (struct u_worker_group *)g;
464464+}
465465+466466+void
467467+u_worker_group_push(struct u_worker_group *uwp, u_worker_group_func_t f, void *data)
468468+{
469469+ XRT_TRACE_MARKER();
470470+471471+ struct group *g = group(uwp);
472472+ struct pool *p = pool(g->uwtp);
473473+474474+ os_mutex_lock(&p->mutex);
475475+ while (p->tasks_in_array_count >= MAX_TASK_COUNT) {
476476+ os_mutex_unlock(&p->mutex);
477477+478478+ //! @todo Don't wait all, wait one.
479479+ u_worker_group_wait_all(uwp);
480480+481481+ os_mutex_lock(&p->mutex);
482482+ }
483483+484484+ locked_pool_push_task(p, g, f, data);
485485+486486+ // There are worker threads available, wake one up.
487487+ if (p->available.count > 0) {
488488+ os_cond_signal(&p->available.cond);
489489+ }
490490+491491+ os_mutex_unlock(&p->mutex);
492492+}
493493+494494+void
495495+u_worker_group_wait_all(struct u_worker_group *uwp)
496496+{
497497+ XRT_TRACE_MARKER();
498498+499499+ struct group *g = group(uwp);
500500+ struct pool *p = pool(g->uwtp);
501501+502502+ os_mutex_lock(&p->mutex);
503503+504504+ // Can we early out?
505505+ if (!locked_group_should_enter_wait_loop(p, g)) {
506506+ os_mutex_unlock(&p->mutex);
507507+ return;
508508+ }
509509+510510+ // Wait here until all work been started and completed.
511511+ while (locked_group_should_wait(p, g)) {
512512+ // Do the wait.
513513+ locked_group_wait(p, g);
514514+ }
515515+516516+ os_mutex_unlock(&p->mutex);
517517+}
518518+519519+void
520520+u_worker_group_destroy(struct u_worker_group *uwp)
521521+{
522522+ XRT_TRACE_MARKER();
523523+524524+ struct group *g = group(uwp);
525525+ assert(g->base.reference.count == 0);
526526+527527+ u_worker_group_wait_all(uwp);
528528+529529+ u_worker_thread_pool_reference(&g->uwtp, NULL);
530530+531531+ os_cond_destroy(&g->waiting.cond);
532532+533533+ free(uwp);
534534+}