atproto relay implementation in zig zlay.waow.tech

perf: DID resolve dedupe, migration interleave, mallinfo overflow fix

- add in-flight set to prevent duplicate DID entries in resolve queue
- interleave migration checks (1 per 10 DID resolutions) to prevent
starvation of the 140K+ migration queue
- bitcast mallinfo i32 fields to u32, extending useful range to 4 GiB
- extract magic numbers into named constants

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+27 -6
+7 -2
src/broadcaster.zig
··· 683 683 } else |_| {} 684 684 685 685 // glibc malloc arena stats — distinguishes in-use heap from fragmentation 686 - // mallinfo uses int fields (cap at 2 GiB) but sufficient for trend analysis 686 + // mallinfo returns c_int (i32) fields which overflow at 2 GiB; bitcast to u32 687 + // extends useful range to 4 GiB per field 687 688 const mi = malloc_h.mallinfo(); 689 + const arena: u64 = @as(u32, @bitCast(mi.arena)); 690 + const in_use: u64 = @as(u32, @bitCast(mi.uordblks)); 691 + const free_bytes: u64 = @as(u32, @bitCast(mi.fordblks)); 692 + const mmap_bytes: u64 = @as(u32, @bitCast(mi.hblkhd)); 688 693 std.fmt.format(w, 689 694 \\# TYPE relay_malloc_arena_bytes gauge 690 695 \\# HELP relay_malloc_arena_bytes total bytes claimed from OS by malloc ··· 702 707 \\# HELP relay_malloc_mmap_bytes bytes allocated via mmap (large blocks) 703 708 \\relay_malloc_mmap_bytes {d} 704 709 \\ 705 - , .{ mi.arena, mi.uordblks, mi.fordblks, mi.hblkhd }) catch {}; 710 + , .{ arena, in_use, free_bytes, mmap_bytes }) catch {}; 706 711 } 707 712 708 713 const posix_vfs = @cImport(@cInclude("sys/statvfs.h"));
+20 -4
src/validator.zig
··· 55 55 cache_mutex: std.Thread.Mutex = .{}, 56 56 // background resolve queue 57 57 queue: std.ArrayListUnmanaged([]const u8) = .{}, 58 + // in-flight set — prevents duplicate DID entries in the queue 59 + queued_set: std.StringHashMapUnmanaged(void) = .{}, 58 60 // migration validation queue 59 61 migration_queue: std.ArrayListUnmanaged(MigrationCheck) = .{}, 60 62 queue_mutex: std.Thread.Mutex = .{}, ··· 65 67 66 68 const max_resolver_threads = 8; 67 69 const default_resolver_threads = 4; 70 + // process 1 migration check per N DID resolutions to prevent migration queue starvation 71 + const migration_interleave_interval = 10; 72 + // recreate http resolver after N resolutions to bound connection pool growth 73 + const resolver_recycle_interval = 1000; 68 74 69 75 pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator { 70 76 return initWithConfig(allocator, stats, .{}); ··· 100 106 self.allocator.free(did); 101 107 } 102 108 self.queue.deinit(self.allocator); 109 + self.queued_set.deinit(self.allocator); 103 110 104 111 // free migration queue 105 112 for (self.migration_queue.items) |mc| { ··· 387 394 388 395 self.queue_mutex.lock(); 389 396 defer self.queue_mutex.unlock(); 397 + 398 + // skip if already queued (prevents duplicate in-flight resolutions) 399 + if (self.queued_set.contains(duped)) { 400 + self.allocator.free(duped); 401 + return; 402 + } 403 + 390 404 self.queue.append(self.allocator, duped) catch { 391 405 self.allocator.free(duped); 392 406 return; 393 407 }; 408 + self.queued_set.put(self.allocator, duped, {}) catch {}; 394 409 self.queue_cond.signal(); 395 410 } 396 411 ··· 408 423 while (self.queue.items.len == 0 and self.migration_queue.items.len == 0 and self.alive.load(.acquire)) { 409 424 self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {}; 410 425 } 411 - if (self.queue.items.len > 0) { 412 - did = self.queue.orderedRemove(0); 413 - } else if (self.migration_queue.items.len > 0) { 426 + if (self.migration_queue.items.len > 0 and (self.queue.items.len == 0 or resolve_count % migration_interleave_interval == 0)) { 414 427 migration = self.migration_queue.orderedRemove(0); 428 + } else if (self.queue.items.len > 0) { 429 + did = self.queue.orderedRemove(0); 430 + _ = self.queued_set.remove(did.?); 415 431 } 416 432 } 417 433 ··· 427 443 428 444 // periodically recreate resolver to free accumulated http.Client state 429 445 resolve_count += 1; 430 - if (resolve_count % 1000 == 0) { 446 + if (resolve_count % resolver_recycle_interval == 0) { 431 447 resolver.deinit(); 432 448 resolver = zat.DidResolver.init(self.allocator); 433 449 }