//! Prefect server in Zig — in-memory event broker: bounded channel with backpressure.
1const std = @import("std");
2const Thread = std.Thread;
3const Mutex = Thread.Mutex;
4const log = @import("../logging.zig");
5
/// Message wrapper for broker
/// Plain value type: all slice fields are borrowed views — this struct does
/// not own or copy the memory behind them, so the producer must keep that
/// memory alive while the message is in flight.
/// NOTE(review): not referenced elsewhere in this chunk — confirm callers use it.
pub const Message = struct {
    id: []const u8, // message identifier (borrowed)
    topic: []const u8, // destination topic (borrowed)
    payload: []const u8, // message body (borrowed)
    timestamp: i64, // creation time; units not shown here — presumably epoch ns or ms, confirm
};
13
/// Bounded channel for message passing
///
/// Fixed-capacity ring buffer guarded by a mutex, usable from multiple
/// producer and consumer threads. `trySend` never blocks. After `close`,
/// sends are rejected but consumers may still drain buffered items.
pub fn BoundedChannel(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();

        buffer: [capacity]T = undefined,
        head: usize = 0, // index of next item to receive
        tail: usize = 0, // index of next free slot
        count: usize = 0, // number of buffered items
        mutex: Mutex = .{},
        not_empty: Thread.Condition = .{}, // signaled when an item is enqueued
        not_full: Thread.Condition = .{}, // signaled when space frees up
        closed: bool = false,

        pub fn init() Self {
            return .{};
        }

        /// Try to send without blocking. Returns false if full or closed.
        pub fn trySend(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.closed or self.count >= capacity) {
                return false;
            }

            self.buffer[self.tail] = item;
            self.tail = (self.tail + 1) % capacity;
            self.count += 1;
            self.not_empty.signal();
            return true;
        }

        /// Receive with timeout. Returns null if closed-and-empty or timeout.
        /// `timeout_ns` is a total budget: spurious wakeups do not re-arm the
        /// full wait.
        pub fn receiveTimeout(self: *Self, timeout_ns: u64) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();

            // Timer.start fails only on platforms without a monotonic clock;
            // in that rare case fall back to re-arming the full timeout.
            var timer: ?std.time.Timer = std.time.Timer.start() catch null;
            var remaining: u64 = timeout_ns;

            while (self.count == 0 and !self.closed) {
                // On timeout, break rather than return: an item may have been
                // enqueued right as the wait expired, so re-check the
                // predicate once below instead of dropping it.
                self.not_empty.timedWait(&self.mutex, remaining) catch break;
                if (timer) |*t| {
                    const elapsed = t.read();
                    if (elapsed >= timeout_ns) break;
                    remaining = timeout_ns - elapsed;
                }
            }

            if (self.count == 0) return null;

            const item = self.buffer[self.head];
            self.head = (self.head + 1) % capacity;
            self.count -= 1;
            self.not_full.signal();
            return item;
        }

        /// Drain up to max items into provided slice. Returns count drained.
        /// Never blocks; drains at most min(count, max, out.len) items.
        pub fn drain(self: *Self, out: []T, max: usize) usize {
            self.mutex.lock();
            defer self.mutex.unlock();

            const to_drain = @min(self.count, @min(max, out.len));
            for (0..to_drain) |i| {
                out[i] = self.buffer[self.head];
                self.head = (self.head + 1) % capacity;
            }
            self.count -= to_drain;
            // Broadcast: multiple senders may be able to make progress now.
            if (to_drain > 0) self.not_full.broadcast();
            return to_drain;
        }

        /// Current number of buffered items (point-in-time snapshot).
        pub fn len(self: *Self) usize {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.count;
        }

        /// Close the channel: rejects further sends and wakes all waiters.
        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.closed = true;
            self.not_empty.broadcast();
            self.not_full.broadcast();
        }

        /// Whether close() has been called.
        pub fn isClosed(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.closed;
        }
    };
}
104
/// Stored event data (owns memory via fixed buffers)
/// Note: data exceeding buffer sizes is truncated and logged
pub const StoredEvent = struct {
    id: [64]u8,
    id_len: usize,
    occurred: [32]u8,
    occurred_len: usize,
    event_name: [128]u8,
    event_name_len: usize,
    resource_id: [256]u8,
    resource_id_len: usize,
    resource: [2048]u8, // resource object JSON
    resource_len: usize,
    payload: [4096]u8, // payload object JSON
    payload_len: usize,
    related: [2048]u8, // related array JSON
    related_len: usize,
    follows: [64]u8, // follows UUID (empty if null)
    follows_len: usize,
    truncated: bool, // true if any field was truncated

    // Each accessor views the filled prefix of its fixed buffer. Returned
    // slices borrow from this struct: copying the struct invalidates them.

    /// Event id bytes as stored.
    pub fn idSlice(self: *const StoredEvent) []const u8 {
        const used = self.id_len;
        return self.id[0..used];
    }

    /// Occurrence timestamp bytes as stored.
    pub fn occurredSlice(self: *const StoredEvent) []const u8 {
        const used = self.occurred_len;
        return self.occurred[0..used];
    }

    /// Event name bytes as stored.
    pub fn eventNameSlice(self: *const StoredEvent) []const u8 {
        const used = self.event_name_len;
        return self.event_name[0..used];
    }

    /// Resource id bytes as stored.
    pub fn resourceIdSlice(self: *const StoredEvent) []const u8 {
        const used = self.resource_id_len;
        return self.resource_id[0..used];
    }

    /// Resource object JSON bytes as stored.
    pub fn resourceSlice(self: *const StoredEvent) []const u8 {
        const used = self.resource_len;
        return self.resource[0..used];
    }

    /// Payload object JSON bytes as stored.
    pub fn payloadSlice(self: *const StoredEvent) []const u8 {
        const used = self.payload_len;
        return self.payload[0..used];
    }

    /// Related array JSON bytes as stored.
    pub fn relatedSlice(self: *const StoredEvent) []const u8 {
        const used = self.related_len;
        return self.related[0..used];
    }

    /// Follows UUID bytes, or null when no predecessor was recorded.
    pub fn followsSlice(self: *const StoredEvent) ?[]const u8 {
        if (self.follows_len > 0) return self.follows[0..self.follows_len];
        return null;
    }
};
159
/// Event channel with 50k capacity (matches Prefect's backpressure limit)
/// NOTE(review): StoredEvent carries ~8.7 KiB of fixed buffers per event, so
/// 50_000 slots put this global at roughly 430 MiB of static memory —
/// confirm that footprint is intended.
pub const EventChannel = BoundedChannel(StoredEvent, 50000);

