/* The open source OpenXR runtime. */
1// Copyright 2022, Collabora, Ltd. 2// Copyright 2024-2025, NVIDIA CORPORATION. 3// SPDX-License-Identifier: BSL-1.0 4/*! 5 * @file 6 * @brief Simple worker pool. 7 * @author Jakob Bornecrantz <jakob@collabora.com> 8 * 9 * @ingroup aux_util 10 */ 11 12#include "os/os_threading.h" 13 14#include "util/u_logging.h" 15#include "util/u_worker.h" 16#include "util/u_trace_marker.h" 17 18 19#define MAX_TASK_COUNT (64) 20#define MAX_THREAD_COUNT (16) 21 22struct group; 23struct pool; 24 25struct task 26{ 27 //! Group this task was submitted from. 28 struct group *g; 29 30 //! Function. 31 u_worker_group_func_t func; 32 33 //! Function data. 34 void *data; 35}; 36 37struct thread 38{ 39 //! Pool this thread belongs to. 40 struct pool *p; 41 42 // Native thread. 43 struct os_thread thread; 44 45 //! Thread name. 46 char name[64]; 47}; 48 49struct pool 50{ 51 struct u_worker_thread_pool base; 52 53 //! Big contenious mutex. 54 struct os_mutex mutex; 55 56 //! Array of tasks. 57 struct task tasks[MAX_TASK_COUNT]; 58 59 //! Number of tasks in array. 60 size_t tasks_in_array_count; 61 62 struct 63 { 64 size_t count; 65 struct os_cond cond; 66 } available; //!< For worker threads. 67 68 //! Given at creation. 69 uint32_t initial_worker_limit; 70 71 //! Currently the number of works that can work, waiting increases this. 72 uint32_t worker_limit; 73 74 //! Number of threads working on tasks. 75 size_t working_count; 76 77 //! Number of created threads. 78 size_t thread_count; 79 80 //! The worker threads. 81 struct thread threads[MAX_THREAD_COUNT]; 82 83 //! Is the pool up and running? 84 bool running; 85 86 //! Prefix to use for thread names. 87 char prefix[32]; 88}; 89 90struct group 91{ 92 //! Base struct has to come first. 93 struct u_worker_group base; 94 95 //! Pointer to poll of threads. 96 struct u_worker_thread_pool *uwtp; 97 98 /*! 99 * The number of tasks that are pending execution by a worker. 100 * They reside in the pool::tasks array. 
101 */ 102 uint32_t current_tasks_in_array; 103 104 /*! 105 * Number of tasks that are being worked on. 106 * They live inside of the working thread. 107 */ 108 uint32_t current_working_tasks; 109 110 /*! 111 * Number of waiting threads that have been released by a worker, 112 * or a thread that has started waiting (see u_worker_group_wait_all). 113 */ 114 size_t released_count; 115 116 struct 117 { 118 size_t count; 119 struct os_cond cond; 120 } waiting; //!< For wait_all 121}; 122 123 124/* 125 * 126 * Helper functions. 127 * 128 */ 129 130static inline struct group * 131group(struct u_worker_group *uwg) 132{ 133 return (struct group *)uwg; 134} 135 136static inline struct pool * 137pool(struct u_worker_thread_pool *uwtp) 138{ 139 return (struct pool *)uwtp; 140} 141 142 143/* 144 * 145 * Internal pool functions. 146 * 147 */ 148 149static void 150locked_pool_pop_task(struct pool *p, struct task *out_task) 151{ 152 assert(p->tasks_in_array_count > 0); 153 154 for (size_t i = 0; i < MAX_TASK_COUNT; i++) { 155 if (p->tasks[i].func == NULL) { 156 continue; 157 } 158 159 struct task task = p->tasks[i]; 160 p->tasks[i] = (struct task){NULL, NULL, NULL}; 161 162 p->tasks_in_array_count--; 163 task.g->current_tasks_in_array--; 164 task.g->current_working_tasks++; 165 166 *out_task = task; 167 168 return; 169 } 170 171 assert(false); 172} 173 174static void 175locked_pool_push_task(struct pool *p, struct group *g, u_worker_group_func_t func, void *data) 176{ 177 assert(p->tasks_in_array_count < MAX_TASK_COUNT); 178 179 for (size_t i = 0; i < MAX_TASK_COUNT; i++) { 180 if (p->tasks[i].func != NULL) { 181 continue; 182 } 183 184 p->tasks[i] = (struct task){g, func, data}; 185 p->tasks_in_array_count++; 186 g->current_tasks_in_array++; 187 return; 188 } 189 190 assert(false); 191} 192 193static void 194locked_pool_wake_worker_if_allowed(struct pool *p) 195{ 196 // No tasks in array, don't wake any thread. 
197 if (p->tasks_in_array_count == 0) { 198 return; 199 } 200 201 // The number of working threads is at the limit. 202 if (p->working_count >= p->worker_limit) { 203 return; 204 } 205 206 // No waiting thread. 207 if (p->available.count == 0) { 208 //! @todo Is this a error? 209 return; 210 } 211 212 os_cond_signal(&p->available.cond); 213} 214 215 216/* 217 * 218 * Thread group functions. 219 * 220 */ 221 222static bool 223locked_group_has_tasks_waiting_or_inflight(const struct group *g) 224{ 225 if (g->current_tasks_in_array == 0 && g->current_working_tasks == 0) { 226 return false; 227 } 228 229 return true; 230} 231 232static bool 233locked_group_should_wait(struct pool *p, struct group *g) 234{ 235 /* 236 * There are several cases that needs to be covered by this function. 237 * 238 * A thread is entering the wait_all function for the first time, and 239 * work is outstanding what we should do then is increase the worker 240 * limit and wait on the conditional. 241 * 242 * Similar to preceding, we were woken up, there are more work outstanding 243 * on the group and we had been released, remove one released and up the 244 * worker limit, then wait on the conditional. 245 * 246 * A thread (or more) has been woken up and no new tasks has been 247 * submitted, then break out of the loop and decrement the released 248 * count. 249 * 250 * As preceding, but we were one of many woken up but only one thread had 251 * been released and that released count had been taken, then we should 252 * do nothing and wait again. 253 */ 254 255 // Tasks available. 256 if (locked_group_has_tasks_waiting_or_inflight(g)) { 257 258 // We have been released or newly entered the loop. 259 if (g->released_count > 0) { 260 g->released_count--; 261 p->worker_limit++; 262 263 // Wake a worker with the new worker limit. 264 locked_pool_wake_worker_if_allowed(p); 265 } 266 267 return true; 268 } 269 270 // No tasks, and we have been released, party! 
271 if (g->released_count > 0) { 272 g->released_count--; 273 return false; 274 } 275 276 // We where woken up, but nothing had been released, loop again. 277 return true; 278} 279 280static void 281locked_group_wake_waiter_if_allowed(struct pool *p, struct group *g) 282{ 283 // Are there still outstanding tasks? 284 if (locked_group_has_tasks_waiting_or_inflight(g)) { 285 return; 286 } 287 288 // Is there a thread waiting or not? 289 if (g->waiting.count == 0) { 290 return; 291 } 292 293 // Wake one waiting thread. 294 os_cond_signal(&g->waiting.cond); 295 296 assert(p->worker_limit > p->initial_worker_limit); 297 298 // Remove one waiting threads. 299 p->worker_limit--; 300 301 // We have released one thread. 302 g->released_count++; 303} 304 305static void 306locked_group_wait(struct pool *p, struct group *g) 307{ 308 // Update tracking. 309 g->waiting.count++; 310 311 // The wait, also unlocks the mutex. 312 os_cond_wait(&g->waiting.cond, &p->mutex); 313 314 // Update tracking. 315 g->waiting.count--; 316} 317 318 319/* 320 * 321 * Thread internal functions. 322 * 323 */ 324 325static bool 326locked_thread_allowed_to_work(struct pool *p) 327{ 328 // No work for you! 329 if (p->tasks_in_array_count == 0) { 330 return false; 331 } 332 333 // Reached the limit. 334 if (p->working_count >= p->worker_limit) { 335 return false; 336 } 337 338 return true; 339} 340 341static void 342locked_thread_wait_for_work(struct pool *p) 343{ 344 // Update tracking. 345 p->available.count++; 346 347 // The wait, also unlocks the mutex. 348 os_cond_wait(&p->available.cond, &p->mutex); 349 350 // Update tracking. 
351 p->available.count--; 352} 353 354static void * 355run_func(void *ptr) 356{ 357 struct thread *t = (struct thread *)ptr; 358 struct pool *p = t->p; 359 360 snprintf(t->name, sizeof(t->name), "%s: Worker", p->prefix); 361 U_TRACE_SET_THREAD_NAME(t->name); 362 os_thread_name(&t->thread, t->name); 363 364 os_mutex_lock(&p->mutex); 365 366 while (p->running) { 367 368 if (!locked_thread_allowed_to_work(p)) { 369 locked_thread_wait_for_work(p); 370 371 // Check running first when woken up. 372 continue; 373 } 374 375 // Pop a task from the pool. 376 struct task task = {NULL, NULL, NULL}; 377 locked_pool_pop_task(p, &task); 378 379 // We are now counting as working, needed for wake below. 380 p->working_count++; 381 382 // Signal another thread if conditions are met. 383 locked_pool_wake_worker_if_allowed(p); 384 385 // Do the actual work here. 386 os_mutex_unlock(&p->mutex); 387 task.func(task.data); 388 os_mutex_lock(&p->mutex); 389 390 // No longer working. 391 p->working_count--; 392 393 // We are no longer working on the task. 394 task.g->current_working_tasks--; 395 396 // This must hold true. 397 assert(task.g->current_tasks_in_array <= p->tasks_in_array_count); 398 399 // Wake up any waiter. 400 locked_group_wake_waiter_if_allowed(p, task.g); 401 } 402 403 // Make sure all threads are woken up. 404 os_cond_signal(&p->available.cond); 405 406 os_mutex_unlock(&p->mutex); 407 408 return NULL; 409} 410 411 412/* 413 * 414 * 'Exported' thread pool functions. 
415 * 416 */ 417 418struct u_worker_thread_pool * 419u_worker_thread_pool_create(uint32_t starting_worker_count, uint32_t thread_count, const char *prefix) 420{ 421 XRT_TRACE_MARKER(); 422 int ret; 423 424 assert(starting_worker_count <= thread_count); 425 if (starting_worker_count > thread_count) { 426 return NULL; 427 } 428 429 assert(thread_count <= MAX_THREAD_COUNT); 430 if (thread_count > MAX_THREAD_COUNT) { 431 return NULL; 432 } 433 434 struct pool *p = U_TYPED_CALLOC(struct pool); 435 p->base.reference.count = 1; 436 p->initial_worker_limit = starting_worker_count; 437 p->worker_limit = starting_worker_count; 438 p->thread_count = thread_count; 439 p->running = true; 440 snprintf(p->prefix, sizeof(p->prefix), "%s", prefix); 441 442 ret = os_mutex_init(&p->mutex); 443 if (ret != 0) { 444 goto err_alloc; 445 } 446 447 ret = os_cond_init(&p->available.cond); 448 if (ret != 0) { 449 goto err_mutex; 450 } 451 452 for (size_t i = 0; i < thread_count; i++) { 453 p->threads[i].p = p; 454 os_thread_init(&p->threads[i].thread); 455 os_thread_start(&p->threads[i].thread, run_func, &p->threads[i]); 456 } 457 458 return (struct u_worker_thread_pool *)p; 459 460 461err_mutex: 462 os_mutex_destroy(&p->mutex); 463 464err_alloc: 465 free(p); 466 467 return NULL; 468} 469 470void 471u_worker_thread_pool_destroy(struct u_worker_thread_pool *uwtp) 472{ 473 XRT_TRACE_MARKER(); 474 475 struct pool *p = pool(uwtp); 476 477 os_mutex_lock(&p->mutex); 478 479 p->running = false; 480 os_cond_signal(&p->available.cond); 481 os_mutex_unlock(&p->mutex); 482 483 // Wait for all threads. 484 for (size_t i = 0; i < p->thread_count; i++) { 485 os_thread_join(&p->threads[i].thread); 486 os_thread_destroy(&p->threads[i].thread); 487 } 488 489 os_mutex_destroy(&p->mutex); 490 os_cond_destroy(&p->available.cond); 491 492 free(p); 493} 494 495 496/* 497 * 498 * 'Exported' group functions. 
499 * 500 */ 501 502struct u_worker_group * 503u_worker_group_create(struct u_worker_thread_pool *uwtp) 504{ 505 XRT_TRACE_MARKER(); 506 507 struct group *g = U_TYPED_CALLOC(struct group); 508 g->base.reference.count = 1; 509 u_worker_thread_pool_reference(&g->uwtp, uwtp); 510 511 os_cond_init(&g->waiting.cond); 512 513 return (struct u_worker_group *)g; 514} 515 516void 517u_worker_group_push(struct u_worker_group *uwg, u_worker_group_func_t f, void *data) 518{ 519 XRT_TRACE_MARKER(); 520 521 struct group *g = group(uwg); 522 struct pool *p = pool(g->uwtp); 523 524 os_mutex_lock(&p->mutex); 525 while (p->tasks_in_array_count >= MAX_TASK_COUNT) { 526 os_mutex_unlock(&p->mutex); 527 528 //! @todo Don't wait all, wait one. 529 u_worker_group_wait_all(uwg); 530 531 os_mutex_lock(&p->mutex); 532 } 533 534 locked_pool_push_task(p, g, f, data); 535 536 // There are worker threads available, wake one up. 537 if (p->available.count > 0) { 538 os_cond_signal(&p->available.cond); 539 } 540 541 os_mutex_unlock(&p->mutex); 542} 543 544void 545u_worker_group_wait_all(struct u_worker_group *uwg) 546{ 547 XRT_TRACE_MARKER(); 548 549 struct group *g = group(uwg); 550 struct pool *p = pool(g->uwtp); 551 552 os_mutex_lock(&p->mutex); 553 554 // Can we early out? 555 if (!locked_group_has_tasks_waiting_or_inflight(g)) { 556 os_mutex_unlock(&p->mutex); 557 return; 558 } 559 560 /* 561 * The released_count is tied to the decrement of worker_limit, that is 562 * when a waiting thread is woken up the worker_limit is decreased, and 563 * released_count is increased. The waiting thread will then double 564 * check that it can be released or not, if it can not be released it 565 * will once again donate this thread and increase the worker_limit. 566 * 567 * If it can be released it will decrement released_count and exit the 568 * loop below. 569 * 570 * So if we increment it here, the loop will increase worker_limit 571 * which is what we want. 
572 */ 573 g->released_count++; 574 575 // Wait here until all work been started and completed. 576 while (locked_group_should_wait(p, g)) { 577 // Do the wait. 578 locked_group_wait(p, g); 579 } 580 581 os_mutex_unlock(&p->mutex); 582} 583 584void 585u_worker_group_destroy(struct u_worker_group *uwg) 586{ 587 XRT_TRACE_MARKER(); 588 589 struct group *g = group(uwg); 590 assert(g->base.reference.count == 0); 591 592 u_worker_group_wait_all(uwg); 593 594 u_worker_thread_pool_reference(&g->uwtp, NULL); 595 596 os_cond_destroy(&g->waiting.cond); 597 598 free(uwg); 599}