atproto relay implementation in zig zlay.waow.tech

feat: validator cache eviction — cap at 500K entries, evict oldest 10% by resolve_time

prevents unbounded memory growth from caching every DID's signing key.
configurable via VALIDATOR_CACHE_SIZE env var. eviction runs in the
background resolve loop (not the hot validation path). adds
relay_validator_cache_evictions_total prometheus counter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+91
+6
src/broadcaster.zig
··· 34 34 cache_misses: std.atomic.Value(u64) = .{ .raw = 0 }, 35 35 slow_consumers: std.atomic.Value(u64) = .{ .raw = 0 }, 36 36 connected_inbound: std.atomic.Value(u64) = .{ .raw = 0 }, 37 + cache_evictions: std.atomic.Value(u64) = .{ .raw = 0 }, 37 38 start_time: i64 = 0, 38 39 }; 39 40 ··· 593 594 \\# TYPE relay_validator_migration_queue gauge 594 595 \\relay_validator_migration_queue {d} 595 596 \\ 597 + \\# TYPE relay_validator_cache_evictions_total counter 598 + \\relay_validator_cache_evictions_total {d} 599 + \\ 596 600 , .{ 597 601 stats.frames_in.load(.acquire), 598 602 stats.frames_out.load(.acquire), ··· 611 615 uptime, 612 616 cache_entries, 613 617 migration_queue_len, 618 + stats.cache_evictions.load(.acquire), 614 619 }) catch return fbs.getWritten(); 615 620 616 621 // linux-only process metrics from /proc ··· 789 794 try std.testing.expect(std.mem.indexOf(u8, output, "# TYPE relay_uptime_seconds gauge") != null); 790 795 try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_cache_entries 42") != null); 791 796 try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_migration_queue 3") != null); 797 + try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_cache_evictions_total 0") != null); 792 798 } 793 799 794 800 test "formatStatsResponse produces valid JSON" {
+85
src/validator.zig
··· 61 61 queue_cond: std.Thread.Condition = .{}, 62 62 resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads, 63 63 alive: std.atomic.Value(bool) = .{ .raw = true }, 64 + max_cache_size: u32 = 500_000, 64 65 65 66 const max_resolver_threads = 8; 66 67 const default_resolver_threads = 4; ··· 109 110 110 111 /// start background resolver threads 111 112 pub fn start(self: *Validator) !void { 113 + self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size); 112 114 const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads); 113 115 const count = @min(n, max_resolver_threads); 114 116 for (self.resolver_threads[0..count]) |*t| { ··· 445 447 }; 446 448 @memcpy(cached.raw[0..public_key.raw.len], public_key.raw); 447 449 450 + // evict oldest entries if cache is at capacity 451 + const needs_eviction = blk: { 452 + self.cache_mutex.lock(); 453 + defer self.cache_mutex.unlock(); 454 + break :blk self.cache.count() >= self.max_cache_size; 455 + }; 456 + if (needs_eviction) { 457 + self.evictOldest(); 458 + } 459 + 448 460 const did_duped = self.allocator.dupe(u8, d) catch continue; 449 461 450 462 self.cache_mutex.lock(); ··· 545 557 defer self.queue_mutex.unlock(); 546 558 return self.migration_queue.items.len; 547 559 } 560 + 561 + /// evict the oldest 10% of cache entries by resolve_time. 562 + /// called from resolveLoop when cache is at capacity. 563 + fn evictOldest(self: *Validator) void { 564 + self.cache_mutex.lock(); 565 + defer self.cache_mutex.unlock(); 566 + 567 + const count = self.cache.count(); 568 + if (count == 0) return; 569 + const evict_count = @max(1, count / 10); 570 + 571 + // collect all entries with their resolve_times 572 + const Entry = struct { key: []const u8, resolve_time: i64 }; 573 + const entries = self.allocator.alloc(Entry, count) catch return; 574 + defer self.allocator.free(entries); 575 + 576 + var i: usize = 0; 577 + var it = self.cache.iterator(); 578 + while (it.next()) |entry| { 579 + entries[i] = .{ .key = entry.key_ptr.*, .resolve_time = entry.value_ptr.resolve_time }; 580 + i += 1; 581 + } 582 + 583 + // sort by resolve_time ascending (oldest first) 584 + std.mem.sort(Entry, entries[0..i], {}, struct { 585 + fn lessThan(_: void, a: Entry, b: Entry) bool { 586 + return a.resolve_time < b.resolve_time; 587 + } 588 + }.lessThan); 589 + 590 + // evict the oldest entries 591 + for (entries[0..evict_count]) |entry| { 592 + if (self.cache.fetchRemove(entry.key)) |removed| { 593 + self.allocator.free(removed.key); 594 + } 595 + } 596 + 597 + _ = self.stats.cache_evictions.fetchAdd(evict_count, .monotonic); 598 + } 548 599 }; 549 600 550 601 /// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path" ··· 698 749 try std.testing.expect(result.valid); 699 750 try std.testing.expect(result.skipped); 700 751 try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire)); 752 + } 753 + 754 + test "evictOldest removes oldest entries by resolve_time" { 755 + var stats = broadcaster.Stats{}; 756 + var v = Validator.init(std.testing.allocator, &stats); 757 + v.max_cache_size = 5; 758 + defer v.deinit(); 759 + 760 + // insert 5 entries with staggered resolve_times 761 + const dids = [_][]const u8{ "did:plc:aaa", "did:plc:bbb", "did:plc:ccc", "did:plc:ddd", "did:plc:eee" }; 762 + for (dids, 0..) |did, i| { 763 + const key = try std.testing.allocator.dupe(u8, did); 764 + v.cache.put(std.testing.allocator, key, .{ 765 + .key_type = .p256, 766 + .raw = .{0} ** 33, 767 + .len = 33, 768 + .resolve_time = @intCast(100 + i), // 100, 101, 102, 103, 104 769 + }) catch { 770 + std.testing.allocator.free(key); 771 + return error.TestFailed; 772 + }; 773 + } 774 + try std.testing.expectEqual(@as(u32, 5), v.cache.count()); 775 + 776 + // evict — should remove oldest 10% = 1 entry (the one with resolve_time=100) 777 + v.evictOldest(); 778 + 779 + try std.testing.expectEqual(@as(u32, 4), v.cache.count()); 780 + // oldest entry ("did:plc:aaa" with resolve_time=100) should be gone 781 + try std.testing.expect(v.cache.get("did:plc:aaa") == null); 782 + // newest entries should still be present 783 + try std.testing.expect(v.cache.get("did:plc:eee") != null); 784 + try std.testing.expect(v.cache.get("did:plc:ddd") != null); 785 + try std.testing.expectEqual(@as(u64, 1), stats.cache_evictions.load(.acquire)); 701 786 } 702 787 703 788 test "checkCommitStructure rejects too many ops" {