Prefect server in Zig

replace BoundedChannel with growable Queue

The memory broker was pre-allocating 50,000 × 8 KB = 400 MB for message
storage. Replaced it with an ArrayList-backed Queue that grows on demand,
matching the behavior of Python's asyncio.Queue().

memory usage: 432MB → 39MB

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+37 -68
+2 -2
src/broker/CLAUDE.md
··· 8 8 - `Message` - id, topic, data, attributes, timestamp 9 9 - `MessageHandler` - callback for consuming messages 10 10 - `ConsumerHandle` - opaque handle for unsubscribing 11 - - `BoundedChannel` - generic bounded queue for backpressure 11 + - `Queue` - generic growable queue (ArrayList-backed) 12 12 13 13 ## fan-out 14 14 ··· 30 30 31 31 ## memory backend 32 32 33 - uses BoundedChannel (heap-allocated, 50k capacity) per topic. 33 + uses growable Queue (ArrayList-backed) per topic. 34 34 broadcasts to all subscribers (simulates fan-out).
+35 -66
src/broker/core.zig
··· 98 98 }; 99 99 100 100 // ============================================================================ 101 - // Memory Broker - wraps BoundedChannel for in-process messaging 101 + // Memory Broker - uses growable queue for in-process messaging 102 102 // ============================================================================ 103 103 104 104 pub const MemoryBroker = struct { 105 105 const Self = @This(); 106 - const CHANNEL_CAPACITY = 50000; 107 106 108 107 /// Topic subscription 109 108 const Subscription = struct { ··· 113 112 active: bool, 114 113 }; 115 114 116 - /// Topic with its channel and subscriptions 115 + /// Topic with its queue and subscriptions 117 116 const Topic = struct { 118 - channel: BoundedChannel(StoredMessage, CHANNEL_CAPACITY), 117 + queue: Queue(StoredMessage), 119 118 subscriptions: std.ArrayListUnmanaged(Subscription), 120 119 worker_thread: ?Thread, 121 120 running: bool, ··· 146 145 self.alloc.free(sub.group); 147 146 } 148 147 entry.value_ptr.*.subscriptions.deinit(self.alloc); 149 - entry.value_ptr.*.channel.deinit(); 148 + entry.value_ptr.*.queue.deinit(); 150 149 self.alloc.destroy(entry.value_ptr.*); 151 150 } 152 151 self.topics.deinit(self.alloc); ··· 163 162 164 163 const topic = try self.alloc.create(Topic); 165 164 topic.* = .{ 166 - .channel = try BoundedChannel(StoredMessage, CHANNEL_CAPACITY).init(self.alloc), 165 + .queue = Queue(StoredMessage).init(self.alloc), 167 166 .subscriptions = .{}, 168 167 .worker_thread = null, 169 168 .running = false, ··· 190 189 log.warn("broker", "message {s} truncated", .{msg.id[0..@min(msg.id_len, 36)]}); 191 190 } 192 191 193 - if (!topic.channel.trySend(msg)) { 194 - log.warn("broker", "backpressure: message dropped on topic {s}", .{topic_name}); 195 - return error.ChannelFull; 196 - } 192 + topic.queue.push(msg) catch { 193 + log.warn("broker", "failed to enqueue message on topic {s}", .{topic_name}); 194 + return error.QueueError; 195 + }; 197 196 } 198 197 199 198 pub fn 
subscribe(self: *Self, topic_name: []const u8, group: []const u8, handler: MessageHandler) !ConsumerHandle { ··· 258 257 topic.running = false; 259 258 topic.mutex.unlock(); 260 259 261 - topic.channel.close(); 260 + topic.queue.close(); 262 261 263 262 if (topic.worker_thread) |t| { 264 263 t.join(); ··· 275 274 const should_run = topic.running; 276 275 topic.mutex.unlock(); 277 276 278 - if (!should_run and topic.channel.len() == 0) break; 277 + if (!should_run and topic.queue.len() == 0) break; 279 278 280 - if (topic.channel.receiveTimeout(timeout_ns)) |msg| { 279 + if (topic.queue.popTimeout(timeout_ns)) |msg| { 281 280 // create Message from StoredMessage with proper topic 282 281 const message = Message{ 283 282 .id = msg.id[0..msg.id_len], ··· 310 309 pub const RedisClient = redis.RedisClient; 311 310 312 311 // ============================================================================ 313 - // Storage types for bounded channel 312 + // Storage types for message queue 314 313 // ============================================================================ 315 314 316 315 /// Stored message data with fixed-size buffers (no allocation in hot path) ··· 331 330 } 332 331 333 332 // ============================================================================ 334 - // Bounded Channel (moved from messaging.zig for reuse) 333 + // Growable Queue - unbounded, like Python's asyncio.Queue 335 334 // ============================================================================ 336 335 337 - pub fn BoundedChannel(comptime T: type, comptime capacity: usize) type { 336 + pub fn Queue(comptime T: type) type { 338 337 return struct { 339 338 const Self = @This(); 340 339 341 - buffer: []T, 340 + items: std.ArrayListUnmanaged(T), 342 341 alloc: Allocator, 343 - head: usize = 0, 344 - tail: usize = 0, 345 - count: usize = 0, 346 - mutex: Thread.Mutex = .{}, 347 - not_empty: Thread.Condition = .{}, 348 - not_full: Thread.Condition = .{}, 349 - closed: bool = false, 342 + 
mutex: Thread.Mutex, 343 + not_empty: Thread.Condition, 344 + closed: bool, 350 345 351 - pub fn init(alloc: Allocator) !Self { 352 - const buffer = try alloc.alloc(T, capacity); 346 + pub fn init(alloc: Allocator) Self { 353 347 return .{ 354 - .buffer = buffer, 348 + .items = .{}, 355 349 .alloc = alloc, 350 + .mutex = .{}, 351 + .not_empty = .{}, 352 + .closed = false, 356 353 }; 357 354 } 358 355 359 356 pub fn deinit(self: *Self) void { 360 - self.alloc.free(self.buffer); 357 + self.items.deinit(self.alloc); 361 358 } 362 359 363 - pub fn trySend(self: *Self, item: T) bool { 360 + /// Push item to queue (grows as needed) 361 + pub fn push(self: *Self, item: T) !void { 364 362 self.mutex.lock(); 365 363 defer self.mutex.unlock(); 366 364 367 - if (self.closed or self.count >= self.buffer.len) { 368 - return false; 369 - } 365 + if (self.closed) return error.QueueClosed; 370 366 371 - self.buffer[self.tail] = item; 372 - self.tail = (self.tail + 1) % self.buffer.len; 373 - self.count += 1; 367 + try self.items.append(self.alloc, item); 374 368 self.not_empty.signal(); 375 - return true; 376 369 } 377 370 378 - pub fn receiveTimeout(self: *Self, timeout_ns: u64) ?T { 371 + /// Pop with timeout (returns null on timeout or if closed with empty queue) 372 + pub fn popTimeout(self: *Self, timeout_ns: u64) ?T { 379 373 self.mutex.lock(); 380 374 defer self.mutex.unlock(); 381 375 382 - while (self.count == 0 and !self.closed) { 376 + while (self.items.items.len == 0 and !self.closed) { 383 377 self.not_empty.timedWait(&self.mutex, timeout_ns) catch { 384 378 return null; 385 379 }; 386 380 } 387 381 388 - if (self.count == 0) return null; 389 - 390 - const item = self.buffer[self.head]; 391 - self.head = (self.head + 1) % self.buffer.len; 392 - self.count -= 1; 393 - self.not_full.signal(); 394 - return item; 395 - } 382 + if (self.items.items.len == 0) return null; 396 383 397 - pub fn drain(self: *Self, out: []T, max: usize) usize { 398 - self.mutex.lock(); 399 - 
defer self.mutex.unlock(); 400 - 401 - const to_drain = @min(self.count, @min(max, out.len)); 402 - for (0..to_drain) |i| { 403 - out[i] = self.buffer[self.head]; 404 - self.head = (self.head + 1) % self.buffer.len; 405 - } 406 - self.count -= to_drain; 407 - if (to_drain > 0) self.not_full.broadcast(); 408 - return to_drain; 384 + return self.items.orderedRemove(0); 409 385 } 410 386 411 387 pub fn len(self: *Self) usize { 412 388 self.mutex.lock(); 413 389 defer self.mutex.unlock(); 414 - return self.count; 390 + return self.items.items.len; 415 391 } 416 392 417 393 pub fn close(self: *Self) void { ··· 419 395 defer self.mutex.unlock(); 420 396 self.closed = true; 421 397 self.not_empty.broadcast(); 422 - self.not_full.broadcast(); 423 - } 424 - 425 - pub fn isClosed(self: *Self) bool { 426 - self.mutex.lock(); 427 - defer self.mutex.unlock(); 428 - return self.closed; 429 398 } 430 399 }; 431 400 }