atproto relay implementation in zig — zlay.waow.tech

feat: merge API + WebSocket onto single port, add memory metrics

serve all HTTP endpoints via websocket.zig httpFallback on the main
WebSocket port (3000). the old HttpServer becomes a minimal MetricsServer
on RELAY_METRICS_PORT (3001) serving only /metrics.

adds process-level metrics (RSS, thread count on linux) and validator
cache/migration queue gauges to the prometheus endpoint.

updates zat to v0.2.10 and websocket.zig to 9e6d732 (HTTP fallback support).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+357 -245
+4 -4
build.zig.zon
··· 5 5 .minimum_zig_version = "0.15.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.org/zat.dev/zat/archive/v0.2.8.tar.gz", 9 - .hash = "zat-0.2.8-5PuC7n-4BACVD9BxQgIOOUh2XZDxp735dXw0E5eJmtc8", 8 + .url = "https://tangled.org/zat.dev/zat/archive/v0.2.10.tar.gz", 9 + .hash = "zat-0.2.10-5PuC7oC4BAC5U6sOP0i-dLQjjAAuy3fTkSpM7dZBvUuu", 10 10 }, 11 11 .websocket = .{ 12 - .url = "https://github.com/zzstoatzz/websocket.zig/archive/50f179cfcb307e4872386658160a451fcdc7738d.tar.gz", 13 - .hash = "websocket-0.1.0-ZPISddZzAwC_4VR6gW24TQVEVfplV0lEr16AyH0InGMX", 12 + .url = "https://github.com/zzstoatzz/websocket.zig/archive/9e6d732b207bdb0cb5fe5efb37a8173ac9638051.tar.gz", 13 + .hash = "websocket-0.1.0-ZPISdeJ2AwC8rczCVo9NwFzIzW7EdvoXlNkNR_P-bdaf", 14 14 }, 15 15 .pg = .{ 16 16 .url = "git+https://github.com/karlseguin/pg.zig?ref=master#e58b318b7867ef065b3135983f829219c5eef891",
+95 -5
src/broadcaster.zig
··· 13 13 const ring_buffer = @import("ring_buffer.zig"); 14 14 const event_log_mod = @import("event_log.zig"); 15 15 16 + const builtin = @import("builtin"); 16 17 const Allocator = std.mem.Allocator; 17 18 const log = std.log.scoped(.relay); 18 19 ··· 272 273 const history_capacity = 50_000; 273 274 const FrameHistory = ring_buffer.RingBuffer(history_capacity); 274 275 276 + /// function pointer for HTTP fallback handler (set by main.zig) 277 + pub const HttpFallbackFn = *const fn ( 278 + *websocket.Conn, 279 + []const u8, 280 + []const u8, 281 + []const u8, 282 + *const websocket.Handshake.KeyValue, 283 + ?*anyopaque, 284 + ) void; 285 + 275 286 pub const Broadcaster = struct { 276 287 allocator: Allocator, 277 288 consumers: std.ArrayListUnmanaged(*Consumer) = .{}, ··· 280 291 persist: ?*event_log_mod.DiskPersist = null, 281 292 stats: Stats = .{}, 282 293 error_frame: ?[]const u8 = null, 294 + http_fallback: ?HttpFallbackFn = null, 295 + http_fallback_ctx: ?*anyopaque = null, 283 296 284 297 pub fn init(allocator: Allocator) Broadcaster { 285 298 return .{ ··· 513 526 pub fn close(self: *Handler) void { 514 527 if (self.consumer) |c| self.broadcaster.removeConsumer(c); 515 528 } 529 + 530 + /// called by websocket server for non-WebSocket HTTP requests 531 + pub fn httpFallback( 532 + conn: *websocket.Conn, 533 + method: []const u8, 534 + url: []const u8, 535 + body: []const u8, 536 + headers: *const websocket.Handshake.KeyValue, 537 + ctx: *Broadcaster, 538 + ) void { 539 + if (ctx.http_fallback) |f| { 540 + f(conn, method, url, body, headers, ctx.http_fallback_ctx); 541 + } 542 + } 516 543 }; 517 544 518 - pub fn formatPrometheusMetrics(stats: *const Stats, buf: []u8) []const u8 { 545 + pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, migration_queue_len: usize, buf: []u8) []const u8 { 519 546 const uptime: i64 = std.time.timestamp() - stats.start_time; 520 - return std.fmt.bufPrint(buf, 547 + var fbs = std.io.fixedBufferStream(buf); 
548 + const w = fbs.writer(); 549 + 550 + std.fmt.format(w, 521 551 \\# TYPE relay_frames_received_total counter 522 552 \\relay_frames_received_total {d} 523 553 \\ ··· 557 587 \\# TYPE relay_uptime_seconds gauge 558 588 \\relay_uptime_seconds {d} 559 589 \\ 590 + \\# TYPE relay_validator_cache_entries gauge 591 + \\relay_validator_cache_entries {d} 592 + \\ 593 + \\# TYPE relay_validator_migration_queue gauge 594 + \\relay_validator_migration_queue {d} 595 + \\ 560 596 , .{ 561 597 stats.frames_in.load(.acquire), 562 598 stats.frames_out.load(.acquire), ··· 573 609 stats.seq.load(.acquire), 574 610 stats.relay_seq.load(.acquire), 575 611 uptime, 576 - }) catch ""; 612 + cache_entries, 613 + migration_queue_len, 614 + }) catch return fbs.getWritten(); 615 + 616 + // linux-only process metrics from /proc 617 + if (comptime builtin.os.tag == .linux) { 618 + appendProcMetrics(w); 619 + } 620 + 621 + return fbs.getWritten(); 622 + } 623 + 624 + fn appendProcMetrics(w: anytype) void { 625 + // RSS from /proc/self/statm (field[1] * page_size) 626 + if (std.fs.openFileAbsolute("/proc/self/statm", .{})) |f| { 627 + defer f.close(); 628 + var statm_buf: [256]u8 = undefined; 629 + if (f.reader().readAll(&statm_buf)) |n| { 630 + const line = statm_buf[0..n]; 631 + var iter = std.mem.splitScalar(u8, line, ' '); 632 + _ = iter.next(); // skip total pages 633 + if (iter.next()) |rss_pages| { 634 + if (std.fmt.parseInt(u64, rss_pages, 10)) |pages| { 635 + std.fmt.format(w, 636 + \\# TYPE relay_process_rss_bytes gauge 637 + \\relay_process_rss_bytes {d} 638 + \\ 639 + , .{pages * 4096}) catch {}; 640 + } else |_| {} 641 + } 642 + } else |_| {} 643 + } else |_| {} 644 + 645 + // thread count from /proc/self/status 646 + if (std.fs.openFileAbsolute("/proc/self/status", .{})) |f| { 647 + defer f.close(); 648 + var status_buf: [4096]u8 = undefined; 649 + if (f.reader().readAll(&status_buf)) |n| { 650 + const content = status_buf[0..n]; 651 + if (std.mem.indexOf(u8, content, 
"Threads:")) |pos| { 652 + const rest = content[pos + "Threads:".len ..]; 653 + const trimmed = std.mem.trimLeft(u8, rest, " \t"); 654 + const end = std.mem.indexOfScalar(u8, trimmed, '\n') orelse trimmed.len; 655 + if (std.fmt.parseInt(u64, trimmed[0..end], 10)) |threads| { 656 + std.fmt.format(w, 657 + \\# TYPE relay_threads_total gauge 658 + \\relay_threads_total {d} 659 + \\ 660 + , .{threads}) catch {}; 661 + } else |_| {} 662 + } 663 + } else |_| {} 664 + } else |_| {} 577 665 } 578 666 579 667 pub fn formatStatsResponse(stats: *const Stats, buf: []u8) []const u8 { ··· 685 773 stats.cache_hits.store(400, .release); 686 774 stats.cache_misses.store(100, .release); 687 775 688 - var buf: [4096]u8 = undefined; 689 - const output = formatPrometheusMetrics(&stats, &buf); 776 + var buf: [8192]u8 = undefined; 777 + const output = formatPrometheusMetrics(&stats, 42, 3, &buf); 690 778 691 779 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_received_total 10000") != null); 692 780 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_broadcast_total 9000") != null); ··· 697 785 try std.testing.expect(std.mem.indexOf(u8, output, "relay_upstream_seq 99999") != null); 698 786 try std.testing.expect(std.mem.indexOf(u8, output, "relay_seq 12345") != null); 699 787 try std.testing.expect(std.mem.indexOf(u8, output, "# TYPE relay_uptime_seconds gauge") != null); 788 + try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_cache_entries 42") != null); 789 + try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_migration_queue 3") != null); 700 790 } 701 791 702 792 test "formatStatsResponse produces valid JSON" {
+251 -236
src/main.zig
··· 5 5 //! to disk with relay-assigned seq numbers, and rebroadcasts to downstream 6 6 //! consumers over WebSocket. 7 7 //! 8 - //! endpoints: 8 + //! port 3000 (RELAY_PORT): WebSocket firehose + HTTP API (via httpFallback) 9 9 //! /xrpc/com.atproto.sync.subscribeRepos — firehose WebSocket (supports ?cursor=N) 10 10 //! /xrpc/com.atproto.sync.listRepos — paginated account listing 11 11 //! /xrpc/com.atproto.sync.getRepoStatus — single account status ··· 17 17 //! /admin/hosts — list all hosts (GET, admin) 18 18 //! /admin/hosts/block — block a host (POST, admin) 19 19 //! /admin/hosts/unblock — unblock a host (POST, admin) 20 - //! /_health, /_stats, /metrics — health, stats, prometheus 20 + //! /_health, /_stats — health, stats 21 + //! 22 + //! port 3001 (RELAY_METRICS_PORT): internal metrics only 23 + //! /metrics — prometheus metrics 21 24 22 25 const std = @import("std"); 23 26 const http = std.http; ··· 38 41 39 42 var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false }; 40 43 41 - /// HTTP server state — shared so main can close the socket to unblock accept() 42 - const HttpServer = struct { 43 - server: std.net.Server, 44 + /// context for HTTP fallback handlers (passed as opaque pointer through broadcaster) 45 + const HttpContext = struct { 44 46 stats: *broadcaster.Stats, 45 47 persist: *event_log_mod.DiskPersist, 46 48 slurper: *slurper_mod.Slurper, 47 49 collection_index: *collection_index_mod.CollectionIndex, 48 50 backfiller: *backfill_mod.Backfiller, 49 51 bc: *broadcaster.Broadcaster, 52 + validator: *validator_mod.Validator, 53 + }; 50 54 51 - fn run(self: *HttpServer) void { 55 + /// metrics-only server on the internal port 56 + const MetricsServer = struct { 57 + server: std.net.Server, 58 + stats: *broadcaster.Stats, 59 + validator: *validator_mod.Validator, 60 + 61 + fn run(self: *MetricsServer) void { 52 62 while (!shutdown_flag.load(.acquire)) { 53 63 const conn = self.server.accept() catch |err| { 54 64 if 
(shutdown_flag.load(.acquire)) return; 55 - log.debug("http accept error: {s}", .{@errorName(err)}); 65 + log.debug("metrics accept error: {s}", .{@errorName(err)}); 56 66 continue; 57 67 }; 58 - handleHttpConn(conn.stream, self.stats, self.persist, self.slurper, self.collection_index, self.backfiller, self.bc); 68 + handleMetricsConn(conn.stream, self.stats, self.validator); 59 69 } 60 70 } 61 71 }; 62 72 73 + fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator) void { 74 + defer stream.close(); 75 + 76 + var recv_buf: [4096]u8 = undefined; 77 + var send_buf: [4096]u8 = undefined; 78 + var connection_reader = stream.reader(&recv_buf); 79 + var connection_writer = stream.writer(&send_buf); 80 + var server = http.Server.init(connection_reader.interface(), &connection_writer.interface); 81 + 82 + var request = server.receiveHead() catch return; 83 + 84 + const cache_entries = validator.cacheSize(); 85 + const migration_queue_len = validator.migrationQueueLen(); 86 + 87 + var metrics_buf: [8192]u8 = undefined; 88 + const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, &metrics_buf); 89 + request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 90 + .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 91 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 92 + } }) catch {}; 93 + } 94 + 63 95 pub fn main() !void { 64 96 // use libc allocator — glibc malloc has per-thread arenas, madvise-based page 65 97 // return, and proven fragmentation mitigation. 
GPA is a debug allocator that ··· 68 100 69 101 // parse config from env 70 102 const port = parseEnvInt(u16, "RELAY_PORT", 3000); 71 - const http_port = parseEnvInt(u16, "RELAY_HTTP_PORT", 3001); 103 + const metrics_port = parseEnvInt(u16, "RELAY_METRICS_PORT", 3001); 72 104 const upstream = std.posix.getenv("RELAY_UPSTREAM") orelse "bsky.network"; 73 105 const data_dir = std.posix.getenv("RELAY_DATA_DIR") orelse "data/events"; 74 106 const retention_hours = parseEnvInt(u64, "RELAY_RETENTION_HOURS", 72); ··· 136 168 // start GC thread (runs every 10 minutes) 137 169 const gc_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, gcLoop, .{&dp}); 138 170 139 - // start HTTP health/stats server 140 - const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, http_port); 141 - var http_srv = HttpServer{ 142 - .server = address.listen(.{ .reuse_address = true }) catch |err| { 143 - log.err("http server failed to listen on :{d}: {s}", .{ http_port, @errorName(err) }); 144 - return err; 145 - }, 171 + // wire HTTP fallback into broadcaster (all API endpoints served on WS port) 172 + var http_context = HttpContext{ 146 173 .stats = &bc.stats, 147 174 .persist = &dp, 148 175 .slurper = &slurper, 149 176 .collection_index = &ci, 150 177 .backfiller = &backfiller, 151 178 .bc = &bc, 179 + .validator = &val, 152 180 }; 153 - const http_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, HttpServer.run, .{&http_srv}); 181 + bc.http_fallback = handleHttpRequest; 182 + bc.http_fallback_ctx = @ptrCast(&http_context); 154 183 155 - // start downstream WebSocket server 156 - log.info("relay listening on :{d} (ws), :{d} (http)", .{ port, http_port }); 184 + // start metrics-only server (internal port) 185 + const metrics_address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, metrics_port); 186 + var metrics_srv = MetricsServer{ 187 + .server = metrics_address.listen(.{ .reuse_address = true }) catch |err| { 188 + log.err("metrics server failed to listen on 
:{d}: {s}", .{ metrics_port, @errorName(err) }); 189 + return err; 190 + }, 191 + .stats = &bc.stats, 192 + .validator = &val, 193 + }; 194 + const metrics_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, MetricsServer.run, .{&metrics_srv}); 195 + 196 + // start downstream WebSocket server (also serves HTTP API via httpFallback) 197 + log.info("relay listening on :{d} (ws+http), :{d} (metrics)", .{ port, metrics_port }); 157 198 log.info("seed host: {s}", .{upstream}); 158 199 log.info("data dir: {s} (retention: {d}h)", .{ data_dir, retention_hours }); 159 200 ··· 181 222 // wait for GC thread 182 223 gc_thread.join(); 183 224 184 - // close HTTP listener socket to unblock accept(), then join 185 - http_srv.server.stream.close(); 186 - http_thread.join(); 225 + // close metrics listener socket to unblock accept(), then join 226 + metrics_srv.server.stream.close(); 227 + metrics_thread.join(); 187 228 188 229 log.info("relay stopped cleanly", .{}); 189 230 } ··· 228 269 std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null); 229 270 } 230 271 231 - fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller, bc: *broadcaster.Broadcaster) void { 232 - defer stream.close(); 272 + // --- HTTP fallback handler (called from broadcaster via websocket httpFallback) --- 233 273 234 - var recv_buf: [8192]u8 = undefined; 235 - var send_buf: [8192]u8 = undefined; 236 - var connection_reader = stream.reader(&recv_buf); 237 - var connection_writer = stream.writer(&send_buf); 238 - var server = http.Server.init(connection_reader.interface(), &connection_writer.interface); 274 + fn handleHttpRequest( 275 + conn: *websocket.Conn, 276 + method: []const u8, 277 + url: []const u8, 278 + body: []const u8, 279 + headers: *const websocket.Handshake.KeyValue, 280 + opaque_ctx: ?*anyopaque, 281 + ) void { 282 + 
const ctx: *HttpContext = @ptrCast(@alignCast(opaque_ctx orelse return)); 239 283 240 - var request = server.receiveHead() catch return; 241 - 242 - const target = request.head.target; 243 - // extract path and query before reading body (head strings reference recv_buf) 244 - const qmark = std.mem.indexOfScalar(u8, target, '?'); 245 - const path = target[0..(qmark orelse target.len)]; 246 - const query = if (qmark) |q| target[q + 1 ..] else ""; 284 + const qmark = std.mem.indexOfScalar(u8, url, '?'); 285 + const path = url[0..(qmark orelse url.len)]; 286 + const query = if (qmark) |q| url[q + 1 ..] else ""; 247 287 248 - if (request.head.method == .GET) { 249 - handleGet(&request, path, query, stats, persist, slurper, ci, backfiller); 250 - } else if (request.head.method == .POST) { 251 - handlePost(&request, path, query, persist, slurper, backfiller, bc); 288 + if (std.mem.eql(u8, method, "GET")) { 289 + handleGet(conn, path, query, headers, ctx); 290 + } else if (std.mem.eql(u8, method, "POST")) { 291 + handlePost(conn, path, query, body, headers, ctx); 252 292 } else { 253 - respondText(&request, .method_not_allowed, "method not allowed"); 293 + respondText(conn, .method_not_allowed, "method not allowed"); 254 294 } 255 295 } 256 296 257 - fn handleGet(request: *http.Server.Request, path: []const u8, query: []const u8, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller) void { 297 + fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 258 298 if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 259 - respondJson(request, .ok, "{\"status\":\"ok\"}"); 299 + respondJson(conn, .ok, "{\"status\":\"ok\"}"); 260 300 } else if (std.mem.eql(u8, path, "/_stats")) { 261 301 var stats_buf: [4096]u8 = undefined; 262 - const body = 
broadcaster.formatStatsResponse(stats, &stats_buf); 263 - respondJson(request, .ok, body); 264 - } else if (std.mem.eql(u8, path, "/metrics")) { 265 - var metrics_buf: [4096]u8 = undefined; 266 - const body = broadcaster.formatPrometheusMetrics(stats, &metrics_buf); 267 - request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 268 - .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 269 - .{ .name = "server", .value = "zlay (atproto-relay)" }, 270 - } }) catch {}; 302 + const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf); 303 + respondJson(conn, .ok, body); 271 304 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 272 - handleListRepos(request, query, persist); 305 + handleListRepos(conn, query, ctx.persist); 273 306 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 274 - handleGetRepoStatus(request, query, persist); 307 + handleGetRepoStatus(conn, query, ctx.persist); 275 308 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 276 - handleGetLatestCommit(request, query, persist); 309 + handleGetLatestCommit(conn, query, ctx.persist); 277 310 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 278 - handleListReposByCollection(request, query, ci); 311 + handleListReposByCollection(conn, query, ctx.collection_index); 279 312 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 280 - handleListHosts(request, query, persist); 313 + handleListHosts(conn, query, ctx.persist); 281 314 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { 282 - handleGetHostStatus(request, query, persist); 315 + handleGetHostStatus(conn, query, ctx.persist); 283 316 } else if (std.mem.eql(u8, path, "/admin/hosts")) { 284 - handleAdminListHosts(request, persist, slurper); 317 + handleAdminListHosts(conn, headers, ctx); 285 318 } else if (std.mem.eql(u8, path, 
"/admin/backfill-collections")) { 286 - handleAdminBackfillStatus(request, backfiller); 319 + handleAdminBackfillStatus(conn, headers, ctx); 287 320 } else if (std.mem.eql(u8, path, "/")) { 288 - respondText(request, .ok, 321 + respondText(conn, .ok, 289 322 \\ _ 290 323 \\ ___| | __ _ _ _ 291 324 \\|_ / |/ _` | | | | ··· 300 333 \\ 301 334 ); 302 335 } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) { 303 - request.respond( 336 + httpRespond(conn, .ok, "image/svg+xml", 304 337 \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32"> 305 338 \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/> 306 339 \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text> 307 340 \\</svg> 308 - , .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 309 - .{ .name = "content-type", .value = "image/svg+xml" }, 310 - .{ .name = "server", .value = "zlay (atproto-relay)" }, 311 - } }) catch {}; 341 + ); 312 342 } else { 313 - respondText(request, .not_found, "not found"); 343 + respondText(conn, .not_found, "not found"); 314 344 } 315 345 } 316 346 317 - fn handlePost(request: *http.Server.Request, path: []const u8, query: []const u8, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, backfiller: *backfill_mod.Backfiller, bc: *broadcaster.Broadcaster) void { 347 + fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 318 348 if (std.mem.eql(u8, path, "/admin/repo/ban")) { 319 - handleBan(request, persist, bc); 349 + handleBan(conn, body, headers, ctx); 320 350 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { 321 - handleRequestCrawl(request, slurper); 351 + handleRequestCrawl(conn, body, ctx.slurper); 322 352 } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { 323 - handleAdminBlockHost(request, persist); 
353 + handleAdminBlockHost(conn, body, headers, ctx.persist); 324 354 } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 325 - handleAdminUnblockHost(request, persist); 355 + handleAdminUnblockHost(conn, body, headers, ctx.persist); 326 356 } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 327 - handleAdminBackfillTrigger(request, query, backfiller); 357 + handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller); 328 358 } else { 329 - respondText(request, .not_found, "not found"); 359 + respondText(conn, .not_found, "not found"); 330 360 } 331 361 } 332 362 333 - fn handleBan(request: *http.Server.Request, persist: *event_log_mod.DiskPersist, bc: *broadcaster.Broadcaster) void { 334 - if (!checkAdmin(request)) return; 335 - 336 - // read body (after checkAdmin which uses iterateHeaders) 337 - var transfer_buf: [4096]u8 = undefined; 338 - const body_reader = request.readerExpectNone(&transfer_buf); 339 - var body_buf: [4096]u8 = undefined; 340 - const body_len = body_reader.readSliceShort(&body_buf) catch { 341 - respondJson(request, .bad_request, "{\"error\":\"failed to read request body\"}"); 342 - return; 343 - }; 344 - const body = body_buf[0..body_len]; 363 + fn handleBan(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 364 + if (!checkAdmin(conn, headers)) return; 345 365 346 - // parse JSON body for "did" field 347 - const parsed = std.json.parseFromSlice(struct { did: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 348 - respondJson(request, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}"); 366 + const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 367 + respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}"); 349 368 return; 350 369 }; 351 370 defer 
parsed.deinit(); 352 371 const did = parsed.value.did; 353 372 354 373 // resolve DID → UID and take down 355 - const uid = persist.uidForDid(did) catch { 356 - respondJson(request, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 374 + const uid = ctx.persist.uidForDid(did) catch { 375 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 357 376 return; 358 377 }; 359 - persist.takeDownUser(uid) catch { 360 - respondJson(request, .internal_server_error, "{\"error\":\"takedown failed\"}"); 378 + ctx.persist.takeDownUser(uid) catch { 379 + respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}"); 361 380 return; 362 381 }; 363 382 364 383 // emit #account event so downstream consumers see the takedown 365 - if (buildAccountFrame(persist.allocator, did)) |frame_bytes| { 366 - if (persist.persist(.account, uid, frame_bytes)) |relay_seq| { 367 - bc.stats.relay_seq.store(relay_seq, .release); 368 - const broadcast_data = broadcaster.resequenceFrame(persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 369 - bc.broadcast(relay_seq, broadcast_data); 384 + if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 385 + if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 386 + ctx.bc.stats.relay_seq.store(relay_seq, .release); 387 + const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 388 + ctx.bc.broadcast(relay_seq, broadcast_data); 370 389 log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 371 390 } else |err| { 372 391 log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); ··· 374 393 } 375 394 376 395 log.info("admin: banned {s} (uid={d})", .{ did, uid }); 377 - respondJson(request, .ok, "{\"success\":true}"); 396 + respondJson(conn, .ok, "{\"success\":true}"); 378 397 } 379 398 380 - fn handleRequestCrawl(request: 
*http.Server.Request, slurper: *slurper_mod.Slurper) void { 381 - var transfer_buf: [4096]u8 = undefined; 382 - const body_reader = request.readerExpectNone(&transfer_buf); 383 - var body_buf: [4096]u8 = undefined; 384 - const body_len = body_reader.readSliceShort(&body_buf) catch { 385 - respondJson(request, .bad_request, "{\"error\":\"failed to read request body\"}"); 386 - return; 387 - }; 388 - const body = body_buf[0..body_len]; 389 - 399 + fn handleRequestCrawl(conn: *websocket.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void { 390 400 const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch { 391 - respondJson(request, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); 401 + respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); 392 402 return; 393 403 }; 394 404 defer parsed.deinit(); ··· 396 406 // fast validation: hostname format (Go relay does this synchronously in handler) 397 407 const hostname = slurper_mod.validateHostname(slurper.allocator, parsed.value.hostname) catch |err| { 398 408 log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) }); 399 - respondJson(request, .bad_request, switch (err) { 409 + respondJson(conn, .bad_request, switch (err) { 400 410 error.EmptyHostname => "{\"error\":\"empty hostname\"}", 401 411 error.InvalidCharacter => "{\"error\":\"hostname contains invalid characters\"}", 402 412 error.InvalidLabel => "{\"error\":\"hostname has invalid label\"}", ··· 413 423 // fast validation: domain ban check 414 424 if (slurper.persist.isDomainBanned(hostname)) { 415 425 log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 416 - respondJson(request, .bad_request, "{\"error\":\"domain is banned\"}"); 426 + respondJson(conn, .bad_request, "{\"error\":\"domain is banned\"}"); 417 427 return; 418 428 } 419 429 
420 430 // enqueue for async processing (describeServer check happens in crawl processor) 421 431 slurper.addCrawlRequest(hostname) catch { 422 - respondJson(request, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); 432 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); 423 433 return; 424 434 }; 425 435 426 436 log.info("crawl requested: {s}", .{hostname}); 427 - respondJson(request, .ok, "{\"success\":true}"); 437 + respondJson(conn, .ok, "{\"success\":true}"); 428 438 } 429 439 430 440 // --- admin host management --- 431 441 432 - fn handleAdminListHosts(request: *http.Server.Request, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper) void { 433 - if (!checkAdmin(request)) return; 442 + fn handleAdminListHosts(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 443 + if (!checkAdmin(conn, headers)) return; 434 444 445 + const persist = ctx.persist; 435 446 const hosts = persist.listAllHosts(persist.allocator) catch { 436 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 447 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 437 448 return; 438 449 }; 439 450 defer { ··· 461 472 }) catch return; 462 473 } 463 474 464 - std.fmt.format(w, "],\"active_workers\":{d}}}", .{slurper.workerCount()}) catch return; 465 - respondJson(request, .ok, list.items); 475 + std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return; 476 + respondJson(conn, .ok, list.items); 466 477 } 467 478 468 - fn handleAdminBlockHost(request: *http.Server.Request, persist: *event_log_mod.DiskPersist) void { 469 - if (!checkAdmin(request)) return; 470 - 471 - var transfer_buf: [4096]u8 = undefined; 472 - const body_reader = request.readerExpectNone(&transfer_buf); 473 - var body_buf: [4096]u8 = undefined; 474 - const 
body_len = body_reader.readSliceShort(&body_buf) catch { 475 - respondJson(request, .bad_request, "{\"error\":\"failed to read request body\"}"); 476 - return; 477 - }; 478 - const body = body_buf[0..body_len]; 479 + fn handleAdminBlockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 480 + if (!checkAdmin(conn, headers)) return; 479 481 480 482 const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 481 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 483 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 482 484 return; 483 485 }; 484 486 defer parsed.deinit(); 485 487 486 488 const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 487 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 489 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 488 490 return; 489 491 }; 490 492 491 493 persist.updateHostStatus(host_info.id, "blocked") catch { 492 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 494 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 493 495 return; 494 496 }; 495 497 496 498 log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 497 - respondJson(request, .ok, "{\"success\":true}"); 499 + respondJson(conn, .ok, "{\"success\":true}"); 498 500 } 499 501 500 - fn handleAdminUnblockHost(request: *http.Server.Request, persist: *event_log_mod.DiskPersist) void { 501 - if (!checkAdmin(request)) return; 502 - 503 - var transfer_buf: [4096]u8 = undefined; 504 - const body_reader = 
request.readerExpectNone(&transfer_buf); 505 - var body_buf: [4096]u8 = undefined; 506 - const body_len = body_reader.readSliceShort(&body_buf) catch { 507 - respondJson(request, .bad_request, "{\"error\":\"failed to read request body\"}"); 508 - return; 509 - }; 510 - const body = body_buf[0..body_len]; 502 + fn handleAdminUnblockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 503 + if (!checkAdmin(conn, headers)) return; 511 504 512 505 const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 513 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 506 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 514 507 return; 515 508 }; 516 509 defer parsed.deinit(); 517 510 518 511 const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 519 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 512 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 520 513 return; 521 514 }; 522 515 523 516 persist.updateHostStatus(host_info.id, "active") catch { 524 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 517 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 525 518 return; 526 519 }; 527 520 persist.resetHostFailures(host_info.id) catch {}; 528 521 529 522 log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 530 - respondJson(request, .ok, "{\"success\":true}"); 523 + respondJson(conn, .ok, "{\"success\":true}"); 531 524 } 532 525 533 - /// check admin auth, send error response if not authorized. 
returns true if authorized. 534 - fn checkAdmin(request: *http.Server.Request) bool { 526 + /// check admin auth via headers, send error response if not authorized. returns true if authorized. 527 + fn checkAdmin(conn: *websocket.Conn, headers: ?*const websocket.Handshake.KeyValue) bool { 535 528 const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse { 536 - respondJson(request, .forbidden, "{\"error\":\"admin endpoint not configured\"}"); 529 + respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}"); 537 530 return false; 538 531 }; 539 532 540 - var iter = request.iterateHeaders(); 541 - while (iter.next()) |header| { 542 - if (std.ascii.eqlIgnoreCase(header.name, "authorization")) { 543 - const bearer_prefix = "Bearer "; 544 - if (!std.mem.startsWith(u8, header.value, bearer_prefix)) { 545 - respondJson(request, .unauthorized, "{\"error\":\"invalid authorization scheme\"}"); 546 - return false; 547 - } 548 - const token = header.value[bearer_prefix.len..]; 549 - if (!std.mem.eql(u8, token, admin_pw)) { 550 - respondJson(request, .unauthorized, "{\"error\":\"invalid token\"}"); 551 - return false; 552 - } 553 - return true; 554 - } 533 + const kv = headers orelse { 534 + respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 535 + return false; 536 + }; 537 + 538 + // handshake parser lowercases all header names 539 + const auth_value = kv.get("authorization") orelse { 540 + respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 541 + return false; 542 + }; 543 + 544 + const bearer_prefix = "Bearer "; 545 + if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) { 546 + respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}"); 547 + return false; 548 + } 549 + const token = auth_value[bearer_prefix.len..]; 550 + if (!std.mem.eql(u8, token, admin_pw)) { 551 + respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}"); 552 + return false; 555 553 } 
556 - 557 - respondJson(request, .unauthorized, "{\"error\":\"missing authorization header\"}"); 558 - return false; 554 + return true; 559 555 } 560 556 561 557 // --- XRPC endpoint handlers --- 562 558 563 - fn handleListRepos(request: *http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 559 + fn handleListRepos(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 564 560 const cursor_str = queryParam(query, "cursor") orelse "0"; 565 561 const limit_str = queryParam(query, "limit") orelse "500"; 566 562 567 563 const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 568 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 564 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 569 565 return; 570 566 }; 571 567 if (cursor_val < 0) { 572 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 568 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 573 569 return; 574 570 } 575 571 576 572 const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 577 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 573 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 578 574 return; 579 575 }; 580 576 if (limit < 1 or limit > 1000) { 581 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 577 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 582 578 return; 583 579 } 584 580 ··· 589 585 \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 590 586 \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 591 587 , .{ cursor_val, limit }) catch { 592 - respondJson(request, .internal_server_error, 
"{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 588 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 593 589 return; 594 590 }; 595 591 defer result.deinit(); ··· 658 654 659 655 w.writeByte('}') catch return; 660 656 661 - respondJson(request, .ok, fbs.getWritten()); 657 + respondJson(conn, .ok, fbs.getWritten()); 662 658 } 663 659 664 - fn handleGetRepoStatus(request: *http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 660 + fn handleGetRepoStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 665 661 var did_buf: [256]u8 = undefined; 666 662 const did = queryParamDecoded(query, "did", &did_buf) orelse { 667 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 663 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 668 664 return; 669 665 }; 670 666 671 667 // basic DID syntax check 672 668 if (!std.mem.startsWith(u8, did, "did:")) { 673 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 669 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 674 670 return; 675 671 } 676 672 ··· 679 675 "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 680 676 .{did}, 681 677 ) catch { 682 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 678 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 683 679 return; 684 680 }) orelse { 685 - respondJson(request, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 681 + respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 686 682 
return; 687 683 }; 688 684 defer row.deinit() catch {}; ··· 719 715 } 720 716 721 717 w.writeByte('}') catch return; 722 - respondJson(request, .ok, fbs.getWritten()); 718 + respondJson(conn, .ok, fbs.getWritten()); 723 719 } 724 720 725 - fn handleGetLatestCommit(request: *http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 721 + fn handleGetLatestCommit(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 726 722 var did_buf: [256]u8 = undefined; 727 723 const did = queryParamDecoded(query, "did", &did_buf) orelse { 728 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 724 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 729 725 return; 730 726 }; 731 727 732 728 if (!std.mem.startsWith(u8, did, "did:")) { 733 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 729 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 734 730 return; 735 731 } 736 732 ··· 739 735 "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 740 736 .{did}, 741 737 ) catch { 742 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 738 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 743 739 return; 744 740 }) orelse { 745 - respondJson(request, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 741 + respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 746 742 return; 747 743 }; 748 744 defer row.deinit() catch {}; ··· 757 753 758 754 // check account status (match Go relay behavior) 759 755 if (std.mem.eql(u8, status, "takendown") or 
std.mem.eql(u8, status, "suspended")) { 760 - respondJson(request, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 756 + respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 761 757 return; 762 758 } else if (std.mem.eql(u8, status, "deactivated")) { 763 - respondJson(request, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 759 + respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 764 760 return; 765 761 } else if (std.mem.eql(u8, status, "deleted")) { 766 - respondJson(request, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 762 + respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 767 763 return; 768 764 } else if (!std.mem.eql(u8, status, "active")) { 769 - respondJson(request, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 765 + respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 770 766 return; 771 767 } 772 768 773 769 if (rev.len == 0 or cid.len == 0) { 774 - respondJson(request, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 770 + respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 775 771 return; 776 772 } 777 773 ··· 785 781 w.writeAll(rev) catch return; 786 782 w.writeAll("\"}") catch return; 787 783 788 - respondJson(request, .ok, fbs.getWritten()); 784 + respondJson(conn, .ok, fbs.getWritten()); 789 785 } 790 786 791 - fn handleListReposByCollection(request: *http.Server.Request, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { 787 + fn handleListReposByCollection(conn: *websocket.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { 
792 788 const collection = queryParam(query, "collection") orelse { 793 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); 789 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); 794 790 return; 795 791 }; 796 792 797 793 if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) { 798 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); 794 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); 799 795 return; 800 796 } 801 797 802 798 const limit_str = queryParam(query, "limit") orelse "500"; 803 799 const limit = std.fmt.parseInt(usize, limit_str, 10) catch { 804 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 800 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 805 801 return; 806 802 }; 807 803 if (limit < 1 or limit > 1000) { 808 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 804 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 809 805 return; 810 806 } 811 807 ··· 815 811 // scan collection index 816 812 var did_buf: [65536]u8 = undefined; 817 813 const result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { 818 - respondJson(request, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 814 + respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 819 815 return; 820 816 }; 821 817 ··· 842 838 } 843 839 844 840 w.writeByte('}') catch return; 845 - respondJson(request, .ok, fbs.getWritten()); 841 + respondJson(conn, .ok, fbs.getWritten()); 846 842 } 847 843 848 - fn handleListHosts(request: 
*http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 844 + fn handleListHosts(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 849 845 const cursor_str = queryParam(query, "cursor") orelse "0"; 850 846 const limit_str = queryParam(query, "limit") orelse "200"; 851 847 852 848 const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 853 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 849 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 854 850 return; 855 851 }; 856 852 if (cursor_val < 0) { 857 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 853 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 858 854 return; 859 855 } 860 856 861 857 const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 862 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 858 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 863 859 return; 864 860 }; 865 861 if (limit < 1 or limit > 1000) { 866 - respondJson(request, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 862 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 867 863 return; 868 864 } 869 865 ··· 871 867 "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 872 868 .{ cursor_val, limit }, 873 869 ) catch { 874 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 870 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 875 871 return; 876 872 }; 877 873 defer result.deinit(); ··· 912 908 } 913 909 914 910 
w.writeByte('}') catch return; 915 - respondJson(request, .ok, fbs.getWritten()); 911 + respondJson(conn, .ok, fbs.getWritten()); 916 912 } 917 913 918 - fn handleGetHostStatus(request: *http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 914 + fn handleGetHostStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 919 915 var hostname_buf: [256]u8 = undefined; 920 916 const hostname = queryParamDecoded(query, "hostname", &hostname_buf) orelse { 921 - respondJson(request, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 917 + respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 922 918 return; 923 919 }; 924 920 ··· 927 923 "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 928 924 .{hostname}, 929 925 ) catch { 930 - respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 926 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 931 927 return; 932 928 }) orelse { 933 - respondJson(request, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 929 + respondJson(conn, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 934 930 return; 935 931 }; 936 932 defer row.deinit() catch {}; ··· 970 966 w.writeAll(status) catch return; 971 967 w.writeAll("\"}") catch return; 972 968 973 - respondJson(request, .ok, fbs.getWritten()); 969 + respondJson(conn, .ok, fbs.getWritten()); 974 970 } 975 971 976 972 /// build a CBOR #account frame for a takedown event. 
··· 1036 1032 1037 1033 // --- backfill handlers --- 1038 1034 1039 - fn handleAdminBackfillTrigger(request: *http.Server.Request, query: []const u8, backfiller: *backfill_mod.Backfiller) void { 1040 - if (!checkAdmin(request)) return; 1035 + fn handleAdminBackfillTrigger(conn: *websocket.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void { 1036 + if (!checkAdmin(conn, headers)) return; 1041 1037 1042 1038 const source = queryParam(query, "source") orelse "bsky.network"; 1043 1039 1044 1040 backfiller.start(source) catch |err| { 1045 1041 switch (err) { 1046 1042 error.AlreadyRunning => { 1047 - respondJson(request, .conflict, "{\"error\":\"backfill already in progress\"}"); 1043 + respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}"); 1048 1044 }, 1049 1045 else => { 1050 - respondJson(request, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); 1046 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); 1051 1047 }, 1052 1048 } 1053 1049 return; ··· 1055 1051 1056 1052 var buf: [256]u8 = undefined; 1057 1053 const body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { 1058 - respondJson(request, .ok, "{\"status\":\"started\"}"); 1054 + respondJson(conn, .ok, "{\"status\":\"started\"}"); 1059 1055 return; 1060 1056 }; 1061 - respondJson(request, .ok, body); 1057 + respondJson(conn, .ok, body); 1062 1058 } 1063 1059 1064 - fn handleAdminBackfillStatus(request: *http.Server.Request, backfiller: *backfill_mod.Backfiller) void { 1065 - if (!checkAdmin(request)) return; 1060 + fn handleAdminBackfillStatus(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 1061 + if (!checkAdmin(conn, headers)) return; 1066 1062 1067 - const body = backfiller.getStatus(backfiller.allocator) catch { 1068 - respondJson(request, .internal_server_error, 
"{\"error\":\"failed to query backfill status\"}"); 1063 + const body = ctx.backfiller.getStatus(ctx.backfiller.allocator) catch { 1064 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}"); 1069 1065 return; 1070 1066 }; 1071 - defer backfiller.allocator.free(body); 1067 + defer ctx.backfiller.allocator.free(body); 1072 1068 1073 - respondJson(request, .ok, body); 1069 + respondJson(conn, .ok, body); 1074 1070 } 1075 1071 1076 1072 // --- query string helpers --- ··· 1137 1133 }; 1138 1134 } 1139 1135 1140 - // --- response helpers --- 1136 + // --- response helpers (write raw HTTP to websocket.Conn) --- 1137 + 1138 + fn httpRespond(conn: *websocket.Conn, status: http.Status, content_type: []const u8, body: []const u8) void { 1139 + var buf: [512]u8 = undefined; 1140 + const header = std.fmt.bufPrint(&buf, "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nConnection: close\r\nServer: zlay\r\n\r\n", .{ 1141 + httpStatusLine(status), 1142 + content_type, 1143 + body.len, 1144 + }) catch return; 1145 + conn.writeFramed(header) catch return; 1146 + if (body.len > 0) conn.writeFramed(body) catch return; 1147 + } 1141 1148 1142 - fn respondJson(request: *http.Server.Request, status: http.Status, body: []const u8) void { 1143 - request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{ 1144 - .{ .name = "content-type", .value = "application/json" }, 1145 - .{ .name = "server", .value = "zlay (atproto-relay)" }, 1146 - } }) catch {}; 1149 + fn respondJson(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 1150 + httpRespond(conn, status, "application/json", body); 1151 + } 1152 + 1153 + fn respondText(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 1154 + httpRespond(conn, status, "text/plain", body); 1147 1155 } 1148 1156 1149 - fn respondText(request: *http.Server.Request, status: http.Status, body: []const u8) void { 1150 - request.respond(body, .{ 
.status = status, .keep_alive = false, .extra_headers = &.{ 1151 - .{ .name = "content-type", .value = "text/plain" }, 1152 - .{ .name = "server", .value = "zlay (atproto-relay)" }, 1153 - } }) catch {}; 1157 + fn httpStatusLine(status: http.Status) []const u8 { 1158 + return switch (status) { 1159 + .ok => "200 OK", 1160 + .bad_request => "400 Bad Request", 1161 + .unauthorized => "401 Unauthorized", 1162 + .forbidden => "403 Forbidden", 1163 + .not_found => "404 Not Found", 1164 + .method_not_allowed => "405 Method Not Allowed", 1165 + .conflict => "409 Conflict", 1166 + .internal_server_error => "500 Internal Server Error", 1167 + else => "500 Internal Server Error", 1168 + }; 1154 1169 } 1155 1170 1156 1171 fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
+7
src/validator.zig
··· 538 538 defer self.cache_mutex.unlock(); 539 539 return self.cache.count(); 540 540 } 541 + 542 + /// migration queue length (for diagnostics) 543 + pub fn migrationQueueLen(self: *Validator) usize { 544 + self.queue_mutex.lock(); 545 + defer self.queue_mutex.unlock(); 546 + return self.migration_queue.items.len; 547 + } 541 548 }; 542 549 543 550 /// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path"