atproto relay implementation in zig zlay.waow.tech

feat: add per-host rate limiting and Server header

- per-host rate limits (50/s, 2500/hr, 20000/day baseline;
5000/s, 50M/hr, 500M/day for trusted *.host.bsky.network hosts),
implemented as a simpler fixed-window approximation of the Go relay's slidingwindow approach (counters reset at each second/hour/day boundary)
- Server: zlay (atproto-relay) header on all HTTP responses,
enabling relay loop detection by other relays
- new relay_rate_limited_total prometheus counter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+149 -4
+5
src/broadcaster.zig
··· 27 27 validated: std.atomic.Value(u64) = .{ .raw = 0 }, 28 28 failed: std.atomic.Value(u64) = .{ .raw = 0 }, 29 29 skipped: std.atomic.Value(u64) = .{ .raw = 0 }, 30 + rate_limited: std.atomic.Value(u64) = .{ .raw = 0 }, 30 31 decode_errors: std.atomic.Value(u64) = .{ .raw = 0 }, 31 32 cache_hits: std.atomic.Value(u64) = .{ .raw = 0 }, 32 33 cache_misses: std.atomic.Value(u64) = .{ .raw = 0 }, ··· 531 532 \\relay_validation_total{{result="failed"}} {d} 532 533 \\relay_validation_total{{result="skipped"}} {d} 533 534 \\ 535 + \\# TYPE relay_rate_limited_total counter 536 + \\relay_rate_limited_total {d} 537 + \\ 534 538 \\# TYPE relay_decode_errors_total counter 535 539 \\relay_decode_errors_total {d} 536 540 \\ ··· 560 564 stats.validated.load(.acquire), 561 565 stats.failed.load(.acquire), 562 566 stats.skipped.load(.acquire), 567 + stats.rate_limited.load(.acquire), 563 568 stats.decode_errors.load(.acquire), 564 569 stats.cache_hits.load(.acquire), 565 570 stats.cache_misses.load(.acquire),
+16 -4
src/main.zig
··· 247 247 } else if (std.mem.eql(u8, path, "/metrics")) { 248 248 var metrics_buf: [4096]u8 = undefined; 249 249 const body = broadcaster.formatPrometheusMetrics(stats, &metrics_buf); 250 - request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{.{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }} }) catch {}; 250 + request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 251 + .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 252 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 253 + } }) catch {}; 251 254 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 252 255 handleListRepos(request, query, persist); 253 256 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { ··· 279 282 \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/> 280 283 \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text> 281 284 \\</svg> 282 - , .{ .status = .ok, .keep_alive = false, .extra_headers = &.{.{ .name = "content-type", .value = "image/svg+xml" }} }) catch {}; 285 + , .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 286 + .{ .name = "content-type", .value = "image/svg+xml" }, 287 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 288 + } }) catch {}; 283 289 } else { 284 290 respondText(request, .not_found, "not found"); 285 291 } ··· 869 875 // --- response helpers --- 870 876 871 877 fn respondJson(request: *http.Server.Request, status: http.Status, body: []const u8) void { 872 - request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{.{ .name = "content-type", .value = "application/json" }} }) catch {}; 878 + request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{ 879 + .{ .name = "content-type", .value = "application/json" }, 880 + .{ .name = "server", .value = "zlay 
(atproto-relay)" }, 881 + } }) catch {}; 873 882 } 874 883 875 884 fn respondText(request: *http.Server.Request, status: http.Status, body: []const u8) void { 876 - request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{.{ .name = "content-type", .value = "text/plain" }} }) catch {}; 885 + request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{ 886 + .{ .name = "content-type", .value = "text/plain" }, 887 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 888 + } }) catch {}; 877 889 } 878 890 879 891 fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
+128
src/subscriber.zig
··· 20 20 const max_consecutive_failures = 15; 21 21 const cursor_flush_interval_sec = 4; // flush cursor to DB every N seconds (Go relay: 4s) 22 22 23 + // per-host rate limits (Go relay: 50/s baseline, 2500/hr, 20000/day for public hosts) 24 + const default_per_second_limit: u64 = 50; 25 + const default_per_hour_limit: u64 = 2500; 26 + const default_per_day_limit: u64 = 20_000; 27 + 28 + // trusted hosts get much higher limits (Go relay: 5000/s, 50M/hr, 500M/day) 29 + const trusted_per_second_limit: u64 = 5_000; 30 + const trusted_per_hour_limit: u64 = 50_000_000; 31 + const trusted_per_day_limit: u64 = 500_000_000; 32 + 33 + // Go relay: TrustedDomains config — hosts matching these suffixes get trusted limits 34 + const trusted_suffixes: []const []const u8 = &.{".host.bsky.network"}; 35 + 36 + fn isTrustedHost(hostname: []const u8) bool { 37 + for (trusted_suffixes) |suffix| { 38 + if (std.mem.endsWith(u8, hostname, suffix)) return true; 39 + } 40 + return false; 41 + } 42 + 23 43 pub const Options = struct { 24 44 hostname: []const u8 = "bsky.network", 25 45 max_message_size: usize = 5 * 1024 * 1024, 26 46 host_id: u64 = 0, 27 47 }; 28 48 49 + /// simple sliding window rate limiter — tracks event counts per second/hour/day. 50 + /// Go relay uses github.com/RussellLuo/slidingwindow; this is a simpler fixed-window 51 + /// approximation that resets counters at window boundaries. 52 + const RateLimiter = struct { 53 + // per-second 54 + sec_count: u64 = 0, 55 + sec_epoch: i64 = 0, 56 + sec_limit: u64 = default_per_second_limit, 57 + 58 + // per-hour 59 + hour_count: u64 = 0, 60 + hour_epoch: i64 = 0, 61 + hour_limit: u64 = default_per_hour_limit, 62 + 63 + // per-day 64 + day_count: u64 = 0, 65 + day_epoch: i64 = 0, 66 + day_limit: u64 = default_per_day_limit, 67 + 68 + /// returns true if the event is allowed, false if rate-limited. 
69 + fn allow(self: *RateLimiter, now: i64) bool { 70 + // per-second window 71 + if (now != self.sec_epoch) { 72 + self.sec_epoch = now; 73 + self.sec_count = 0; 74 + } 75 + if (self.sec_count >= self.sec_limit) return false; 76 + 77 + // per-hour window 78 + const hour = @divTrunc(now, 3600); 79 + if (hour != self.hour_epoch) { 80 + self.hour_epoch = hour; 81 + self.hour_count = 0; 82 + } 83 + if (self.hour_count >= self.hour_limit) return false; 84 + 85 + // per-day window 86 + const day = @divTrunc(now, 86400); 87 + if (day != self.day_epoch) { 88 + self.day_epoch = day; 89 + self.day_count = 0; 90 + } 91 + if (self.day_count >= self.day_limit) return false; 92 + 93 + self.sec_count += 1; 94 + self.hour_count += 1; 95 + self.day_count += 1; 96 + return true; 97 + } 98 + }; 99 + 29 100 pub const Subscriber = struct { 30 101 allocator: Allocator, 31 102 options: Options, ··· 36 107 shutdown: *std.atomic.Value(bool), 37 108 last_upstream_seq: ?u64 = null, 38 109 last_cursor_flush: i64 = 0, 110 + rate_limiter: RateLimiter = .{}, 39 111 40 112 // per-host shutdown (e.g. 
FutureCursor — stops only this subscriber) 41 113 host_shutdown: std.atomic.Value(bool) = .{ .raw = false }, ··· 48 120 shutdown: *std.atomic.Value(bool), 49 121 options: Options, 50 122 ) Subscriber { 123 + const trusted = isTrustedHost(options.hostname); 51 124 return .{ 52 125 .allocator = allocator, 53 126 .options = options, ··· 55 128 .validator = val, 56 129 .persist = persist, 57 130 .shutdown = shutdown, 131 + .rate_limiter = .{ 132 + .sec_limit = if (trusted) trusted_per_second_limit else default_per_second_limit, 133 + .hour_limit = if (trusted) trusted_per_hour_limit else default_per_hour_limit, 134 + .day_limit = if (trusted) trusted_per_day_limit else default_per_day_limit, 135 + }, 58 136 }; 59 137 } 60 138 ··· 220 298 _ = sub.bc.stats.frames_in.fetchAdd(1, .monotonic); 221 299 222 300 // extract seq for cursor tracking (all event types have seq) 301 + // must happen before rate limiting so we don't re-process dropped events on reconnect 223 302 const upstream_seq = payload.getUint("seq"); 224 303 if (upstream_seq) |s| { 225 304 sub.last_upstream_seq = s; ··· 235 314 } 236 315 } 237 316 317 + // per-host rate limiting (Go relay: slidingwindow per-second/hour/day) 318 + // applied after cursor tracking so dropped events aren't re-processed on reconnect 319 + if (!sub.rate_limiter.allow(std.time.timestamp())) { 320 + _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic); 321 + return; 322 + } 323 + 238 324 // route by frame type 239 325 const is_commit = std.mem.eql(u8, frame_type, "#commit"); 240 326 const is_account = std.mem.eql(u8, frame_type, "#account"); ··· 434 520 try std.testing.expectEqualStrings("#identity", h.getString("t").?); 435 521 try std.testing.expectEqualStrings("did:plc:alice", p.getString("did").?); 436 522 try std.testing.expectEqual(@as(i64, 99), p.getInt("seq").?); 523 + } 524 + 525 + test "rate limiter enforces per-second limit" { 526 + var rl: RateLimiter = .{}; 527 + rl.sec_limit = 3; 528 + rl.hour_limit = 1000; 529 + 
rl.day_limit = 10000; 530 + 531 + const now: i64 = 1000000; 532 + try std.testing.expect(rl.allow(now)); 533 + try std.testing.expect(rl.allow(now)); 534 + try std.testing.expect(rl.allow(now)); 535 + // 4th should be rejected 536 + try std.testing.expect(!rl.allow(now)); 537 + 538 + // next second resets 539 + try std.testing.expect(rl.allow(now + 1)); 540 + } 541 + 542 + test "rate limiter enforces per-hour limit" { 543 + var rl: RateLimiter = .{}; 544 + rl.sec_limit = 1000; // high per-second so it doesn't interfere 545 + rl.hour_limit = 5; 546 + rl.day_limit = 10000; 547 + 548 + const now: i64 = 3600 * 100; // some hour boundary 549 + for (0..5) |_| { 550 + try std.testing.expect(rl.allow(now)); 551 + } 552 + // 6th should be rejected (same hour) 553 + try std.testing.expect(!rl.allow(now + 1)); 554 + 555 + // next hour resets 556 + try std.testing.expect(rl.allow(now + 3600)); 557 + } 558 + 559 + test "trusted host detection" { 560 + try std.testing.expect(isTrustedHost("pds-123.host.bsky.network")); 561 + try std.testing.expect(isTrustedHost("abc.host.bsky.network")); 562 + try std.testing.expect(!isTrustedHost("bsky.network")); 563 + try std.testing.expect(!isTrustedHost("evil.bsky.network")); 564 + try std.testing.expect(!isTrustedHost("pds.example.com")); 437 565 } 438 566 439 567 test "error frame (op=-1) is detected" {