prefect server in zig
at main 240 lines 7.4 kB view raw
1const std = @import("std"); 2const Thread = std.Thread; 3const Mutex = Thread.Mutex; 4const log = @import("../logging.zig"); 5 6/// Message wrapper for broker 7pub const Message = struct { 8 id: []const u8, 9 topic: []const u8, 10 payload: []const u8, 11 timestamp: i64, 12}; 13 14/// Bounded channel for message passing 15pub fn BoundedChannel(comptime T: type, comptime capacity: usize) type { 16 return struct { 17 const Self = @This(); 18 19 buffer: [capacity]T = undefined, 20 head: usize = 0, 21 tail: usize = 0, 22 count: usize = 0, 23 mutex: Mutex = .{}, 24 not_empty: Thread.Condition = .{}, 25 not_full: Thread.Condition = .{}, 26 closed: bool = false, 27 28 pub fn init() Self { 29 return .{}; 30 } 31 32 /// Try to send without blocking. Returns false if full or closed. 33 pub fn trySend(self: *Self, item: T) bool { 34 self.mutex.lock(); 35 defer self.mutex.unlock(); 36 37 if (self.closed or self.count >= capacity) { 38 return false; 39 } 40 41 self.buffer[self.tail] = item; 42 self.tail = (self.tail + 1) % capacity; 43 self.count += 1; 44 self.not_empty.signal(); 45 return true; 46 } 47 48 /// Receive with timeout. Returns null if closed or timeout. 49 pub fn receiveTimeout(self: *Self, timeout_ns: u64) ?T { 50 self.mutex.lock(); 51 defer self.mutex.unlock(); 52 53 while (self.count == 0 and !self.closed) { 54 self.not_empty.timedWait(&self.mutex, timeout_ns) catch { 55 return null; 56 }; 57 } 58 59 if (self.count == 0) return null; 60 61 const item = self.buffer[self.head]; 62 self.head = (self.head + 1) % capacity; 63 self.count -= 1; 64 self.not_full.signal(); 65 return item; 66 } 67 68 /// Drain up to max items into provided slice. Returns count drained. 69 pub fn drain(self: *Self, out: []T, max: usize) usize { 70 self.mutex.lock(); 71 defer self.mutex.unlock(); 72 73 const to_drain = @min(self.count, @min(max, out.len)); 74 for (0..to_drain) |i| { 75 out[i] = self.buffer[self.head]; 76 self.head = (self.head + 1) % capacity; 77 } 78 self.count -= to_drain; 79 if (to_drain > 0) self.not_full.broadcast(); 80 return to_drain; 81 } 82 83 pub fn len(self: *Self) usize { 84 self.mutex.lock(); 85 defer self.mutex.unlock(); 86 return self.count; 87 } 88 89 pub fn close(self: *Self) void { 90 self.mutex.lock(); 91 defer self.mutex.unlock(); 92 self.closed = true; 93 self.not_empty.broadcast(); 94 self.not_full.broadcast(); 95 } 96 97 pub fn isClosed(self: *Self) bool { 98 self.mutex.lock(); 99 defer self.mutex.unlock(); 100 return self.closed; 101 } 102 }; 103} 104 105/// Stored event data (owns memory via fixed buffers) 106/// Note: data exceeding buffer sizes is truncated and logged 107pub const StoredEvent = struct { 108 id: [64]u8, 109 id_len: usize, 110 occurred: [32]u8, 111 occurred_len: usize, 112 event_name: [128]u8, 113 event_name_len: usize, 114 resource_id: [256]u8, 115 resource_id_len: usize, 116 resource: [2048]u8, // resource object JSON 117 resource_len: usize, 118 payload: [4096]u8, // payload object JSON 119 payload_len: usize, 120 related: [2048]u8, // related array JSON 121 related_len: usize, 122 follows: [64]u8, // follows UUID (empty if null) 123 follows_len: usize, 124 truncated: bool, // true if any field was truncated 125 126 pub fn idSlice(self: *const StoredEvent) []const u8 { 127 return self.id[0..self.id_len]; 128 } 129 130 pub fn occurredSlice(self: *const StoredEvent) []const u8 { 131 return self.occurred[0..self.occurred_len]; 132 } 133 134 pub fn eventNameSlice(self: *const StoredEvent) []const u8 { 135 return self.event_name[0..self.event_name_len]; 136 } 137 138 pub fn resourceIdSlice(self: *const StoredEvent) []const u8 { 139 return self.resource_id[0..self.resource_id_len]; 140 } 141 142 pub fn resourceSlice(self: *const StoredEvent) []const u8 { 143 return self.resource[0..self.resource_len]; 144 } 145 146 pub fn payloadSlice(self: *const StoredEvent) []const u8 { 147 return self.payload[0..self.payload_len]; 148 } 149 150 pub fn relatedSlice(self: *const StoredEvent) []const u8 { 151 return self.related[0..self.related_len]; 152 } 153 154 pub fn followsSlice(self: *const StoredEvent) ?[]const u8 { 155 if (self.follows_len == 0) return null; 156 return self.follows[0..self.follows_len]; 157 } 158}; 159 160/// Event channel with 50k capacity (matches Prefect's backpressure limit) 161pub const EventChannel = BoundedChannel(StoredEvent, 50000); 162 163/// Global event channel 164var event_channel: EventChannel = EventChannel.init(); 165var dropped_count: usize = 0; 166var dropped_mutex: Mutex = .{}; 167 168/// Publish an event to the events topic. Returns false if dropped due to backpressure. 169pub fn publishEvent( 170 id: []const u8, 171 occurred: []const u8, 172 event_name: []const u8, 173 resource_id: []const u8, 174 resource: []const u8, 175 payload: []const u8, 176 related: []const u8, 177 follows: ?[]const u8, 178) bool { 179 var stored: StoredEvent = undefined; 180 stored.truncated = false; 181 182 // helper to copy with truncation tracking 183 const copyField = struct { 184 fn f(dest: []u8, src: []const u8, truncated: *bool) usize { 185 const copy_len = @min(src.len, dest.len); 186 @memcpy(dest[0..copy_len], src[0..copy_len]); 187 if (src.len > dest.len) truncated.* = true; 188 return copy_len; 189 } 190 }.f; 191 192 stored.id_len = copyField(&stored.id, id, &stored.truncated); 193 stored.occurred_len = copyField(&stored.occurred, occurred, &stored.truncated); 194 stored.event_name_len = copyField(&stored.event_name, event_name, &stored.truncated); 195 stored.resource_id_len = copyField(&stored.resource_id, resource_id, &stored.truncated); 196 stored.resource_len = copyField(&stored.resource, resource, &stored.truncated); 197 stored.payload_len = copyField(&stored.payload, payload, &stored.truncated); 198 stored.related_len = copyField(&stored.related, related, &stored.truncated); 199 200 if (follows) |f| { 201 stored.follows_len = copyField(&stored.follows, f, &stored.truncated); 202 } else { 203 stored.follows_len = 0; 204 } 205 206 if (stored.truncated) { 207 log.warn("events", "event {s} truncated (data exceeded buffer)", .{stored.id[0..@min(stored.id_len, 36)]}); 208 } 209 210 if (!event_channel.trySend(stored)) { 211 dropped_mutex.lock(); 212 dropped_count += 1; 213 const count = dropped_count; 214 dropped_mutex.unlock(); 215 216 if (count % 100 == 1) { 217 log.warn("events", "backpressure: dropped {d} events", .{count}); 218 } 219 return false; 220 } 221 222 return true; 223} 224 225/// Get the event channel for consumers 226pub fn getEventChannel() *EventChannel { 227 return &event_channel; 228} 229 230/// Get dropped event count 231pub fn getDroppedCount() usize { 232 dropped_mutex.lock(); 233 defer dropped_mutex.unlock(); 234 return dropped_count; 235} 236 237/// Close the event channel (for shutdown) 238pub fn close() void { 239 event_channel.close(); 240}