atproto relay implementation in zig zlay.waow.tech

feat: multi-threaded DID resolver, cache warming, and listHosts endpoint

spawn N resolver threads (default 4, configurable via RESOLVER_THREADS)
instead of 1, and pre-populate the resolve queue with all active DIDs
from the database on startup. adds com.atproto.sync.listHosts XRPC
endpoint with Indigo-compatible response format (includes seq field,
filters by last_seq > 0, default limit 200).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+120 -5
+76
src/main.zig
··· 11 //! /xrpc/com.atproto.sync.getRepoStatus — single account status 12 //! /xrpc/com.atproto.sync.getLatestCommit — latest commit CID + rev 13 //! /xrpc/com.atproto.sync.listReposByCollection — repos with records in a collection 14 //! /xrpc/com.atproto.sync.requestCrawl — request PDS crawl (POST) 15 //! /admin/hosts — list all hosts (GET, admin) 16 //! /admin/hosts/block — block a host (POST, admin) ··· 117 118 // start: loads active hosts from DB, spawns subscriber threads 119 try slurper.start(); 120 121 // start GC thread (runs every 10 minutes) 122 const gc_thread = try std.Thread.spawn(.{}, gcLoop, .{&dp}); ··· 259 handleGetLatestCommit(request, query, persist); 260 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 261 handleListReposByCollection(request, query, ci); 262 } else if (std.mem.eql(u8, path, "/admin/hosts")) { 263 handleAdminListHosts(request, persist, slurper); 264 } else if (std.mem.eql(u8, path, "/")) { ··· 802 w.writeAll(last) catch return; 803 w.writeAll("\"") catch return; 804 } 805 } 806 807 w.writeByte('}') catch return;
··· 11 //! /xrpc/com.atproto.sync.getRepoStatus — single account status 12 //! /xrpc/com.atproto.sync.getLatestCommit — latest commit CID + rev 13 //! /xrpc/com.atproto.sync.listReposByCollection — repos with records in a collection 14 + //! /xrpc/com.atproto.sync.listHosts — paginated active host listing 15 //! /xrpc/com.atproto.sync.requestCrawl — request PDS crawl (POST) 16 //! /admin/hosts — list all hosts (GET, admin) 17 //! /admin/hosts/block — block a host (POST, admin) ··· 118 119 // start: loads active hosts from DB, spawns subscriber threads 120 try slurper.start(); 121 + 122 + // pre-populate resolver queue with all known active DIDs 123 + val.warmCache(&dp); 124 125 // start GC thread (runs every 10 minutes) 126 const gc_thread = try std.Thread.spawn(.{}, gcLoop, .{&dp}); ··· 263 handleGetLatestCommit(request, query, persist); 264 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 265 handleListReposByCollection(request, query, ci); 266 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 267 + handleListHosts(request, query, persist); 268 } else if (std.mem.eql(u8, path, "/admin/hosts")) { 269 handleAdminListHosts(request, persist, slurper); 270 } else if (std.mem.eql(u8, path, "/")) { ··· 808 w.writeAll(last) catch return; 809 w.writeAll("\"") catch return; 810 } 811 + } 812 + 813 + w.writeByte('}') catch return; 814 + respondJson(request, .ok, fbs.getWritten()); 815 + } 816 + 817 + fn handleListHosts(request: *http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 818 + const cursor_str = queryParam(query, "cursor") orelse "0"; 819 + const limit_str = queryParam(query, "limit") orelse "200"; 820 + 821 + const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 822 + respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 823 + return; 824 + }; 825 + if (cursor_val < 0) { 826 + respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 827 + return; 828 + } 829 + 830 + const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 831 + respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 832 + return; 833 + }; 834 + if (limit < 1 or limit > 1000) { 835 + respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 836 + return; 837 + } 838 + 839 + var result = persist.db.query( 840 + "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 841 + .{ cursor_val, limit }, 842 + ) catch { 843 + respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 844 + return; 845 + }; 846 + defer result.deinit(); 847 + 848 + var buf: [65536]u8 = undefined; 849 + var fbs = std.io.fixedBufferStream(&buf); 850 + const w = fbs.writer(); 851 + 852 + var count: i64 = 0; 853 + var last_id: i64 = 0; 854 + 855 + w.writeAll("{\"hosts\":[") catch return; 856 + 857 + while (result.nextUnsafe() catch null) |row| { 858 + if (count > 0) w.writeByte(',') catch return; 859 + 860 + const id = row.get(i64, 0); 861 + const hostname = row.get([]const u8, 1); 862 + const status = row.get([]const u8, 2); 863 + const seq = row.get(i64, 3); 864 + 865 + w.writeAll("{\"hostname\":\"") catch return; 866 + w.writeAll(hostname) catch return; 867 + w.writeAll("\"") catch return; 868 + std.fmt.format(w, ",\"seq\":{d}", .{seq}) catch return; 869 + w.writeAll(",\"status\":\"") catch return; 870 + w.writeAll(status) catch return; 871 + w.writeAll("\"}") catch return; 872 + 873 + last_id = id; 874 + count += 1; 875 + } 876 + 877 + w.writeByte(']') catch return; 878 + 879 + if (count >= limit and count > 1) { 880 + std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_id}) catch return; 881 } 882 883 w.writeByte('}') catch return;
+44 -5
src/validator.zig
··· 8 const std = @import("std"); 9 const zat = @import("zat"); 10 const broadcaster = @import("broadcaster.zig"); 11 12 const Allocator = std.mem.Allocator; 13 const log = std.log.scoped(.relay); ··· 50 queue: std.ArrayListUnmanaged([]const u8) = .{}, 51 queue_mutex: std.Thread.Mutex = .{}, 52 queue_cond: std.Thread.Condition = .{}, 53 - resolver_thread: ?std.Thread = null, 54 alive: std.atomic.Value(bool) = .{ .raw = true }, 55 56 pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator { 57 return initWithConfig(allocator, stats, .{}); ··· 67 68 pub fn deinit(self: *Validator) void { 69 self.alive.store(false, .release); 70 - self.queue_cond.signal(); 71 - if (self.resolver_thread) |t| t.join(); 72 73 // free cache keys (CachedKey is inline, no separate free needed) 74 var cache_it = self.cache.iterator(); ··· 84 self.queue.deinit(self.allocator); 85 } 86 87 - /// start the background resolver thread 88 pub fn start(self: *Validator) !void { 89 - self.resolver_thread = try std.Thread.spawn(.{}, resolveLoop, .{self}); 90 } 91 92 /// validate a #sync frame: signature verification only (no ops, no MST). ··· 443 return self.cache.count(); 444 } 445 }; 446 447 // --- tests --- 448
··· 8 const std = @import("std"); 9 const zat = @import("zat"); 10 const broadcaster = @import("broadcaster.zig"); 11 + const event_log_mod = @import("event_log.zig"); 12 13 const Allocator = std.mem.Allocator; 14 const log = std.log.scoped(.relay); ··· 51 queue: std.ArrayListUnmanaged([]const u8) = .{}, 52 queue_mutex: std.Thread.Mutex = .{}, 53 queue_cond: std.Thread.Condition = .{}, 54 + resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads, 55 alive: std.atomic.Value(bool) = .{ .raw = true }, 56 + 57 + const max_resolver_threads = 8; 58 + const default_resolver_threads = 4; 59 60 pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator { 61 return initWithConfig(allocator, stats, .{}); ··· 71 72 pub fn deinit(self: *Validator) void { 73 self.alive.store(false, .release); 74 + self.queue_cond.broadcast(); 75 + for (&self.resolver_threads) |*t| { 76 + if (t.*) |thread| { 77 + thread.join(); 78 + t.* = null; 79 + } 80 + } 81 82 // free cache keys (CachedKey is inline, no separate free needed) 83 var cache_it = self.cache.iterator(); ··· 93 self.queue.deinit(self.allocator); 94 } 95 96 + /// start background resolver threads 97 pub fn start(self: *Validator) !void { 98 + const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads); 99 + const count = @min(n, max_resolver_threads); 100 + for (self.resolver_threads[0..count]) |*t| { 101 + t.* = try std.Thread.spawn(.{}, resolveLoop, .{self}); 102 + } 103 + } 104 + 105 + /// pre-populate the resolve queue with all active DIDs from the database. 106 + /// call after slurper.start() so subscribers are already running. 107 + pub fn warmCache(self: *Validator, persist: *event_log_mod.DiskPersist) void { 108 + var result = persist.db.query( 109 + "SELECT did FROM account WHERE status = 'active' AND upstream_status = 'active' ORDER BY uid ASC", 110 + .{}, 111 + ) catch |err| { 112 + log.warn("cache warming: query failed: {s}", .{@errorName(err)}); 113 + return; 114 + }; 115 + defer result.deinit(); 116 + 117 + var count: u64 = 0; 118 + while (result.nextUnsafe() catch null) |row| { 119 + const did = row.get([]const u8, 0); 120 + self.queueResolve(did); 121 + count += 1; 122 + } 123 + log.info("cache warming: queued {d} DIDs for resolution", .{count}); 124 } 125 126 /// validate a #sync frame: signature verification only (no ops, no MST). ··· 477 return self.cache.count(); 478 } 479 }; 480 + 481 + fn parseEnvInt(comptime T: type, key: []const u8, default: T) T { 482 + const val = std.posix.getenv(key) orelse return default; 483 + return std.fmt.parseInt(T, val, 10) catch default; 484 + } 485 486 // --- tests --- 487