// The open source OpenXR runtime.
1// Copyright 2019-2021, Collabora, Ltd.
2// SPDX-License-Identifier: BSL-1.0
3/*!
4 * @file
5 * @brief An @ref xrt_frame_sink queue.
6 * @author Jakob Bornecrantz <jakob@collabora.com>
7 * @ingroup aux_util
8 */
9
#include "util/u_misc.h"
#include "util/u_sink.h"
#include "util/u_trace_marker.h"

#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
16
/*!
 * A single node of the singly linked list backing @ref u_sink_queue.
 *
 * Each node holds one reference on its frame; the reference is taken when
 * the node is enqueued and handed to the caller when the node is popped.
 */
struct u_sink_queue_elem
{
	//! Frame held (and referenced) by this node.
	struct xrt_frame *frame;
	//! Next (newer) node, NULL for the node at the back of the queue.
	struct u_sink_queue_elem *next;
};
22
/*!
 * An @ref xrt_frame_sink queue, any frames received will be pushed to the
 * downstream consumer on the queue thread. Will drop frames should multiple
 * frames be queued up.
 *
 * @implements xrt_frame_sink
 * @implements xrt_frame_node
 */
struct u_sink_queue
{
	//! Base sink, `push_frame` enqueues onto this queue.
	struct xrt_frame_sink base;
	//! For tracking on the frame context.
	struct xrt_frame_node node;

	//! The consumer of the frames that are queued.
	struct xrt_frame_sink *consumer;

	//! Front of the queue (oldest frame, first to be consumed).
	struct u_sink_queue_elem *front;

	//! Back of the queue (newest frame, back->next is always NULL).
	struct u_sink_queue_elem *back;

	//! Number of currently enqueued frames, protected by @ref mutex.
	uint64_t size;

	//! Max amount of frames before dropping new ones. 0 means unbounded.
	uint64_t max_size;

	//! The thread running queue_mainloop().
	pthread_t thread;
	//! Protects all queue state: front, back, size and running.
	pthread_mutex_t mutex;

	//! So we can wake the mainloop up.
	pthread_cond_t cond;

	//! Should we keep running; cleared by queue_break_apart().
	bool running;
};
62
63//! Call with q->mutex locked.
64static bool
65queue_is_empty(struct u_sink_queue *q)
66{
67 return q->size == 0;
68}
69
70//! Call with q->mutex locked.
71static bool
72queue_is_full(struct u_sink_queue *q)
73{
74 bool is_unbounded = q->max_size == 0;
75 return q->size >= q->max_size && !is_unbounded;
76}
77
78//! Pops the oldest frame, reference counting unchanged.
79//! Call with q->mutex locked.
80static struct xrt_frame *
81queue_pop(struct u_sink_queue *q)
82{
83 assert(!queue_is_empty(q));
84 struct xrt_frame *frame = q->front->frame;
85 struct u_sink_queue_elem *old_front = q->front;
86 q->front = q->front->next;
87 free(old_front);
88 q->size--;
89 if (q->front == NULL) {
90 assert(queue_is_empty(q));
91 q->back = NULL;
92 }
93 return frame;
94}
95
96//! Tries to push a frame and increases its reference count.
97//! Call with q->mutex locked.
98static bool
99queue_try_refpush(struct u_sink_queue *q, struct xrt_frame *xf)
100{
101 if (queue_is_full(q)) {
102 // Drop the oldest frame
103 if (!queue_is_empty(q)) {
104 struct xrt_frame *old = queue_pop(q);
105 xrt_frame_reference(&old, NULL);
106 }
107 }
108 struct u_sink_queue_elem *elem = U_TYPED_CALLOC(struct u_sink_queue_elem);
109 xrt_frame_reference(&elem->frame, xf);
110 elem->next = NULL;
111 if (q->back == NULL) { // First frame
112 q->front = elem;
113 } else { // Next frame
114 q->back->next = elem;
115 }
116 q->back = elem;
117 q->size++;
118 return true;
119}
120
121//! Clears the queue and unreferences all of its frames.
122//! Call with q->mutex locked.
123static void
124queue_refclear(struct u_sink_queue *q)
125{
126 while (!queue_is_empty(q)) {
127 assert((q->size > 1) ^ (q->front == q->back));
128 struct xrt_frame *xf = queue_pop(q);
129 xrt_frame_reference(&xf, NULL);
130 }
131}
132
/*!
 * Queue thread entry point: waits for frames and pushes each one to the
 * downstream consumer, releasing the queue lock for the duration of the
 * consumer call. Exits when q->running is cleared.
 */
