/* The open source OpenXR runtime -- source listing at main, 301 lines, 6.5 kB. */
1// Copyright 2019-2021, Collabora, Ltd. 2// SPDX-License-Identifier: BSL-1.0 3/*! 4 * @file 5 * @brief An @ref xrt_frame_sink queue. 6 * @author Jakob Bornecrantz <jakob@collabora.com> 7 * @ingroup aux_util 8 */ 9 10#include "util/u_misc.h" 11#include "util/u_sink.h" 12#include "util/u_trace_marker.h" 13 14#include <stdio.h> 15#include <pthread.h> 16 17struct u_sink_queue_elem 18{ 19 struct xrt_frame *frame; 20 struct u_sink_queue_elem *next; 21}; 22 23/*! 24 * An @ref xrt_frame_sink queue, any frames received will be pushed to the 25 * downstream consumer on the queue thread. Will drop frames should multiple 26 * frames be queued up. 27 * 28 * @implements xrt_frame_sink 29 * @implements xrt_frame_node 30 */ 31struct u_sink_queue 32{ 33 //! Base sink. 34 struct xrt_frame_sink base; 35 //! For tracking on the frame context. 36 struct xrt_frame_node node; 37 38 //! The consumer of the frames that are queued. 39 struct xrt_frame_sink *consumer; 40 41 //! Front of the queue (oldest frame, first to be consumed) 42 struct u_sink_queue_elem *front; 43 44 //! Back of the queue (newest frame, back->next is always null) 45 struct u_sink_queue_elem *back; 46 47 //! Number of currently enqueued frames 48 uint64_t size; 49 50 //! Max amount of frames before dropping new ones. 0 means unbounded. 51 uint64_t max_size; 52 53 pthread_t thread; 54 pthread_mutex_t mutex; 55 56 //! So we can wake the mainloop up 57 pthread_cond_t cond; 58 59 //! Should we keep running. 60 bool running; 61}; 62 63//! Call with q->mutex locked. 64static bool 65queue_is_empty(struct u_sink_queue *q) 66{ 67 return q->size == 0; 68} 69 70//! Call with q->mutex locked. 71static bool 72queue_is_full(struct u_sink_queue *q) 73{ 74 bool is_unbounded = q->max_size == 0; 75 return q->size >= q->max_size && !is_unbounded; 76} 77 78//! Pops the oldest frame, reference counting unchanged. 79//! Call with q->mutex locked. 
80static struct xrt_frame * 81queue_pop(struct u_sink_queue *q) 82{ 83 assert(!queue_is_empty(q)); 84 struct xrt_frame *frame = q->front->frame; 85 struct u_sink_queue_elem *old_front = q->front; 86 q->front = q->front->next; 87 free(old_front); 88 q->size--; 89 if (q->front == NULL) { 90 assert(queue_is_empty(q)); 91 q->back = NULL; 92 } 93 return frame; 94} 95 96//! Tries to push a frame and increases its reference count. 97//! Call with q->mutex locked. 98static bool 99queue_try_refpush(struct u_sink_queue *q, struct xrt_frame *xf) 100{ 101 if (queue_is_full(q)) { 102 // Drop the oldest frame 103 if (!queue_is_empty(q)) { 104 struct xrt_frame *old = queue_pop(q); 105 xrt_frame_reference(&old, NULL); 106 } 107 } 108 struct u_sink_queue_elem *elem = U_TYPED_CALLOC(struct u_sink_queue_elem); 109 xrt_frame_reference(&elem->frame, xf); 110 elem->next = NULL; 111 if (q->back == NULL) { // First frame 112 q->front = elem; 113 } else { // Next frame 114 q->back->next = elem; 115 } 116 q->back = elem; 117 q->size++; 118 return true; 119} 120 121//! Clears the queue and unreferences all of its frames. 122//! Call with q->mutex locked. 123static void 124queue_refclear(struct u_sink_queue *q) 125{ 126 while (!queue_is_empty(q)) { 127 assert((q->size > 1) ^ (q->front == q->back)); 128 struct xrt_frame *xf = queue_pop(q); 129 xrt_frame_reference(&xf, NULL); 130 } 131} 132 133static void * 134queue_mainloop(void *ptr) 135{ 136 U_TRACE_SET_THREAD_NAME("Sink Queue"); 137 138 struct u_sink_queue *q = (struct u_sink_queue *)ptr; 139 struct xrt_frame *frame = NULL; 140 141 pthread_mutex_lock(&q->mutex); 142 143 while (q->running) { 144 145 // No new frame, wait. 146 if (queue_is_empty(q)) { 147 pthread_cond_wait(&q->cond, &q->mutex); 148 } 149 150 // In this case, queue_break_apart woke us up to turn us off. 151 if (!q->running) { 152 break; 153 } 154 155 if (queue_is_empty(q)) { 156 continue; 157 } 158 159 SINK_TRACE_IDENT(queue_frame); 160 161 /* 162 * Dequeue frame. 
163 * We need to take a reference on the current frame, this is to 164 * keep it alive during the call to the consumer should it be 165 * replaced. But we no longer need to hold onto the frame on the 166 * queue so we dequeue it. 167 */ 168 frame = queue_pop(q); 169 170 /* 171 * Unlock the mutex when we do the work, so a new frame can be 172 * queued. 173 */ 174 pthread_mutex_unlock(&q->mutex); 175 176 // Send to the consumer that does the work. 177 q->consumer->push_frame(q->consumer, frame); 178 179 /* 180 * Drop our reference we don't need it anymore, or it's held by 181 * the consumer. 182 */ 183 xrt_frame_reference(&frame, NULL); 184 185 // Have to lock it again. 186 pthread_mutex_lock(&q->mutex); 187 } 188 189 pthread_mutex_unlock(&q->mutex); 190 191 return NULL; 192} 193 194static void 195queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf) 196{ 197 SINK_TRACE_MARKER(); 198 199 struct u_sink_queue *q = (struct u_sink_queue *)xfs; 200 201 pthread_mutex_lock(&q->mutex); 202 203 // Only schedule new frames if we are running. 204 if (q->running) { 205 queue_try_refpush(q, xf); 206 } 207 208 // Wake up the thread. 209 pthread_cond_signal(&q->cond); 210 211 pthread_mutex_unlock(&q->mutex); 212} 213 214static void 215queue_break_apart(struct xrt_frame_node *node) 216{ 217 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node); 218 void *retval = NULL; 219 220 // The fields are protected. 221 pthread_mutex_lock(&q->mutex); 222 223 // Stop the thread and inhibit any new frames to be added to the queue. 224 q->running = false; 225 226 // Release any frame waiting for submission. 227 queue_refclear(q); 228 229 // Wake up the thread. 230 pthread_cond_signal(&q->cond); 231 232 // No longer need to protect fields. 233 pthread_mutex_unlock(&q->mutex); 234 235 // Wait for thread to finish. 
236 pthread_join(q->thread, &retval); 237} 238 239static void 240queue_destroy(struct xrt_frame_node *node) 241{ 242 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node); 243 244 // Destroy resources. 245 pthread_mutex_destroy(&q->mutex); 246 pthread_cond_destroy(&q->cond); 247 free(q); 248} 249 250 251/* 252 * 253 * Exported functions. 254 * 255 */ 256 257bool 258u_sink_queue_create(struct xrt_frame_context *xfctx, 259 uint64_t max_size, 260 struct xrt_frame_sink *downstream, 261 struct xrt_frame_sink **out_xfs) 262{ 263 struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue); 264 int ret = 0; 265 266 q->base.push_frame = queue_frame; 267 q->node.break_apart = queue_break_apart; 268 q->node.destroy = queue_destroy; 269 q->consumer = downstream; 270 q->running = true; 271 272 q->size = 0; 273 q->max_size = max_size; 274 275 ret = pthread_mutex_init(&q->mutex, NULL); 276 if (ret != 0) { 277 free(q); 278 return false; 279 } 280 281 ret = pthread_cond_init(&q->cond, NULL); 282 if (ret) { 283 pthread_mutex_destroy(&q->mutex); 284 free(q); 285 return false; 286 } 287 288 ret = pthread_create(&q->thread, NULL, queue_mainloop, q); 289 if (ret != 0) { 290 pthread_cond_destroy(&q->cond); 291 pthread_mutex_destroy(&q->mutex); 292 free(q); 293 return false; 294 } 295 296 xrt_frame_context_add(xfctx, &q->node); 297 298 *out_xfs = &q->base; 299 300 return true; 301}