atproto relay implementation in zig — zlay.waow.tech

add per-host configurable account limits

zlay is at 99.82% parity on pulsar — the ~420 missing users trace to
high-volume third-party hosts (blacksky.app, eurosky.social,
northsky.social) whose events get rate-limited by the per-host formula
(2500 + account_count).

adds an admin API to override the effective account count per host,
which scales the rate limits accordingly. changes:

- account_limit nullable column on host table (NULL = use COUNT(*))
- getEffectiveAccountCount uses COALESCE(override, actual_count)
- POST /admin/hosts/changeLimits to set/clear overrides
- atomic rate limiter fields for safe cross-thread updates
- updateHostLimits on Slurper to apply changes to running subscribers
- account_limit included in GET /admin/hosts response

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+145 -32
+60 -7
src/api/admin.zig
··· 106 106 107 107 for (hosts, 0..) |host, i| { 108 108 if (i > 0) w.writeByte(',') catch return; 109 - std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d}}}", .{ 110 - host.id, 111 - host.hostname, 112 - host.status, 113 - host.last_seq, 114 - host.failed_attempts, 115 - }) catch return; 109 + if (host.account_limit) |limit| { 110 + std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d},\"account_limit\":{d}}}", .{ 111 + host.id, 112 + host.hostname, 113 + host.status, 114 + host.last_seq, 115 + host.failed_attempts, 116 + limit, 117 + }) catch return; 118 + } else { 119 + std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d},\"account_limit\":null}}", .{ 120 + host.id, 121 + host.hostname, 122 + host.status, 123 + host.last_seq, 124 + host.failed_attempts, 125 + }) catch return; 126 + } 116 127 } 117 128 118 129 std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return; ··· 163 174 persist.resetHostFailures(host_info.id) catch {}; 164 175 165 176 log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 177 + h.respondJson(conn, .ok, "{\"success\":true}"); 178 + } 179 + 180 + /// set or clear the account_limit override for a host. 
181 + /// POST {"host": "...", "account_limit": 100000} — set override 182 + /// POST {"host": "...", "account_limit": null} — clear override (revert to COUNT(*)) 183 + pub fn handleAdminChangeLimits(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 184 + if (!checkAdmin(conn, headers)) return; 185 + 186 + const parsed = std.json.parseFromSlice( 187 + struct { host: []const u8, account_limit: ?u64 }, 188 + ctx.persist.allocator, 189 + body, 190 + .{ .ignore_unknown_fields = true }, 191 + ) catch { 192 + h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"host\\\":\\\"...\\\",\\\"account_limit\\\":...}\"}"); 193 + return; 194 + }; 195 + defer parsed.deinit(); 196 + 197 + const host_id = ctx.persist.getHostIdForHostname(parsed.value.host) catch { 198 + h.respondJson(conn, .internal_server_error, "{\"error\":\"database error\"}"); 199 + return; 200 + } orelse { 201 + h.respondJson(conn, .not_found, "{\"error\":\"host not found\"}"); 202 + return; 203 + }; 204 + 205 + ctx.persist.setHostAccountLimit(host_id, parsed.value.account_limit) catch { 206 + h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to update limit\"}"); 207 + return; 208 + }; 209 + 210 + // update running subscriber's rate limits immediately 211 + const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCount(host_id); 212 + ctx.slurper.updateHostLimits(host_id, effective); 213 + 214 + if (parsed.value.account_limit) |limit| { 215 + log.info("admin: set account_limit for {s} (id={d}): {d}", .{ parsed.value.host, host_id, limit }); 216 + } else { 217 + log.info("admin: cleared account_limit for {s} (id={d}), reverted to COUNT(*)", .{ parsed.value.host, host_id }); 218 + } 166 219 h.respondJson(conn, .ok, "{\"success\":true}"); 167 220 } 168 221
+2
src/api/router.zig
··· 117 117 admin.handleAdminBlockHost(conn, body, headers, ctx.persist); 118 118 } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 119 119 admin.handleAdminUnblockHost(conn, body, headers, ctx.persist); 120 + } else if (std.mem.eql(u8, path, "/admin/hosts/changeLimits")) { 121 + admin.handleAdminChangeLimits(conn, body, headers, ctx); 120 122 } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 121 123 admin.handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller); 122 124 } else {
+38 -2
src/event_log.zig
··· 200 200 \\ status TEXT NOT NULL DEFAULT 'active', 201 201 \\ last_seq BIGINT NOT NULL DEFAULT 0, 202 202 \\ failed_attempts INTEGER NOT NULL DEFAULT 0, 203 + \\ account_limit BIGINT, 203 204 \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now(), 204 205 \\ updated_at TIMESTAMPTZ NOT NULL DEFAULT now() 205 206 \\) 206 207 , .{}); 208 + 209 + // migration: add account_limit column to existing host tables 210 + _ = try pool.exec("ALTER TABLE host ADD COLUMN IF NOT EXISTS account_limit BIGINT", .{}); 207 211 208 212 _ = try pool.exec( 209 213 \\CREATE TABLE IF NOT EXISTS backfill_progress ( ··· 403 407 return if (count > 0) @intCast(count) else 0; 404 408 } 405 409 410 + /// effective account count for rate limit scaling. 411 + /// uses admin-configured limit if set, otherwise actual COUNT(*). 412 + pub fn getEffectiveAccountCount(self: *DiskPersist, host_id: u64) u64 { 413 + var row = (self.db.rowUnsafe( 414 + "SELECT COALESCE(h.account_limit, COUNT(a.uid)) FROM host h LEFT JOIN account a ON a.host_id = h.id WHERE h.id = $1 GROUP BY h.id", 415 + .{@as(i64, @intCast(host_id))}, 416 + ) catch return 0) orelse return 0; 417 + defer row.deinit() catch {}; 418 + const count = row.get(i64, 0); 419 + return if (count > 0) @intCast(count) else 0; 420 + } 421 + 422 + /// set admin-configured account limit for a host (overrides COUNT(*) for rate limiting). 423 + /// pass null to clear the override and revert to actual COUNT(*). 
424 + pub fn setHostAccountLimit(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 425 + if (limit) |l| { 426 + const clamped: i64 = if (l > @as(u64, @intCast(std.math.maxInt(i64)))) std.math.maxInt(i64) else @intCast(l); 427 + _ = try self.db.exec( 428 + "UPDATE host SET account_limit = $2, updated_at = now() WHERE id = $1", 429 + .{ @as(i64, @intCast(host_id)), clamped }, 430 + ); 431 + } else { 432 + _ = try self.db.exec( 433 + "UPDATE host SET account_limit = NULL, updated_at = now() WHERE id = $1", 434 + .{@as(i64, @intCast(host_id))}, 435 + ); 436 + } 437 + } 438 + 406 439 /// set the host_id for an account (first encounter or migration) 407 440 pub fn setAccountHostId(self: *DiskPersist, uid: u64, host_id: u64) !void { 408 441 _ = try self.db.exec( ··· 444 477 status: []const u8, 445 478 last_seq: u64, 446 479 failed_attempts: u32, 480 + account_limit: ?u64 = null, 447 481 }; 448 482 449 483 /// get or create a host row. returns {id, last_seq}. ··· 519 553 } 520 554 521 555 var result = try self.db.query( 522 - "SELECT id, hostname, status, last_seq, failed_attempts FROM host WHERE status = 'active' ORDER BY id ASC", 556 + "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC", 523 557 .{}, 524 558 ); 525 559 defer result.deinit(); ··· 531 565 .status = try allocator.dupe(u8, row.get([]const u8, 2)), 532 566 .last_seq = @intCast(row.get(i64, 3)), 533 567 .failed_attempts = @intCast(row.get(i32, 4)), 568 + .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null, 534 569 }); 535 570 } 536 571 ··· 549 584 } 550 585 551 586 var result = try self.db.query( 552 - "SELECT id, hostname, status, last_seq, failed_attempts FROM host ORDER BY id ASC", 587 + "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host ORDER BY id ASC", 553 588 .{}, 554 589 ); 555 590 defer result.deinit(); ··· 561 596 .status = try allocator.dupe(u8, row.get([]const u8, 2)), 
562 597 .last_seq = @intCast(row.get(i64, 3)), 563 598 .failed_attempts = @intCast(row.get(i32, 4)), 599 + .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null, 564 600 }); 565 601 } 566 602
+16 -1
src/slurper.zig
··· 432 432 const sub = try self.allocator.create(subscriber_mod.Subscriber); 433 433 errdefer self.allocator.destroy(sub); 434 434 435 - const account_count: u64 = self.persist.getHostAccountCount(host_id); 435 + const account_count: u64 = self.persist.getEffectiveAccountCount(host_id); 436 436 437 437 sub.* = subscriber_mod.Subscriber.init( 438 438 self.allocator, ··· 541 541 self.workers_mutex.lock(); 542 542 defer self.workers_mutex.unlock(); 543 543 return self.workers.count(); 544 + } 545 + 546 + /// update rate limits for a running subscriber (called from admin API). 547 + /// if the host has a worker, recomputes and applies new limits immediately. 548 + pub fn updateHostLimits(self: *Slurper, host_id: u64, account_count: u64) void { 549 + self.workers_mutex.lock(); 550 + defer self.workers_mutex.unlock(); 551 + if (self.workers.get(host_id)) |entry| { 552 + const trusted = subscriber_mod.isTrustedHost(entry.subscriber.options.hostname); 553 + const limits = subscriber_mod.computeLimits(trusted, account_count); 554 + entry.subscriber.rate_limiter.updateLimits(limits.sec, limits.hour, limits.day); 555 + log.info("updated rate limits for host_id={d}: sec={d} hour={d} day={d}", .{ 556 + host_id, limits.sec, limits.hour, limits.day, 557 + }); 558 + } 544 559 } 545 560 546 561 /// shutdown all workers and clean up
+29 -22
src/subscriber.zig
··· 35 35 // Go relay: TrustedDomains config — hosts matching these suffixes get trusted limits 36 36 const trusted_suffixes: []const []const u8 = &.{".host.bsky.network"}; 37 37 38 - fn isTrustedHost(hostname: []const u8) bool { 38 + pub fn isTrustedHost(hostname: []const u8) bool { 39 39 for (trusted_suffixes) |suffix| { 40 40 if (std.mem.endsWith(u8, hostname, suffix)) return true; 41 41 } ··· 43 43 } 44 44 45 45 /// compute rate limits scaled by account count (matches Go relay: slurper.go ComputeLimiterCounts) 46 - fn computeLimits(trusted: bool, account_count: u64) struct { sec: u64, hour: u64, day: u64 } { 46 + pub fn computeLimits(trusted: bool, account_count: u64) struct { sec: u64, hour: u64, day: u64 } { 47 47 if (trusted) return .{ 48 48 .sec = trusted_per_second_limit, 49 49 .hour = trusted_per_hour_limit, ··· 74 74 hour: SlidingWindow = .{ .size_ms = 3_600_000 }, 75 75 day: SlidingWindow = .{ .size_ms = 86_400_000 }, 76 76 77 - sec_limit: u64 = default_per_second_limit, 78 - hour_limit: u64 = default_per_hour_limit, 79 - day_limit: u64 = default_per_day_limit, 77 + sec_limit: std.atomic.Value(u64) = .{ .raw = default_per_second_limit }, 78 + hour_limit: std.atomic.Value(u64) = .{ .raw = default_per_hour_limit }, 79 + day_limit: std.atomic.Value(u64) = .{ .raw = default_per_day_limit }, 80 80 81 81 const SlidingWindow = struct { 82 82 size_ms: i64, ··· 109 109 self.hour.advance(now_ms); 110 110 self.day.advance(now_ms); 111 111 112 - if (self.sec.effectiveCount(now_ms) >= self.sec_limit) return .sec; 113 - if (self.hour.effectiveCount(now_ms) >= self.hour_limit) return .hour; 114 - if (self.day.effectiveCount(now_ms) >= self.day_limit) return .day; 112 + if (self.sec.effectiveCount(now_ms) >= self.sec_limit.load(.monotonic)) return .sec; 113 + if (self.hour.effectiveCount(now_ms) >= self.hour_limit.load(.monotonic)) return .hour; 114 + if (self.day.effectiveCount(now_ms) >= self.day_limit.load(.monotonic)) return .day; 115 115 116 116 
self.sec.curr_count += 1; 117 117 self.hour.curr_count += 1; ··· 129 129 self.hour.advance(now_ms); 130 130 self.day.advance(now_ms); 131 131 132 - if (self.sec.effectiveCount(now_ms) < self.sec_limit and 133 - self.hour.effectiveCount(now_ms) < self.hour_limit and 134 - self.day.effectiveCount(now_ms) < self.day_limit) 132 + if (self.sec.effectiveCount(now_ms) < self.sec_limit.load(.monotonic) and 133 + self.hour.effectiveCount(now_ms) < self.hour_limit.load(.monotonic) and 134 + self.day.effectiveCount(now_ms) < self.day_limit.load(.monotonic)) 135 135 { 136 136 self.sec.curr_count += 1; 137 137 self.hour.curr_count += 1; ··· 149 149 self.hour.advance(t); 150 150 self.day.advance(t); 151 151 152 - if (self.sec.effectiveCount(t) >= self.sec_limit) { 152 + if (self.sec.effectiveCount(t) >= self.sec_limit.load(.monotonic)) { 153 153 waited = .sec; 154 154 continue; 155 155 } 156 - if (self.hour.effectiveCount(t) >= self.hour_limit) { 156 + if (self.hour.effectiveCount(t) >= self.hour_limit.load(.monotonic)) { 157 157 waited = .hour; 158 158 continue; 159 159 } 160 - if (self.day.effectiveCount(t) >= self.day_limit) { 160 + if (self.day.effectiveCount(t) >= self.day_limit.load(.monotonic)) { 161 161 waited = .day; 162 162 continue; 163 163 } ··· 169 169 } 170 170 return waited; 171 171 } 172 + 173 + /// update rate limits from another thread (e.g. admin API). 
174 + pub fn updateLimits(self: *RateLimiter, sec: u64, hour: u64, day: u64) void { 175 + self.sec_limit.store(sec, .monotonic); 176 + self.hour_limit.store(hour, .monotonic); 177 + self.day_limit.store(day, .monotonic); 178 + } 172 179 }; 173 180 174 181 pub const Subscriber = struct { ··· 205 212 .persist = persist, 206 213 .shutdown = shutdown, 207 214 .rate_limiter = .{ 208 - .sec_limit = limits.sec, 209 - .hour_limit = limits.hour, 210 - .day_limit = limits.day, 215 + .sec_limit = .{ .raw = limits.sec }, 216 + .hour_limit = .{ .raw = limits.hour }, 217 + .day_limit = .{ .raw = limits.day }, 211 218 }, 212 219 }; 213 220 } ··· 769 776 } 770 777 771 778 test "rate limiter enforces per-second limit" { 772 - var rl: RateLimiter = .{ .sec_limit = 3, .hour_limit = 1000, .day_limit = 10000 }; 779 + var rl: RateLimiter = .{ .sec_limit = .{ .raw = 3 }, .hour_limit = .{ .raw = 1000 }, .day_limit = .{ .raw = 10000 } }; 773 780 774 781 const now: i64 = 1_000_000_000; 775 782 try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now)); ··· 785 792 } 786 793 787 794 test "rate limiter enforces per-hour limit" { 788 - var rl: RateLimiter = .{ .sec_limit = 1000, .hour_limit = 5, .day_limit = 10000 }; 795 + var rl: RateLimiter = .{ .sec_limit = .{ .raw = 1000 }, .hour_limit = .{ .raw = 5 }, .day_limit = .{ .raw = 10000 } }; 789 796 790 797 const now: i64 = 3_600_000 * 100; 791 798 for (0..5) |_| { ··· 801 808 } 802 809 803 810 test "sliding window interpolates previous count by elapsed time" { 804 - var rl: RateLimiter = .{ .sec_limit = 10, .hour_limit = 1_000_000, .day_limit = 1_000_000 }; 811 + var rl: RateLimiter = .{ .sec_limit = .{ .raw = 10 }, .hour_limit = .{ .raw = 1_000_000 }, .day_limit = .{ .raw = 1_000_000 } }; 805 812 806 813 const base: i64 = 1_000_000_000; 807 814 for (0..8) |_| { ··· 824 831 test "waitForAllow blocks then allows after window advances" { 825 832 // verify that waitForAllow returns a non-.allowed result when the limit was hit, 826 833 
// indicating it had to wait. We use a tiny limit so the fast path is exhausted. 827 - var rl: RateLimiter = .{ .sec_limit = 1, .hour_limit = 1000, .day_limit = 10000 }; 834 + var rl: RateLimiter = .{ .sec_limit = .{ .raw = 1 }, .hour_limit = .{ .raw = 1000 }, .day_limit = .{ .raw = 10000 } }; 828 835 var shutdown = std.atomic.Value(bool){ .raw = false }; 829 836 830 837 // first call takes the fast path ··· 841 848 } 842 849 843 850 test "waitForAllow respects shutdown" { 844 - var rl: RateLimiter = .{ .sec_limit = 1, .hour_limit = 1000, .day_limit = 10000 }; 851 + var rl: RateLimiter = .{ .sec_limit = .{ .raw = 1 }, .hour_limit = .{ .raw = 1000 }, .day_limit = .{ .raw = 10000 } }; 845 852 var shutdown = std.atomic.Value(bool){ .raw = false }; 846 853 847 854 // exhaust the limit