atproto relay implementation in zig zlay.waow.tech

perf: reduce memory footprint — gate history, drop outbuf, halve stacks

- skip 50K frame ring buffer when disk persist is active (always in prod)
- remove redundant outbuf in event_log; flush writes directly from evtbuf
- reduce thread stacks from 2 MiB to 1 MiB (~2,700 threads)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+13 -15
+4 -2
src/broadcaster.zig
··· 362 pub fn broadcast(self: *Broadcaster, seq: u64, data: []const u8) void { 363 self.stats.seq.store(seq, .release); 364 365 - // add to history for cursor replay 366 - _ = self.history.push(seq, data); 367 368 // create one shared frame for all consumers 369 const frame = SharedFrame.create(self.allocator, data) catch return;
··· 362 pub fn broadcast(self: *Broadcaster, seq: u64, data: []const u8) void { 363 self.stats.seq.store(seq, .release); 364 365 + // add to history for cursor replay (only when no disk persist) 366 + if (self.persist == null) { 367 + _ = self.history.push(seq, data); 368 + } 369 370 // create one shared frame for all consumers 371 const frame = SharedFrame.create(self.allocator, data) catch return;
+7 -11
src/event_log.zig
··· 97 max_did_cache_size: u32 = 500_000, 98 99 // write buffer (flushed periodically or when threshold hit) 100 - outbuf: std.ArrayListUnmanaged(u8) = .{}, 101 evtbuf: std.ArrayListUnmanaged(PersistJob) = .{}, 102 mutex: std.Thread.Mutex = .{}, 103 ··· 229 // free write buffer 230 for (self.evtbuf.items) |job| self.allocator.free(job.data); 231 self.evtbuf.deinit(self.allocator); 232 - self.outbuf.deinit(self.allocator); 233 234 // free DID cache keys 235 { ··· 623 std.mem.writeInt(u64, data[20..28], seq, .little); 624 625 try self.evtbuf.append(self.allocator, .{ .data = data, .seq = seq }); 626 - try self.outbuf.appendSlice(self.allocator, data); 627 self.event_counter += 1; 628 629 // flush if threshold hit ··· 842 fn flushLocked(self: *DiskPersist) !void { 843 if (self.evtbuf.items.len == 0) return; 844 845 - // write buffered bytes to current file 846 const file = self.current_file orelse return; 847 - file.writeAll(self.outbuf.items) catch |err| { 848 - log.err("flush: write failed: {s}", .{@errorName(err)}); 849 - return err; 850 - }; 851 - 852 - // clear buffers 853 - self.outbuf.clearRetainingCapacity(); 854 855 // free job data 856 for (self.evtbuf.items) |job| {
··· 97 max_did_cache_size: u32 = 500_000, 98 99 // write buffer (flushed periodically or when threshold hit) 100 evtbuf: std.ArrayListUnmanaged(PersistJob) = .{}, 101 mutex: std.Thread.Mutex = .{}, 102 ··· 228 // free write buffer 229 for (self.evtbuf.items) |job| self.allocator.free(job.data); 230 self.evtbuf.deinit(self.allocator); 231 232 // free DID cache keys 233 { ··· 621 std.mem.writeInt(u64, data[20..28], seq, .little); 622 623 try self.evtbuf.append(self.allocator, .{ .data = data, .seq = seq }); 624 self.event_counter += 1; 625 626 // flush if threshold hit ··· 839 fn flushLocked(self: *DiskPersist) !void { 840 if (self.evtbuf.items.len == 0) return; 841 842 + // write buffered events to current file 843 const file = self.current_file orelse return; 844 + for (self.evtbuf.items) |job| { 845 + file.writeAll(job.data) catch |err| { 846 + log.err("flush: write failed: {s}", .{@errorName(err)}); 847 + return err; 848 + }; 849 + } 850 851 // free job data 852 for (self.evtbuf.items) |job| {
+2 -2
src/main.zig
··· 38 39 /// zig's default thread stack is 16 MB. with ~2,750 subscriber threads that's 40 /// 44 GB of virtual memory. most threads need far less — websocket read loops, 41 - /// CBOR decoding, HTTP handlers. 2 MB is generous for all of these. 42 - pub const default_stack_size = 2 * 1024 * 1024; 43 44 var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false }; 45
··· 38 39 /// zig's default thread stack is 16 MB. with ~2,750 subscriber threads that's 40 /// 44 GB of virtual memory. most threads need far less — websocket read loops, 41 + /// CBOR decoding, HTTP handlers. 1 MB is sufficient for all of these. 42 + pub const default_stack_size = 1 * 1024 * 1024; 43 44 var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false }; 45