atproto relay implementation in zig zlay.waow.tech

perf: share single TLS CA bundle across all subscriber connections

each subscriber was loading its own copy of system CA certificates via
Bundle.rescan() per connection. with ~2700 hosts, that's ~2700 copies
of the same cert data in memory. load once in slurper.start(), pass
to all subscribers via config.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+15
+13
src/slurper.zig
··· 204 shutdown: *std.atomic.Value(bool), 205 options: Options, 206 207 // active subscriber threads, keyed by host_id 208 workers: std.AutoHashMapUnmanaged(u64, WorkerEntry) = .{}, 209 workers_mutex: std.Thread.Mutex = .{}, ··· 237 /// start the slurper: bootstrap hosts from seed relay, load from DB, spawn workers. 238 /// Go relay: pull-hosts bootstraps from bsky.network's listHosts, then crawls each PDS directly. 239 pub fn start(self: *Slurper) !void { 240 // always pull hosts from seed relay on startup — idempotent (getOrCreateHost skips existing) 241 // Go relay: pull-hosts is a separate command, but we run it inline for simplicity 242 log.info("pulling hosts from {s}...", .{self.options.seed_host}); ··· 435 .hostname = hostname_duped, 436 .max_message_size = self.options.max_message_size, 437 .host_id = host_id, 438 }, 439 ); 440 sub.collection_index = self.collection_index; ··· 525 // clean up crawl queue 526 for (self.crawl_queue.items) |h| self.allocator.free(h); 527 self.crawl_queue.deinit(self.allocator); 528 } 529 }; 530
··· 204 shutdown: *std.atomic.Value(bool), 205 options: Options, 206 207 + // shared TLS CA bundle — loaded once, used by all subscriber connections 208 + ca_bundle: ?std.crypto.Certificate.Bundle = null, 209 + 210 // active subscriber threads, keyed by host_id 211 workers: std.AutoHashMapUnmanaged(u64, WorkerEntry) = .{}, 212 workers_mutex: std.Thread.Mutex = .{}, ··· 240 /// start the slurper: bootstrap hosts from seed relay, load from DB, spawn workers. 241 /// Go relay: pull-hosts bootstraps from bsky.network's listHosts, then crawls each PDS directly. 242 pub fn start(self: *Slurper) !void { 243 + // load CA bundle once — shared by all subscriber TLS connections 244 + var bundle: std.crypto.Certificate.Bundle = .{}; 245 + try bundle.rescan(self.allocator); 246 + self.ca_bundle = bundle; 247 + log.info("loaded shared CA bundle", .{}); 248 + 249 // always pull hosts from seed relay on startup — idempotent (getOrCreateHost skips existing) 250 // Go relay: pull-hosts is a separate command, but we run it inline for simplicity 251 log.info("pulling hosts from {s}...", .{self.options.seed_host}); ··· 444 .hostname = hostname_duped, 445 .max_message_size = self.options.max_message_size, 446 .host_id = host_id, 447 + .ca_bundle = self.ca_bundle, 448 }, 449 ); 450 sub.collection_index = self.collection_index; ··· 535 // clean up crawl queue 536 for (self.crawl_queue.items) |h| self.allocator.free(h); 537 self.crawl_queue.deinit(self.allocator); 538 + 539 + // free shared CA bundle 540 + if (self.ca_bundle) |*b| b.deinit(self.allocator); 541 } 542 }; 543
+2
src/subscriber.zig
··· 44 hostname: []const u8 = "bsky.network", 45 max_message_size: usize = 5 * 1024 * 1024, 46 host_id: u64 = 0, 47 }; 48 49 /// simple sliding window rate limiter — tracks event counts per second/hour/day. ··· 219 .port = 443, 220 .tls = true, 221 .max_size = self.options.max_message_size, 222 }); 223 defer client.deinit(); 224
··· 44 hostname: []const u8 = "bsky.network", 45 max_message_size: usize = 5 * 1024 * 1024, 46 host_id: u64 = 0, 47 + ca_bundle: ?std.crypto.Certificate.Bundle = null, 48 }; 49 50 /// simple sliding window rate limiter — tracks event counts per second/hour/day. ··· 220 .port = 443, 221 .tls = true, 222 .max_size = self.options.max_message_size, 223 + .ca_bundle = self.options.ca_bundle, 224 }); 225 defer client.deinit(); 226