The open source OpenXR runtime
at main 223 lines 4.6 kB view raw
1// Copyright 2019-2021, Collabora, Ltd. 2// SPDX-License-Identifier: BSL-1.0 3/*! 4 * @file 5 * @brief An @ref xrt_frame_sink queue. 6 * @author Jakob Bornecrantz <jakob@collabora.com> 7 * @ingroup aux_util 8 */ 9 10#include "util/u_misc.h" 11#include "util/u_sink.h" 12#include "util/u_trace_marker.h" 13 14#include <stdio.h> 15#include <pthread.h> 16 17 18/*! 19 * An @ref xrt_frame_sink queue that holds buffers a single xrt_frame 20 * and passes it to the downstream consumer on the queue thread. If another 21 * frame arrives before the current frame is consumed, the new frame replaces 22 * the old one. 23 * 24 * @implements xrt_frame_sink 25 * @implements xrt_frame_node 26 */ 27struct u_sink_queue 28{ 29 //! Base sink. 30 struct xrt_frame_sink base; 31 //! For tracking on the frame context. 32 struct xrt_frame_node node; 33 34 //! The consumer of the frames that are queued. 35 struct xrt_frame_sink *consumer; 36 37 //! The current queued frame. 38 struct xrt_frame *frame; 39 40 pthread_t thread; 41 pthread_mutex_t mutex; 42 pthread_cond_t cond; 43 44 struct 45 { 46 uint64_t current; 47 uint64_t last; 48 } seq; 49 50 //! Should we keep running. 51 bool running; 52}; 53 54static void * 55queue_mainloop(void *ptr) 56{ 57 U_TRACE_SET_THREAD_NAME("Sink Queue"); 58 59 struct u_sink_queue *q = (struct u_sink_queue *)ptr; 60 struct xrt_frame *frame = NULL; 61 62 pthread_mutex_lock(&q->mutex); 63 64 while (q->running) { 65 66 // No new frame, wait. 67 if (q->seq.last >= q->seq.current) { 68 pthread_cond_wait(&q->cond, &q->mutex); 69 } 70 71 // Where we woken up to turn off. 72 if (!q->running) { 73 break; 74 } 75 76 // Just in case. 77 if (q->seq.last >= q->seq.current || q->frame == NULL) { 78 continue; 79 } 80 81 SINK_TRACE_IDENT(queue_frame); 82 83 // We have a new frame, send it out. 84 q->seq.last = q->seq.current; 85 86 /* 87 * We need to take a reference on the current frame, this is to 88 * keep it alive during the call to the consumer should it be 89 * replaced. But we no longer need to hold onto the frame on the 90 * queue so we move the pointer. 91 */ 92 frame = q->frame; 93 q->frame = NULL; 94 95 /* 96 * Unlock the mutex when we do the work, so a new frame can be 97 * queued. 98 */ 99 pthread_mutex_unlock(&q->mutex); 100 101 // Send to the consumer that does the work. 102 q->consumer->push_frame(q->consumer, frame); 103 104 /* 105 * Drop our reference we don't need it anymore, or it's held by 106 * the consumer. 107 */ 108 xrt_frame_reference(&frame, NULL); 109 110 // Have to lock it again. 111 pthread_mutex_lock(&q->mutex); 112 } 113 114 pthread_mutex_unlock(&q->mutex); 115 116 return NULL; 117} 118 119static void 120queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf) 121{ 122 SINK_TRACE_MARKER(); 123 124 struct u_sink_queue *q = (struct u_sink_queue *)xfs; 125 126 pthread_mutex_lock(&q->mutex); 127 128 // Only schedule new frames if we are running. 129 if (q->running) { 130 q->seq.current++; 131 xrt_frame_reference(&q->frame, xf); 132 } 133 134 // Wake up the thread. 135 pthread_cond_signal(&q->cond); 136 137 pthread_mutex_unlock(&q->mutex); 138} 139 140static void 141queue_break_apart(struct xrt_frame_node *node) 142{ 143 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node); 144 void *retval = NULL; 145 146 // The fields are protected. 147 pthread_mutex_lock(&q->mutex); 148 149 // Stop the thread and inhibit any new frames to be added to the queue. 150 q->running = false; 151 152 // Release any frame waiting for submission. 153 xrt_frame_reference(&q->frame, NULL); 154 155 // Wake up the thread. 156 pthread_cond_signal(&q->cond); 157 158 // No longer need to protect fields. 159 pthread_mutex_unlock(&q->mutex); 160 161 // Wait for thread to finish. 162 pthread_join(q->thread, &retval); 163} 164 165static void 166queue_destroy(struct xrt_frame_node *node) 167{ 168 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node); 169 170 // Destroy resources. 171 pthread_mutex_destroy(&q->mutex); 172 pthread_cond_destroy(&q->cond); 173 free(q); 174} 175 176 177/* 178 * 179 * Exported functions. 180 * 181 */ 182 183bool 184u_sink_simple_queue_create(struct xrt_frame_context *xfctx, 185 struct xrt_frame_sink *downstream, 186 struct xrt_frame_sink **out_xfs) 187{ 188 struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue); 189 int ret = 0; 190 191 q->base.push_frame = queue_frame; 192 q->node.break_apart = queue_break_apart; 193 q->node.destroy = queue_destroy; 194 q->consumer = downstream; 195 q->running = true; 196 197 ret = pthread_mutex_init(&q->mutex, NULL); 198 if (ret != 0) { 199 free(q); 200 return false; 201 } 202 203 ret = pthread_cond_init(&q->cond, NULL); 204 if (ret) { 205 pthread_mutex_destroy(&q->mutex); 206 free(q); 207 return false; 208 } 209 210 ret = pthread_create(&q->thread, NULL, queue_mainloop, q); 211 if (ret != 0) { 212 pthread_cond_destroy(&q->cond); 213 pthread_mutex_destroy(&q->mutex); 214 free(q); 215 return false; 216 } 217 218 xrt_frame_context_add(xfctx, &q->node); 219 220 *out_xfs = &q->base; 221 222 return true; 223}