/// Global event channel
var event_channel: EventChannel = EventChannel.init();
// Number of events rejected by trySend backpressure; guarded by dropped_mutex.
var dropped_count: usize = 0;
var dropped_mutex: Mutex = .{};
167
/// Publish an event to the events topic. Returns false if dropped due to backpressure.
///
/// Every field is copied into the fixed buffers of a StoredEvent; data longer
/// than its buffer is clipped and the event is flagged (and logged) as truncated.
pub fn publishEvent(
    id: []const u8,
    occurred: []const u8,
    event_name: []const u8,
    resource_id: []const u8,
    resource: []const u8,
    payload: []const u8,
    related: []const u8,
    follows: ?[]const u8,
) bool {
    var ev: StoredEvent = undefined;
    ev.truncated = false;

    // Copy src into dest, clipping to the buffer size and recording whether
    // any bytes were lost.
    const fill = struct {
        fn f(dest: []u8, src: []const u8, truncated: *bool) usize {
            const n = @min(src.len, dest.len);
            @memcpy(dest[0..n], src[0..n]);
            if (src.len > dest.len) truncated.* = true;
            return n;
        }
    }.f;

    ev.id_len = fill(&ev.id, id, &ev.truncated);
    ev.occurred_len = fill(&ev.occurred, occurred, &ev.truncated);
    ev.event_name_len = fill(&ev.event_name, event_name, &ev.truncated);
    ev.resource_id_len = fill(&ev.resource_id, resource_id, &ev.truncated);
    ev.resource_len = fill(&ev.resource, resource, &ev.truncated);
    ev.payload_len = fill(&ev.payload, payload, &ev.truncated);
    ev.related_len = fill(&ev.related, related, &ev.truncated);
    ev.follows_len = if (follows) |f| fill(&ev.follows, f, &ev.truncated) else 0;

    if (ev.truncated) {
        // Cap the logged id at UUID length in case the id itself overflowed.
        log.warn("events", "event {s} truncated (data exceeded buffer)", .{ev.id[0..@min(ev.id_len, 36)]});
    }

    if (event_channel.trySend(ev)) return true;

    // Dropped: bump the shared counter, then log only every 100th drop
    // (1st, 101st, ...) to avoid log spam under sustained backpressure.
    dropped_mutex.lock();
    dropped_count += 1;
    const total = dropped_count;
    dropped_mutex.unlock();

    if (total % 100 == 1) {
        log.warn("events", "backpressure: dropped {d} events", .{total});
    }
    return false;
}
224
/// Get the event channel for consumers
/// Returns a pointer to the module-global channel; safe to share across
/// threads since the channel locks internally.
pub fn getEventChannel() *EventChannel {
    return &event_channel;
}
229
/// Get dropped event count
/// Point-in-time snapshot of how many events were rejected by backpressure.
pub fn getDroppedCount() usize {
    dropped_mutex.lock();
    const snapshot = dropped_count;
    dropped_mutex.unlock();
    return snapshot;
}
236
/// Close the event channel (for shutdown)
/// Wakes all blocked consumers; subsequent publishEvent calls fail trySend
/// and are counted as dropped.
pub fn close() void {
    event_channel.close();
}