The open source OpenXR runtime
1// Copyright 2019-2021, Collabora, Ltd.
2// SPDX-License-Identifier: BSL-1.0
3/*!
4 * @file
5 * @brief An @ref xrt_frame_sink queue.
6 * @author Jakob Bornecrantz <jakob@collabora.com>
7 * @ingroup aux_util
8 */
9
10#include "util/u_misc.h"
11#include "util/u_sink.h"
12#include "util/u_trace_marker.h"
13
14#include <stdio.h>
15#include <pthread.h>
16
17
/*!
 * An @ref xrt_frame_sink queue that buffers a single xrt_frame and passes it
 * to the downstream consumer on the queue thread. If another frame arrives
 * before the current frame is consumed, the new frame replaces the old one.
 *
 * @implements xrt_frame_sink
 * @implements xrt_frame_node
 */
struct u_sink_queue
{
	//! Base sink, must be first: queue_frame casts the sink pointer back to the queue.
	struct xrt_frame_sink base;
	//! For tracking on the frame context.
	struct xrt_frame_node node;

	//! The consumer of the frames that are queued.
	struct xrt_frame_sink *consumer;

	//! The current queued frame, NULL when nothing is waiting to be delivered.
	struct xrt_frame *frame;

	//! The queue thread, runs queue_mainloop and is joined in queue_break_apart.
	pthread_t thread;
	//! Protects @ref frame, @ref seq and @ref running.
	pthread_mutex_t mutex;
	//! Signalled when a frame is pushed and when the queue is told to stop.
	pthread_cond_t cond;

	//! Sequence counters used to tell whether a not yet delivered frame exists.
	struct
	{
		//! Incremented every time a frame is accepted by queue_frame.
		uint64_t current;
		//! Value of @ref current when the queue thread last picked up a frame.
		uint64_t last;
	} seq;

	//! Should we keep running.
	bool running;
};
53
54static void *
55queue_mainloop(void *ptr)
56{
57 U_TRACE_SET_THREAD_NAME("Sink Queue");
58
59 struct u_sink_queue *q = (struct u_sink_queue *)ptr;
60 struct xrt_frame *frame = NULL;
61
62 pthread_mutex_lock(&q->mutex);
63
64 while (q->running) {
65
66 // No new frame, wait.
67 if (q->seq.last >= q->seq.current) {
68 pthread_cond_wait(&q->cond, &q->mutex);
69 }
70
71 // Where we woken up to turn off.
72 if (!q->running) {
73 break;
74 }
75
76 // Just in case.
77 if (q->seq.last >= q->seq.current || q->frame == NULL) {
78 continue;
79 }
80
81 SINK_TRACE_IDENT(queue_frame);
82
83 // We have a new frame, send it out.
84 q->seq.last = q->seq.current;
85
86 /*
87 * We need to take a reference on the current frame, this is to
88 * keep it alive during the call to the consumer should it be
89 * replaced. But we no longer need to hold onto the frame on the
90 * queue so we move the pointer.
91 */
92 frame = q->frame;
93 q->frame = NULL;
94
95 /*
96 * Unlock the mutex when we do the work, so a new frame can be
97 * queued.
98 */
99 pthread_mutex_unlock(&q->mutex);
100
101 // Send to the consumer that does the work.
102 q->consumer->push_frame(q->consumer, frame);
103
104 /*
105 * Drop our reference we don't need it anymore, or it's held by
106 * the consumer.
107 */
108 xrt_frame_reference(&frame, NULL);
109
110 // Have to lock it again.
111 pthread_mutex_lock(&q->mutex);
112 }
113
114 pthread_mutex_unlock(&q->mutex);
115
116 return NULL;
117}
118
119static void
120queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf)
121{
122 SINK_TRACE_MARKER();
123
124 struct u_sink_queue *q = (struct u_sink_queue *)xfs;
125
126 pthread_mutex_lock(&q->mutex);
127
128 // Only schedule new frames if we are running.
129 if (q->running) {
130 q->seq.current++;
131 xrt_frame_reference(&q->frame, xf);
132 }
133
134 // Wake up the thread.
135 pthread_cond_signal(&q->cond);
136
137 pthread_mutex_unlock(&q->mutex);
138}
139
140static void
141queue_break_apart(struct xrt_frame_node *node)
142{
143 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
144 void *retval = NULL;
145
146 // The fields are protected.
147 pthread_mutex_lock(&q->mutex);
148
149 // Stop the thread and inhibit any new frames to be added to the queue.
150 q->running = false;
151
152 // Release any frame waiting for submission.
153 xrt_frame_reference(&q->frame, NULL);
154
155 // Wake up the thread.
156 pthread_cond_signal(&q->cond);
157
158 // No longer need to protect fields.
159 pthread_mutex_unlock(&q->mutex);
160
161 // Wait for thread to finish.
162 pthread_join(q->thread, &retval);
163}
164
165static void
166queue_destroy(struct xrt_frame_node *node)
167{
168 struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
169
170 // Destroy resources.
171 pthread_mutex_destroy(&q->mutex);
172 pthread_cond_destroy(&q->cond);
173 free(q);
174}
175
176
177/*
178 *
179 * Exported functions.
180 *
181 */
182
183bool
184u_sink_simple_queue_create(struct xrt_frame_context *xfctx,
185 struct xrt_frame_sink *downstream,
186 struct xrt_frame_sink **out_xfs)
187{
188 struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue);
189 int ret = 0;
190
191 q->base.push_frame = queue_frame;
192 q->node.break_apart = queue_break_apart;
193 q->node.destroy = queue_destroy;
194 q->consumer = downstream;
195 q->running = true;
196
197 ret = pthread_mutex_init(&q->mutex, NULL);
198 if (ret != 0) {
199 free(q);
200 return false;
201 }
202
203 ret = pthread_cond_init(&q->cond, NULL);
204 if (ret) {
205 pthread_mutex_destroy(&q->mutex);
206 free(q);
207 return false;
208 }
209
210 ret = pthread_create(&q->thread, NULL, queue_mainloop, q);
211 if (ret != 0) {
212 pthread_cond_destroy(&q->cond);
213 pthread_mutex_destroy(&q->mutex);
214 free(q);
215 return false;
216 }
217
218 xrt_frame_context_add(xfctx, &q->node);
219
220 *out_xfs = &q->base;
221
222 return true;
223}