static void *
queue_mainloop(void *ptr)
{
	U_TRACE_SET_THREAD_NAME("Sink Queue");

	struct u_sink_queue *q = (struct u_sink_queue *)ptr;
	struct xrt_frame *frame = NULL;

	pthread_mutex_lock(&q->mutex);

	while (q->running) {

		// No new frame, wait (atomically releases the mutex while blocked).
		if (queue_is_empty(q)) {
			pthread_cond_wait(&q->cond, &q->mutex);
		}

		// In this case, queue_break_apart woke us up to turn us off.
		if (!q->running) {
			break;
		}

		// Spurious wakeup or the frame was already cleared: nothing to do.
		if (queue_is_empty(q)) {
			continue;
		}

		SINK_TRACE_IDENT(queue_frame);

		/*
		 * Dequeue frame.
		 * We need to take a reference on the current frame, this is to
		 * keep it alive during the call to the consumer should it be
		 * replaced. But we no longer need to hold onto the frame on the
		 * queue so we dequeue it (the queue's reference transfers to us).
		 */
		frame = queue_pop(q);

		/*
		 * Unlock the mutex when we do the work, so a new frame can be
		 * queued while the consumer runs.
		 */
		pthread_mutex_unlock(&q->mutex);

		// Send to the consumer that does the work.
		q->consumer->push_frame(q->consumer, frame);

		/*
		 * Drop our reference we don't need it anymore, or it's held by
		 * the consumer.
		 */
		xrt_frame_reference(&frame, NULL);

		// Have to lock it again before re-checking queue state.
		pthread_mutex_lock(&q->mutex);
	}

	pthread_mutex_unlock(&q->mutex);

	return NULL;
}
193
194static void
195queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf)
196{
197 SINK_TRACE_MARKER();
198
199 struct u_sink_queue *q = (struct u_sink_queue *)xfs;
200
201 pthread_mutex_lock(&q->mutex);
202
203 // Only schedule new frames if we are running.
204 if (q->running) {
205 queue_try_refpush(q, xf);
206 }
207
208 // Wake up the thread.
209 pthread_cond_signal(&q->cond);
210
211 pthread_mutex_unlock(&q->mutex);
212}
213
/*!
 * Frame-node break_apart: stops the queue thread, releases all queued
 * frames and joins the thread before returning.
 */
static void
queue_break_apart(struct xrt_frame_node *node)
{
	struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
	void *retval = NULL;

	// The fields are protected.
	pthread_mutex_lock(&q->mutex);

	// Stop the thread and inhibit any new frames to be added to the queue.
	q->running = false;

	// Release any frame waiting for submission.
	queue_refclear(q);

	// Wake up the thread, it sees running == false and exits.
	pthread_cond_signal(&q->cond);

	// No longer need to protect fields; must unlock before joining.
	pthread_mutex_unlock(&q->mutex);

	// Wait for thread to finish.
	pthread_join(q->thread, &retval);
}
238
239static void
240queue_destroy(struct xrt_frame_node *node)
241{
242 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
243
244 // Destroy resources.
245 pthread_mutex_destroy(&q->mutex);
246 pthread_cond_destroy(&q->cond);
247 free(q);
248}
249
250
251/*
252 *
253 * Exported functions.
254 *
255 */
256
257bool
258u_sink_queue_create(struct xrt_frame_context *xfctx,
259 uint64_t max_size,
260 struct xrt_frame_sink *downstream,
261 struct xrt_frame_sink **out_xfs)
262{
263 struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue);
264 int ret = 0;
265
266 q->base.push_frame = queue_frame;
267 q->node.break_apart = queue_break_apart;
268 q->node.destroy = queue_destroy;
269 q->consumer = downstream;
270 q->running = true;
271
272 q->size = 0;
273 q->max_size = max_size;
274
275 ret = pthread_mutex_init(&q->mutex, NULL);
276 if (ret != 0) {
277 free(q);
278 return false;
279 }
280
281 ret = pthread_cond_init(&q->cond, NULL);
282 if (ret) {
283 pthread_mutex_destroy(&q->mutex);
284 free(q);
285 return false;
286 }
287
288 ret = pthread_create(&q->thread, NULL, queue_mainloop, q);
289 if (ret != 0) {
290 pthread_cond_destroy(&q->cond);
291 pthread_mutex_destroy(&q->mutex);
292 free(q);
293 return false;
294 }
295
296 xrt_frame_context_add(xfctx, &q->node);
297
298 *out_xfs = &q->base;
299
300 return true;
301}