atproto relay implementation in zig zlay.waow.tech

fix: pullHosts direct PDS crawling bootstrap

Replace broken low-level HTTP request flow (req.reader() doesn't exist
in zig 0.15) with client.fetch() + Io.Writer.Allocating — same proven
pattern as xrpc/transport.zig. Move HTTP client outside pagination loop.

Also adds connected_inbound metric and removes per-host rate limiter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+121 -30
+8 -1
src/broadcaster.zig
··· 31 31 cache_hits: std.atomic.Value(u64) = .{ .raw = 0 }, 32 32 cache_misses: std.atomic.Value(u64) = .{ .raw = 0 }, 33 33 slow_consumers: std.atomic.Value(u64) = .{ .raw = 0 }, 34 + connected_inbound: std.atomic.Value(u64) = .{ .raw = 0 }, 34 35 start_time: i64 = 0, 35 36 }; 36 37 ··· 541 542 \\# TYPE relay_slow_consumers_total counter 542 543 \\relay_slow_consumers_total {d} 543 544 \\ 545 + \\# TYPE relay_connected_inbound gauge 546 + \\relay_connected_inbound {d} 547 + \\ 544 548 \\# TYPE relay_upstream_seq gauge 545 549 \\relay_upstream_seq {d} 546 550 \\ ··· 561 565 stats.cache_hits.load(.acquire), 562 566 stats.cache_misses.load(.acquire), 563 567 stats.slow_consumers.load(.acquire), 568 + stats.connected_inbound.load(.acquire), 564 569 stats.seq.load(.acquire), 565 570 stats.relay_seq.load(.acquire), 566 571 uptime, ··· 570 575 pub fn formatStatsResponse(stats: *const Stats, buf: []u8) []const u8 { 571 576 var json_buf: [2048]u8 = undefined; 572 577 const json = std.fmt.bufPrint(&json_buf, 573 - \\{{"seq":{d},"relay_seq":{d},"consumers":{d},"frames_in":{d},"frames_out":{d},"validated":{d},"failed":{d},"skipped":{d},"decode_errors":{d},"cache_hits":{d},"cache_misses":{d},"slow_consumers":{d},"uptime_seconds":{d}}} 578 + \\{{"seq":{d},"relay_seq":{d},"consumers":{d},"connected_inbound":{d},"frames_in":{d},"frames_out":{d},"validated":{d},"failed":{d},"skipped":{d},"decode_errors":{d},"cache_hits":{d},"cache_misses":{d},"slow_consumers":{d},"uptime_seconds":{d}}} 574 579 , .{ 575 580 stats.seq.load(.acquire), 576 581 stats.relay_seq.load(.acquire), 577 582 stats.consumer_count.load(.acquire), 583 + stats.connected_inbound.load(.acquire), 578 584 stats.frames_in.load(.acquire), 579 585 stats.frames_out.load(.acquire), 580 586 stats.validated.load(.acquire), ··· 689 695 try std.testing.expect(std.mem.indexOf(u8, output, "relay_consumers_active 3") != null); 690 696 try std.testing.expect(std.mem.indexOf(u8, output, "relay_validation_total{result=\"validated\"} 500") != null); 691 697 try std.testing.expect(std.mem.indexOf(u8, output, "relay_validation_total{result=\"failed\"} 2") != null); 698 + try std.testing.expect(std.mem.indexOf(u8, output, "relay_connected_inbound 0") != null); 692 699 try std.testing.expect(std.mem.indexOf(u8, output, "relay_upstream_seq 99999") != null); 693 700 try std.testing.expect(std.mem.indexOf(u8, output, "relay_seq 12345") != null); 694 701 try std.testing.expect(std.mem.indexOf(u8, output, "# TYPE relay_uptime_seconds gauge") != null);
+113 -4
src/slurper.zig
··· 237 237 }; 238 238 } 239 239 240 - /// start the slurper: load active hosts from DB, spawn workers, start crawl processor. 241 - /// the seed host is always added if no hosts exist yet. 240 + /// start the slurper: bootstrap hosts from seed relay, load from DB, spawn workers. 241 + /// Go relay: pull-hosts bootstraps from bsky.network's listHosts, then crawls each PDS directly. 242 242 pub fn start(self: *Slurper) !void { 243 - // ensure seed host exists in DB 244 - _ = try self.persist.getOrCreateHost(self.options.seed_host); 243 + // bootstrap: if DB has no active hosts, pull from seed relay's listHosts API 244 + const existing = try self.persist.listActiveHosts(self.allocator); 245 + const need_bootstrap = existing.len == 0; 246 + for (existing) |h| { 247 + self.allocator.free(h.hostname); 248 + self.allocator.free(h.status); 249 + } 250 + self.allocator.free(existing); 251 + 252 + if (need_bootstrap) { 253 + log.info("no active hosts in DB, bootstrapping from {s}", .{self.options.seed_host}); 254 + self.pullHosts() catch |err| { 255 + log.warn("bootstrap from {s} failed: {s}", .{ self.options.seed_host, @errorName(err) }); 256 + }; 257 + } 245 258 246 259 // load all active hosts and spawn workers 247 260 const hosts = try self.persist.listActiveHosts(self.allocator); ··· 265 278 self.crawl_thread = try std.Thread.spawn(.{}, processCrawlQueue, .{self}); 266 279 } 267 280 281 + /// pull PDS host list from the seed relay's com.atproto.sync.listHosts endpoint. 282 + /// Go relay: cmd/relay/pull.go — one-time bootstrap, reads REST API, not firehose. 283 + pub fn pullHosts(self: *Slurper) !void { 284 + var cursor: ?[]const u8 = null; 285 + var total: usize = 0; 286 + const limit = 500; 287 + 288 + var client: http.Client = .{ .allocator = self.allocator }; 289 + defer client.deinit(); 290 + 291 + while (true) { 292 + if (self.shutdown.load(.acquire)) break; 293 + 294 + // build URL with pagination 295 + var url_buf: [512]u8 = undefined; 296 + const url = if (cursor) |c| 297 + std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listHosts?limit={d}&cursor={s}", .{ self.options.seed_host, limit, c }) catch break 298 + else 299 + std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listHosts?limit={d}", .{ self.options.seed_host, limit }) catch break; 300 + 301 + var aw: std.Io.Writer.Allocating = .init(self.allocator); 302 + defer aw.deinit(); 303 + 304 + const result = client.fetch(.{ 305 + .location = .{ .url = url }, 306 + .response_writer = &aw.writer, 307 + .method = .GET, 308 + }) catch |err| { 309 + log.warn("pullHosts: fetch failed: {s}", .{@errorName(err)}); 310 + break; 311 + }; 312 + 313 + if (result.status != .ok) { 314 + log.warn("pullHosts: got status {d}", .{@intFromEnum(result.status)}); 315 + break; 316 + } 317 + 318 + const body = aw.toArrayList().items; 319 + 320 + // parse JSON response: { "hosts": [{"hostname": "...", "status": "..."}, ...], "cursor": "..." } 321 + const parsed = std.json.parseFromSlice(ListHostsResponse, self.allocator, body, .{ .ignore_unknown_fields = true }) catch |err| { 322 + log.warn("pullHosts: JSON parse failed: {s}", .{@errorName(err)}); 323 + break; 324 + }; 325 + defer parsed.deinit(); 326 + 327 + const hosts = parsed.value.hosts orelse break; 328 + if (hosts.len == 0) break; 329 + 330 + var added: usize = 0; 331 + for (hosts) |host| { 332 + // skip non-active hosts 333 + if (host.status) |s| { 334 + if (!std.mem.eql(u8, s, "active")) continue; 335 + } 336 + // validate hostname format (rejects IPs, localhost, etc.) 337 + const normalized = validateHostname(self.allocator, host.hostname) catch continue; 338 + defer self.allocator.free(normalized); 339 + 340 + // skip banned domains 341 + if (self.persist.isDomainBanned(normalized)) continue; 342 + 343 + // insert into DB (no describeServer check — the seed relay already vetted them) 344 + _ = self.persist.getOrCreateHost(normalized) catch continue; 345 + added += 1; 346 + } 347 + total += added; 348 + log.info("pullHosts: page fetched, {d} hosts added ({d} total)", .{ added, total }); 349 + 350 + // advance cursor 351 + if (parsed.value.cursor) |next_cursor| { 352 + // free previous cursor if we allocated one 353 + if (cursor) |prev| self.allocator.free(prev); 354 + cursor = self.allocator.dupe(u8, next_cursor) catch break; 355 + } else { 356 + break; // no more pages 357 + } 358 + } 359 + 360 + // free final cursor 361 + if (cursor) |c| self.allocator.free(c); 362 + log.info("pullHosts: bootstrap complete, {d} hosts added from {s}", .{ total, self.options.seed_host }); 363 + } 364 + 365 + const ListHostsResponse = struct { 366 + hosts: ?[]const ListHostEntry = null, 367 + cursor: ?[]const u8 = null, 368 + }; 369 + 370 + const ListHostEntry = struct { 371 + hostname: []const u8, 372 + status: ?[]const u8 = null, 373 + }; 374 + 268 375 /// add a crawl request (from requestCrawl endpoint) 269 376 pub fn addCrawlRequest(self: *Slurper, hostname: []const u8) !void { 270 377 const duped = try self.allocator.dupe(u8, hostname); ··· 372 479 .thread = thread, 373 480 .subscriber = sub, 374 481 }); 482 + _ = self.bc.stats.connected_inbound.fetchAdd(1, .monotonic); 375 483 } 376 484 377 485 /// worker thread wrapper — runs subscriber, cleans up on exit ··· 382 490 self.workers_mutex.lock(); 383 491 defer self.workers_mutex.unlock(); 384 492 _ = self.workers.remove(host_id); 493 + _ = self.bc.stats.connected_inbound.fetchSub(1, .monotonic); 385 494 386 495 log.info("worker for host_id={d} ({s}) exited", .{ host_id, sub.options.hostname }); 387 496
-25
src/subscriber.zig
··· 18 18 19 19 const max_consecutive_failures = 15; 20 20 const cursor_flush_interval_sec = 4; // flush cursor to DB every N seconds (Go relay: 4s) 21 - const default_rate_limit: u64 = 100; // events per second per host (Go relay: 50/sec baseline) 22 21 23 22 pub const Options = struct { 24 23 hostname: []const u8 = "bsky.network", ··· 38 37 39 38 // per-host shutdown (e.g. FutureCursor — stops only this subscriber) 40 39 host_shutdown: std.atomic.Value(bool) = .{ .raw = false }, 41 - 42 - // per-host rate limiting (token bucket) 43 - rate_tokens: u64 = default_rate_limit, 44 - rate_last_refill: i64 = 0, 45 - rate_dropped: u64 = 0, 46 40 47 41 pub fn init( 48 42 allocator: Allocator, ··· 234 228 sub.flushCursor(); 235 229 sub.last_cursor_flush = now; 236 230 } 237 - } 238 - 239 - // per-host rate limiting (token bucket, refills once per second) 240 - // Go relay: sliding window limiters per host (50/sec baseline) 241 - { 242 - const now = std.time.timestamp(); 243 - if (now > sub.rate_last_refill) { 244 - sub.rate_tokens = default_rate_limit; 245 - sub.rate_last_refill = now; 246 - if (sub.rate_dropped > 0) { 247 - log.warn("host {s}: rate limited, dropped {d} events in last window", .{ sub.options.hostname, sub.rate_dropped }); 248 - sub.rate_dropped = 0; 249 - } 250 - } 251 - if (sub.rate_tokens == 0) { 252 - sub.rate_dropped += 1; 253 - return; 254 - } 255 - sub.rate_tokens -= 1; 256 231 } 257 232 258 233 // route by frame type