atproto relay implementation in zig — zlay.waow.tech

refactor: split api.zig into api/ directory (Ghostty namespace pattern)

api.zig was a 926-line grab bag mixing endpoint definitions with hex
parsing, query string decoding, CBOR frame building, and HTTP response
plumbing. now follows the Ghostty/stdlib namespace pattern:

src/api.zig — 20-line namespace (re-exports HttpContext + handleHttpRequest)
src/api/router.zig — route dispatch + static content
src/api/xrpc.zig — AT Protocol sync endpoint handlers
src/api/admin.zig — admin handlers, auth, CBOR frame building
src/api/http.zig — response helpers, query string parsing

no behavior changes. main.zig unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+973 -921
+15 -921
src/api.zig
··· 1 - //! HTTP API handlers for the relay 1 + //! HTTP API module for the relay. 2 2 //! 3 - //! serves XRPC endpoints, admin endpoints, health/stats, and the root banner 4 - //! via the websocket server's httpFallback mechanism. all handlers write raw 5 - //! HTTP responses to the websocket connection. 6 - 7 - const std = @import("std"); 8 - const http = std.http; 9 - const websocket = @import("websocket"); 10 - const broadcaster = @import("broadcaster.zig"); 11 - const validator_mod = @import("validator.zig"); 12 - const slurper_mod = @import("slurper.zig"); 13 - const event_log_mod = @import("event_log.zig"); 14 - const collection_index_mod = @import("collection_index.zig"); 15 - const backfill_mod = @import("backfill.zig"); 16 - 17 - const log = std.log.scoped(.relay); 18 - 19 - /// context for HTTP fallback handlers (passed as opaque pointer through broadcaster) 20 - pub const HttpContext = struct { 21 - stats: *broadcaster.Stats, 22 - persist: *event_log_mod.DiskPersist, 23 - slurper: *slurper_mod.Slurper, 24 - collection_index: *collection_index_mod.CollectionIndex, 25 - backfiller: *backfill_mod.Backfiller, 26 - bc: *broadcaster.Broadcaster, 27 - validator: *validator_mod.Validator, 28 - }; 29 - 30 - /// top-level HTTP request router — installed as bc.http_fallback 31 - pub fn handleHttpRequest( 32 - conn: *websocket.Conn, 33 - method: []const u8, 34 - url: []const u8, 35 - body: []const u8, 36 - headers: *const websocket.Handshake.KeyValue, 37 - opaque_ctx: ?*anyopaque, 38 - ) void { 39 - const ctx: *HttpContext = @ptrCast(@alignCast(opaque_ctx orelse return)); 40 - 41 - const qmark = std.mem.indexOfScalar(u8, url, '?'); 42 - const path = url[0..(qmark orelse url.len)]; 43 - const query = if (qmark) |q| url[q + 1 ..] 
else ""; 44 - 45 - if (std.mem.eql(u8, method, "GET")) { 46 - handleGet(conn, path, query, headers, ctx); 47 - } else if (std.mem.eql(u8, method, "POST")) { 48 - handlePost(conn, path, query, body, headers, ctx); 49 - } else { 50 - respondText(conn, .method_not_allowed, "method not allowed"); 51 - } 52 - } 53 - 54 - fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 55 - if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 56 - respondJson(conn, .ok, "{\"status\":\"ok\"}"); 57 - } else if (std.mem.eql(u8, path, "/_stats")) { 58 - var stats_buf: [4096]u8 = undefined; 59 - const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf); 60 - respondJson(conn, .ok, body); 61 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 62 - handleListRepos(conn, query, ctx.persist); 63 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 64 - handleGetRepoStatus(conn, query, ctx.persist); 65 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 66 - handleGetLatestCommit(conn, query, ctx.persist); 67 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 68 - handleListReposByCollection(conn, query, ctx.collection_index); 69 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 70 - handleListHosts(conn, query, ctx.persist); 71 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { 72 - handleGetHostStatus(conn, query, ctx.persist); 73 - } else if (std.mem.eql(u8, path, "/admin/hosts")) { 74 - handleAdminListHosts(conn, headers, ctx); 75 - } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 76 - handleAdminBackfillStatus(conn, headers, ctx); 77 - } else if (std.mem.eql(u8, path, "/")) { 78 - respondText(conn, .ok, 79 - \\ _ 80 - \\ ___| | __ _ _ _ 81 - \\|_ / |/ _` | | | | 82 - \\ / /| | (_| | 
|_| | 83 - \\/___|_|\__,_|\__, | 84 - \\ |___/ 85 - \\ 86 - \\This is an atproto [https://atproto.com] relay instance, 87 - \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay] 88 - \\ 89 - \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 90 - \\ 91 - ); 92 - } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) { 93 - httpRespond(conn, .ok, "image/svg+xml", 94 - \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32"> 95 - \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/> 96 - \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text> 97 - \\</svg> 98 - ); 99 - } else { 100 - respondText(conn, .not_found, "not found"); 101 - } 102 - } 103 - 104 - fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 105 - if (std.mem.eql(u8, path, "/admin/repo/ban")) { 106 - handleBan(conn, body, headers, ctx); 107 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { 108 - handleRequestCrawl(conn, body, ctx.slurper); 109 - } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { 110 - handleAdminBlockHost(conn, body, headers, ctx.persist); 111 - } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 112 - handleAdminUnblockHost(conn, body, headers, ctx.persist); 113 - } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 114 - handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller); 115 - } else { 116 - respondText(conn, .not_found, "not found"); 117 - } 118 - } 119 - 120 - fn handleBan(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 121 - if (!checkAdmin(conn, headers)) return; 122 - 123 - const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ 
.ignore_unknown_fields = true }) catch { 124 - respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}"); 125 - return; 126 - }; 127 - defer parsed.deinit(); 128 - const did = parsed.value.did; 129 - 130 - // resolve DID → UID and take down 131 - const uid = ctx.persist.uidForDid(did) catch { 132 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 133 - return; 134 - }; 135 - ctx.persist.takeDownUser(uid) catch { 136 - respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}"); 137 - return; 138 - }; 139 - 140 - // emit #account event so downstream consumers see the takedown 141 - if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 142 - if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 143 - ctx.bc.stats.relay_seq.store(relay_seq, .release); 144 - const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 145 - ctx.bc.broadcast(relay_seq, broadcast_data); 146 - log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 147 - } else |err| { 148 - log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); 149 - } 150 - } 151 - 152 - log.info("admin: banned {s} (uid={d})", .{ did, uid }); 153 - respondJson(conn, .ok, "{\"success\":true}"); 154 - } 155 - 156 - fn handleRequestCrawl(conn: *websocket.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void { 157 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch { 158 - respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); 159 - return; 160 - }; 161 - defer parsed.deinit(); 162 - 163 - // fast validation: hostname format (Go relay does this synchronously in handler) 164 - const hostname = slurper_mod.validateHostname(slurper.allocator, 
parsed.value.hostname) catch |err| { 165 - log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) }); 166 - respondJson(conn, .bad_request, switch (err) { 167 - error.EmptyHostname => "{\"error\":\"empty hostname\"}", 168 - error.InvalidCharacter => "{\"error\":\"hostname contains invalid characters\"}", 169 - error.InvalidLabel => "{\"error\":\"hostname has invalid label\"}", 170 - error.TooFewLabels => "{\"error\":\"hostname must have at least two labels (e.g. pds.example.com)\"}", 171 - error.LooksLikeIpAddress => "{\"error\":\"IP addresses not allowed, use a hostname\"}", 172 - error.PortNotAllowed => "{\"error\":\"port numbers not allowed\"}", 173 - error.LocalhostNotAllowed => "{\"error\":\"localhost not allowed\"}", 174 - else => "{\"error\":\"invalid hostname\"}", 175 - }); 176 - return; 177 - }; 178 - defer slurper.allocator.free(hostname); 179 - 180 - // fast validation: domain ban check 181 - if (slurper.persist.isDomainBanned(hostname)) { 182 - log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 183 - respondJson(conn, .bad_request, "{\"error\":\"domain is banned\"}"); 184 - return; 185 - } 186 - 187 - // enqueue for async processing (describeServer check happens in crawl processor) 188 - slurper.addCrawlRequest(hostname) catch { 189 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); 190 - return; 191 - }; 192 - 193 - log.info("crawl requested: {s}", .{hostname}); 194 - respondJson(conn, .ok, "{\"success\":true}"); 195 - } 196 - 197 - // --- admin host management --- 198 - 199 - fn handleAdminListHosts(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 200 - if (!checkAdmin(conn, headers)) return; 201 - 202 - const persist = ctx.persist; 203 - const hosts = persist.listAllHosts(persist.allocator) catch { 204 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 205 - 
return; 206 - }; 207 - defer { 208 - for (hosts) |h| { 209 - persist.allocator.free(h.hostname); 210 - persist.allocator.free(h.status); 211 - } 212 - persist.allocator.free(hosts); 213 - } 214 - 215 - var list: std.ArrayListUnmanaged(u8) = .{}; 216 - defer list.deinit(persist.allocator); 217 - const w = list.writer(persist.allocator); 218 - 219 - w.writeAll("{\"hosts\":[") catch return; 220 - 221 - for (hosts, 0..) |host, i| { 222 - if (i > 0) w.writeByte(',') catch return; 223 - std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d}}}", .{ 224 - host.id, 225 - host.hostname, 226 - host.status, 227 - host.last_seq, 228 - host.failed_attempts, 229 - }) catch return; 230 - } 231 - 232 - std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return; 233 - respondJson(conn, .ok, list.items); 234 - } 235 - 236 - fn handleAdminBlockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 237 - if (!checkAdmin(conn, headers)) return; 238 - 239 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 240 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 241 - return; 242 - }; 243 - defer parsed.deinit(); 244 - 245 - const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 246 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 247 - return; 248 - }; 249 - 250 - persist.updateHostStatus(host_info.id, "blocked") catch { 251 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 252 - return; 253 - }; 254 - 255 - log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 256 - respondJson(conn, .ok, "{\"success\":true}"); 
257 - } 258 - 259 - fn handleAdminUnblockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 260 - if (!checkAdmin(conn, headers)) return; 261 - 262 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 263 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 264 - return; 265 - }; 266 - defer parsed.deinit(); 267 - 268 - const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 269 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 270 - return; 271 - }; 272 - 273 - persist.updateHostStatus(host_info.id, "active") catch { 274 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 275 - return; 276 - }; 277 - persist.resetHostFailures(host_info.id) catch {}; 278 - 279 - log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 280 - respondJson(conn, .ok, "{\"success\":true}"); 281 - } 282 - 283 - /// check admin auth via headers, send error response if not authorized. returns true if authorized. 
284 - fn checkAdmin(conn: *websocket.Conn, headers: ?*const websocket.Handshake.KeyValue) bool { 285 - const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse { 286 - respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}"); 287 - return false; 288 - }; 289 - 290 - const kv = headers orelse { 291 - respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 292 - return false; 293 - }; 294 - 295 - // handshake parser lowercases all header names 296 - const auth_value = kv.get("authorization") orelse { 297 - respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 298 - return false; 299 - }; 300 - 301 - const bearer_prefix = "Bearer "; 302 - if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) { 303 - respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}"); 304 - return false; 305 - } 306 - const token = auth_value[bearer_prefix.len..]; 307 - if (!std.mem.eql(u8, token, admin_pw)) { 308 - respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}"); 309 - return false; 310 - } 311 - return true; 312 - } 313 - 314 - // --- XRPC endpoint handlers --- 315 - 316 - fn handleListRepos(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 317 - const cursor_str = queryParam(query, "cursor") orelse "0"; 318 - const limit_str = queryParam(query, "limit") orelse "500"; 319 - 320 - const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 321 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 322 - return; 323 - }; 324 - if (cursor_val < 0) { 325 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 326 - return; 327 - } 328 - 329 - const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 330 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 331 - return; 332 - }; 333 - if (limit < 1 or limit 
> 1000) { 334 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 335 - return; 336 - } 337 - 338 - // query accounts with repo state, paginated by UID 339 - // includes both local status and upstream_status for combined active check 340 - var result = persist.db.query( 341 - \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 342 - \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 343 - \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 344 - , .{ cursor_val, limit }) catch { 345 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 346 - return; 347 - }; 348 - defer result.deinit(); 349 - 350 - // build JSON response into a buffer 351 - var buf: [65536]u8 = undefined; 352 - var fbs = std.io.fixedBufferStream(&buf); 353 - const w = fbs.writer(); 354 - 355 - var count: i64 = 0; 356 - var last_uid: i64 = 0; 357 - 358 - w.writeAll("{\"repos\":[") catch return; 359 - 360 - while (result.nextUnsafe() catch null) |row| { 361 - if (count > 0) w.writeByte(',') catch return; 362 - 363 - const uid = row.get(i64, 0); 364 - const did = row.get([]const u8, 1); 365 - const local_status = row.get([]const u8, 2); 366 - const upstream_status = row.get([]const u8, 3); 367 - const rev = row.get([]const u8, 4); 368 - const head = row.get([]const u8, 5); 369 - 370 - // Go relay: Account.IsActive() — both local AND upstream must be active 371 - const local_ok = std.mem.eql(u8, local_status, "active"); 372 - const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 373 - const active = local_ok and upstream_ok; 374 - // Go relay: Account.AccountStatus() — local takes priority 375 - const status = if (!local_ok) local_status else upstream_status; 376 - 377 - w.writeAll("{\"did\":\"") catch return; 378 - w.writeAll(did) catch return; 379 - w.writeAll("\"") catch return; 380 - 381 - if (head.len > 0) { 382 - 
w.writeAll(",\"head\":\"") catch return; 383 - w.writeAll(head) catch return; 384 - w.writeAll("\"") catch return; 385 - } 386 - if (rev.len > 0) { 387 - w.writeAll(",\"rev\":\"") catch return; 388 - w.writeAll(rev) catch return; 389 - w.writeAll("\"") catch return; 390 - } 391 - 392 - if (active) { 393 - w.writeAll(",\"active\":true") catch return; 394 - } else { 395 - w.writeAll(",\"active\":false,\"status\":\"") catch return; 396 - w.writeAll(status) catch return; 397 - w.writeAll("\"") catch return; 398 - } 399 - 400 - w.writeByte('}') catch return; 401 - last_uid = uid; 402 - count += 1; 403 - } 404 - 405 - w.writeByte(']') catch return; 406 - 407 - // include cursor if we got a full page 408 - if (count >= limit and count >= 2) { 409 - std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_uid}) catch return; 410 - } 411 - 412 - w.writeByte('}') catch return; 413 - 414 - respondJson(conn, .ok, fbs.getWritten()); 415 - } 416 - 417 - fn handleGetRepoStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 418 - var did_buf: [256]u8 = undefined; 419 - const did = queryParamDecoded(query, "did", &did_buf) orelse { 420 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 421 - return; 422 - }; 423 - 424 - // basic DID syntax check 425 - if (!std.mem.startsWith(u8, did, "did:")) { 426 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 427 - return; 428 - } 429 - 430 - // look up account (includes both local and upstream status) 431 - var row = (persist.db.rowUnsafe( 432 - "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 433 - .{did}, 434 - ) catch { 435 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 436 - return; 437 - }) orelse { 438 - respondJson(conn, .not_found, 
"{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 439 - return; 440 - }; 441 - defer row.deinit() catch {}; 442 - 443 - const local_status = row.get([]const u8, 1); 444 - const upstream_status = row.get([]const u8, 2); 445 - const rev = row.get([]const u8, 3); 446 - // Go relay: Account.IsActive() / AccountStatus() 447 - const local_ok = std.mem.eql(u8, local_status, "active"); 448 - const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 449 - const active = local_ok and upstream_ok; 450 - const status = if (!local_ok) local_status else upstream_status; 451 - 452 - var buf: [4096]u8 = undefined; 453 - var fbs = std.io.fixedBufferStream(&buf); 454 - const w = fbs.writer(); 455 - 456 - w.writeAll("{\"did\":\"") catch return; 457 - w.writeAll(did) catch return; 458 - w.writeAll("\"") catch return; 459 - 460 - if (active) { 461 - w.writeAll(",\"active\":true") catch return; 462 - } else { 463 - w.writeAll(",\"active\":false,\"status\":\"") catch return; 464 - w.writeAll(status) catch return; 465 - w.writeAll("\"") catch return; 466 - } 467 - 468 - if (rev.len > 0) { 469 - w.writeAll(",\"rev\":\"") catch return; 470 - w.writeAll(rev) catch return; 471 - w.writeAll("\"") catch return; 472 - } 473 - 474 - w.writeByte('}') catch return; 475 - respondJson(conn, .ok, fbs.getWritten()); 476 - } 477 - 478 - fn handleGetLatestCommit(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 479 - var did_buf: [256]u8 = undefined; 480 - const did = queryParamDecoded(query, "did", &did_buf) orelse { 481 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 482 - return; 483 - }; 484 - 485 - if (!std.mem.startsWith(u8, did, "did:")) { 486 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 487 - return; 488 - } 489 - 490 - // look up account + repo state (includes both local and upstream status) 491 - var row = (persist.db.rowUnsafe( 492 - 
"SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 493 - .{did}, 494 - ) catch { 495 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 496 - return; 497 - }) orelse { 498 - respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 499 - return; 500 - }; 501 - defer row.deinit() catch {}; 502 - 503 - const local_status = row.get([]const u8, 0); 504 - const upstream_status = row.get([]const u8, 1); 505 - const rev = row.get([]const u8, 2); 506 - const cid = row.get([]const u8, 3); 507 - 508 - // combined status: local takes priority (Go relay: AccountStatus()) 509 - const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status; 510 - 511 - // check account status (match Go relay behavior) 512 - if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) { 513 - respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 514 - return; 515 - } else if (std.mem.eql(u8, status, "deactivated")) { 516 - respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 517 - return; 518 - } else if (std.mem.eql(u8, status, "deleted")) { 519 - respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 520 - return; 521 - } else if (!std.mem.eql(u8, status, "active")) { 522 - respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 523 - return; 524 - } 525 - 526 - if (rev.len == 0 or cid.len == 0) { 527 - respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 528 - return; 529 - } 530 - 531 - var buf: [4096]u8 = undefined; 532 - var fbs = std.io.fixedBufferStream(&buf); 533 - 
const w = fbs.writer(); 534 - 535 - w.writeAll("{\"cid\":\"") catch return; 536 - w.writeAll(cid) catch return; 537 - w.writeAll("\",\"rev\":\"") catch return; 538 - w.writeAll(rev) catch return; 539 - w.writeAll("\"}") catch return; 540 - 541 - respondJson(conn, .ok, fbs.getWritten()); 542 - } 543 - 544 - fn handleListReposByCollection(conn: *websocket.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { 545 - const collection = queryParam(query, "collection") orelse { 546 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); 547 - return; 548 - }; 549 - 550 - if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) { 551 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); 552 - return; 553 - } 554 - 555 - const limit_str = queryParam(query, "limit") orelse "500"; 556 - const limit = std.fmt.parseInt(usize, limit_str, 10) catch { 557 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 558 - return; 559 - }; 560 - if (limit < 1 or limit > 1000) { 561 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 562 - return; 563 - } 564 - 565 - var cursor_buf: [256]u8 = undefined; 566 - const cursor_did = queryParamDecoded(query, "cursor", &cursor_buf); 567 - 568 - // scan collection index 569 - var did_buf: [65536]u8 = undefined; 570 - const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { 571 - respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 572 - return; 573 - }; 574 - 575 - // build JSON response 576 - var buf: [65536]u8 = undefined; 577 - var fbs = std.io.fixedBufferStream(&buf); 578 - const w = fbs.writer(); 579 - 580 - w.writeAll("{\"repos\":[") catch return; 581 - for (0..ci_result.count) |i| { 582 - if (i > 0) w.writeByte(',') 
catch return; 583 - w.writeAll("{\"did\":\"") catch return; 584 - w.writeAll(ci_result.getDid(i)) catch return; 585 - w.writeAll("\"}") catch return; 586 - } 587 - w.writeByte(']') catch return; 588 - 589 - if (ci_result.last_did) |last| { 590 - if (ci_result.count >= limit) { 591 - w.writeAll(",\"cursor\":\"") catch return; 592 - w.writeAll(last) catch return; 593 - w.writeAll("\"") catch return; 594 - } 595 - } 596 - 597 - w.writeByte('}') catch return; 598 - respondJson(conn, .ok, fbs.getWritten()); 599 - } 600 - 601 - fn handleListHosts(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 602 - const cursor_str = queryParam(query, "cursor") orelse "0"; 603 - const limit_str = queryParam(query, "limit") orelse "200"; 604 - 605 - const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 606 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 607 - return; 608 - }; 609 - if (cursor_val < 0) { 610 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 611 - return; 612 - } 613 - 614 - const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 615 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 616 - return; 617 - }; 618 - if (limit < 1 or limit > 1000) { 619 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 620 - return; 621 - } 622 - 623 - var result = persist.db.query( 624 - "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 625 - .{ cursor_val, limit }, 626 - ) catch { 627 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 628 - return; 629 - }; 630 - defer result.deinit(); 631 - 632 - var buf: [65536]u8 = undefined; 633 - var fbs = std.io.fixedBufferStream(&buf); 634 - const w = fbs.writer(); 635 - 636 - var count: i64 = 0; 
637 - var last_id: i64 = 0; 638 - 639 - w.writeAll("{\"hosts\":[") catch return; 640 - 641 - while (result.nextUnsafe() catch null) |row| { 642 - if (count > 0) w.writeByte(',') catch return; 643 - 644 - const id = row.get(i64, 0); 645 - const hostname = row.get([]const u8, 1); 646 - const status = row.get([]const u8, 2); 647 - const seq = row.get(i64, 3); 648 - 649 - w.writeAll("{\"hostname\":\"") catch return; 650 - w.writeAll(hostname) catch return; 651 - w.writeAll("\"") catch return; 652 - std.fmt.format(w, ",\"seq\":{d}", .{seq}) catch return; 653 - w.writeAll(",\"status\":\"") catch return; 654 - w.writeAll(status) catch return; 655 - w.writeAll("\"}") catch return; 656 - 657 - last_id = id; 658 - count += 1; 659 - } 660 - 661 - w.writeByte(']') catch return; 662 - 663 - if (count >= limit and count > 1) { 664 - std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_id}) catch return; 665 - } 666 - 667 - w.writeByte('}') catch return; 668 - respondJson(conn, .ok, fbs.getWritten()); 669 - } 670 - 671 - fn handleGetHostStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 672 - var hostname_buf: [256]u8 = undefined; 673 - const hostname = queryParamDecoded(query, "hostname", &hostname_buf) orelse { 674 - respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 675 - return; 676 - }; 677 - 678 - // look up host 679 - var row = (persist.db.rowUnsafe( 680 - "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 681 - .{hostname}, 682 - ) catch { 683 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 684 - return; 685 - }) orelse { 686 - respondJson(conn, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 687 - return; 688 - }; 689 - defer row.deinit() catch {}; 690 - 691 - const host_id = row.get(i64, 0); 692 - const host_name = row.get([]const u8, 1); 693 - const raw_status = 
row.get([]const u8, 2); 694 - const seq = row.get(i64, 3); 695 - 696 - // map internal status to lexicon hostStatus values 697 - const status = if (std.mem.eql(u8, raw_status, "blocked")) 698 - "banned" 699 - else if (std.mem.eql(u8, raw_status, "exhausted")) 700 - "offline" 701 - else 702 - raw_status; // active, idle pass through 703 - 704 - // count accounts on this host 705 - const account_count: i64 = if (persist.db.rowUnsafe( 706 - "SELECT COUNT(*) FROM account WHERE host_id = $1", 707 - .{host_id}, 708 - ) catch null) |cnt_row| blk: { 709 - var r = cnt_row; 710 - defer r.deinit() catch {}; 711 - break :blk r.get(i64, 0); 712 - } else 0; 713 - 714 - var buf: [4096]u8 = undefined; 715 - var fbs = std.io.fixedBufferStream(&buf); 716 - const w = fbs.writer(); 717 - 718 - w.writeAll("{\"hostname\":\"") catch return; 719 - w.writeAll(host_name) catch return; 720 - w.writeAll("\"") catch return; 721 - std.fmt.format(w, ",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; 722 - w.writeAll(",\"status\":\"") catch return; 723 - w.writeAll(status) catch return; 724 - w.writeAll("\"}") catch return; 725 - 726 - respondJson(conn, .ok, fbs.getWritten()); 727 - } 728 - 729 - /// build a CBOR #account frame for a takedown event. 
730 - /// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"} 731 - fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 { 732 - const zat = @import("zat"); 733 - const cbor = zat.cbor; 734 - 735 - const header: cbor.Value = .{ .map = &.{ 736 - .{ .key = "op", .value = .{ .unsigned = 1 } }, 737 - .{ .key = "t", .value = .{ .text = "#account" } }, 738 - } }; 739 - 740 - var time_buf: [24]u8 = undefined; 741 - const time_str = formatTimestamp(&time_buf); 742 - 743 - const payload: cbor.Value = .{ .map = &.{ 744 - .{ .key = "seq", .value = .{ .unsigned = 0 } }, 745 - .{ .key = "did", .value = .{ .text = did } }, 746 - .{ .key = "time", .value = .{ .text = time_str } }, 747 - .{ .key = "active", .value = .{ .boolean = false } }, 748 - .{ .key = "status", .value = .{ .text = "takendown" } }, 749 - } }; 750 - 751 - const header_bytes = cbor.encodeAlloc(allocator, header) catch return null; 752 - const payload_bytes = cbor.encodeAlloc(allocator, payload) catch { 753 - allocator.free(header_bytes); 754 - return null; 755 - }; 756 - 757 - var frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch { 758 - allocator.free(header_bytes); 759 - allocator.free(payload_bytes); 760 - return null; 761 - }; 762 - @memcpy(frame[0..header_bytes.len], header_bytes); 763 - @memcpy(frame[header_bytes.len..], payload_bytes); 764 - 765 - allocator.free(header_bytes); 766 - allocator.free(payload_bytes); 767 - 768 - return frame; 769 - } 770 - 771 - /// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) 772 - fn formatTimestamp(buf: *[24]u8) []const u8 { 773 - const ts: u64 = @intCast(std.time.timestamp()); 774 - const es = std.time.epoch.EpochSeconds{ .secs = ts }; 775 - const day = es.getEpochDay(); 776 - const yd = day.calculateYearDay(); 777 - const md = yd.calculateMonthDay(); 778 - const ds = es.getDaySeconds(); 779 - 780 - return std.fmt.bufPrint(buf, 
"{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 781 - yd.year, 782 - @as(u32, @intFromEnum(md.month)) + 1, 783 - @as(u32, md.day_index) + 1, 784 - ds.getHoursIntoDay(), 785 - ds.getMinutesIntoHour(), 786 - ds.getSecondsIntoMinute(), 787 - }) catch "1970-01-01T00:00:00Z"; 788 - } 789 - 790 - // --- backfill handlers --- 791 - 792 - fn handleAdminBackfillTrigger(conn: *websocket.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void { 793 - if (!checkAdmin(conn, headers)) return; 794 - 795 - const source = queryParam(query, "source") orelse "bsky.network"; 796 - 797 - backfiller.start(source) catch |err| { 798 - switch (err) { 799 - error.AlreadyRunning => { 800 - respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}"); 801 - }, 802 - else => { 803 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); 804 - }, 805 - } 806 - return; 807 - }; 808 - 809 - var buf: [256]u8 = undefined; 810 - const body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { 811 - respondJson(conn, .ok, "{\"status\":\"started\"}"); 812 - return; 813 - }; 814 - respondJson(conn, .ok, body); 815 - } 816 - 817 - fn handleAdminBackfillStatus(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 818 - if (!checkAdmin(conn, headers)) return; 819 - 820 - const body = ctx.backfiller.getStatus(ctx.backfiller.allocator) catch { 821 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}"); 822 - return; 823 - }; 824 - defer ctx.backfiller.allocator.free(body); 3 + //! re-exports the public API from the api/ subdirectory: 4 + //! router.zig — route dispatch, HttpContext 5 + //! xrpc.zig — AT Protocol sync endpoint handlers 6 + //! admin.zig — admin endpoint handlers 7 + //! 
http.zig — response helpers, query string parsing 825 8 826 - respondJson(conn, .ok, body); 827 - } 9 + const router = @import("api/router.zig"); 828 10 829 - // --- query string helpers --- 11 + pub const HttpContext = router.HttpContext; 12 + pub const handleHttpRequest = router.handleHttpRequest; 830 13 831 - fn queryParam(query: []const u8, name: []const u8) ?[]const u8 { 832 - if (query.len == 0) return null; 833 - var iter = std.mem.splitScalar(u8, query, '&'); 834 - while (iter.next()) |pair| { 835 - const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue; 836 - if (std.mem.eql(u8, pair[0..eq], name)) { 837 - return pair[eq + 1 ..]; 838 - } 839 - } 840 - return null; 841 - } 842 - 843 - /// like queryParam but percent-decodes the value into buf. 844 - /// returns null if the param is missing, or a slice into buf with the decoded value. 845 - fn queryParamDecoded(query: []const u8, name: []const u8, buf: []u8) ?[]const u8 { 846 - const raw = queryParam(query, name) orelse return null; 847 - var i: usize = 0; 848 - var out: usize = 0; 849 - while (i < raw.len) { 850 - if (raw[i] == '%' and i + 2 < raw.len) { 851 - const hi = hexVal(raw[i + 1]) orelse { 852 - if (out >= buf.len) return null; 853 - buf[out] = raw[i]; 854 - out += 1; 855 - i += 1; 856 - continue; 857 - }; 858 - const lo = hexVal(raw[i + 2]) orelse { 859 - if (out >= buf.len) return null; 860 - buf[out] = raw[i]; 861 - out += 1; 862 - i += 1; 863 - continue; 864 - }; 865 - if (out >= buf.len) return null; 866 - buf[out] = (@as(u8, hi) << 4) | @as(u8, lo); 867 - out += 1; 868 - i += 3; 869 - } else if (raw[i] == '+') { 870 - if (out >= buf.len) return null; 871 - buf[out] = ' '; 872 - out += 1; 873 - i += 1; 874 - } else { 875 - if (out >= buf.len) return null; 876 - buf[out] = raw[i]; 877 - out += 1; 878 - i += 1; 879 - } 880 - } 881 - return buf[0..out]; 882 - } 883 - 884 - fn hexVal(c: u8) ?u4 { 885 - return switch (c) { 886 - '0'...'9' => @intCast(c - '0'), 887 - 'a'...'f' => @intCast(c 
- 'a' + 10), 888 - 'A'...'F' => @intCast(c - 'A' + 10), 889 - else => null, 890 - }; 891 - } 892 - 893 - // --- response helpers (write raw HTTP to websocket.Conn) --- 894 - 895 - fn httpRespond(conn: *websocket.Conn, status: http.Status, content_type: []const u8, body: []const u8) void { 896 - var buf: [512]u8 = undefined; 897 - const header = std.fmt.bufPrint(&buf, "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nConnection: close\r\nServer: zlay\r\n\r\n", .{ 898 - httpStatusLine(status), 899 - content_type, 900 - body.len, 901 - }) catch return; 902 - conn.writeFramed(header) catch return; 903 - if (body.len > 0) conn.writeFramed(body) catch return; 904 - } 905 - 906 - fn respondJson(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 907 - httpRespond(conn, status, "application/json", body); 908 - } 909 - 910 - fn respondText(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 911 - httpRespond(conn, status, "text/plain", body); 912 - } 913 - 914 - fn httpStatusLine(status: http.Status) []const u8 { 915 - return switch (status) { 916 - .ok => "200 OK", 917 - .bad_request => "400 Bad Request", 918 - .unauthorized => "401 Unauthorized", 919 - .forbidden => "403 Forbidden", 920 - .not_found => "404 Not Found", 921 - .method_not_allowed => "405 Method Not Allowed", 922 - .conflict => "409 Conflict", 923 - .internal_server_error => "500 Internal Server Error", 924 - else => "500 Internal Server Error", 925 - }; 14 + test { 15 + @import("std").testing.refAllDecls(@This()); 16 + _ = @import("api/http.zig"); 17 + _ = @import("api/xrpc.zig"); 18 + _ = @import("api/admin.zig"); 19 + _ = @import("api/router.zig"); 926 20 }
+267
src/api/admin.zig
//! admin endpoint handlers for relay management.
//!
//! all handlers require Bearer token auth against RELAY_ADMIN_PASSWORD.
//! includes host blocking/unblocking, account bans, and backfill control.

const std = @import("std");
const h = @import("http.zig");
const router = @import("router.zig");
const websocket = @import("websocket");
const broadcaster = @import("../broadcaster.zig");
const event_log_mod = @import("../event_log.zig");
const backfill_mod = @import("../backfill.zig");

const log = std.log.scoped(.relay);

const HttpContext = router.HttpContext;

/// compare two byte slices without short-circuiting on the first mismatch.
/// only the length check can exit early, so response timing does not reveal
/// how many leading bytes of a guessed token were correct.
fn constantTimeEql(a: []const u8, b: []const u8) bool {
    if (a.len != b.len) return false;
    var diff: u8 = 0;
    for (a, b) |x, y| diff |= x ^ y;
    return diff == 0;
}

/// check admin auth via headers, send error response if not authorized. returns true if authorized.
///
/// expects `Authorization: Bearer <token>` matching the RELAY_ADMIN_PASSWORD
/// environment variable. responds 403 when the variable is unset (admin API
/// disabled), 401 for a missing/malformed header or a wrong token.
pub fn checkAdmin(conn: *h.Conn, headers: ?*const websocket.Handshake.KeyValue) bool {
    const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse {
        h.respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}");
        return false;
    };

    const kv = headers orelse {
        h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}");
        return false;
    };

    // handshake parser lowercases all header names
    const auth_value = kv.get("authorization") orelse {
        h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}");
        return false;
    };

    const bearer_prefix = "Bearer ";
    if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}");
        return false;
    }
    const token = auth_value[bearer_prefix.len..];
    // constant-time check: std.mem.eql returns on the first differing byte,
    // which would leak the matching-prefix length to a remote attacker.
    if (!constantTimeEql(token, admin_pw)) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}");
        return false;
    }
    return true;
}

/// POST /admin/repo/ban — take down an account by DID.
/// body: {"did":"did:..."}. marks the account taken down locally and emits a
/// #account takedown frame on the firehose so downstream consumers converge.
pub fn handleBan(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;

    const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();
    const did = parsed.value.did;

    // resolve DID → UID and take down
    const uid = ctx.persist.uidForDid(did) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}");
        return;
    };
    ctx.persist.takeDownUser(uid) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}");
        return;
    };

    // emit #account event so downstream consumers see the takedown.
    // NOTE(review): frame_bytes (and the resequenced copy) are heap allocations
    // that are never freed in this function — presumably bc.broadcast takes
    // ownership; confirm against broadcaster.zig, otherwise each ban leaks a frame.
    if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| {
        if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| {
            ctx.bc.stats.relay_seq.store(relay_seq, .release);
            const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes;
            ctx.bc.broadcast(relay_seq, broadcast_data);
            log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq });
        } else |err| {
            log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)});
        }
    }

    log.info("admin: banned {s} (uid={d})", .{ did, uid });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// GET /admin/hosts — list every known upstream host plus the live worker count.
/// response is built into a growable buffer since the host list is unbounded.
pub fn handleAdminListHosts(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;

    const persist = ctx.persist;
    const hosts = persist.listAllHosts(persist.allocator) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    };
    // listAllHosts hands us owned strings per row; release everything on exit.
    defer {
        for (hosts) |host| {
            persist.allocator.free(host.hostname);
            persist.allocator.free(host.status);
        }
        persist.allocator.free(hosts);
    }

    var list: std.ArrayListUnmanaged(u8) = .{};
    defer list.deinit(persist.allocator);
    const w = list.writer(persist.allocator);

    w.writeAll("{\"hosts\":[") catch return;

    // NOTE(review): hostname/status are interpolated into JSON without escaping;
    // presumably both are validated elsewhere — confirm no quote/backslash can occur.
    for (hosts, 0..) |host, i| {
        if (i > 0) w.writeByte(',') catch return;
        std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d}}}", .{
            host.id,
            host.hostname,
            host.status,
            host.last_seq,
            host.failed_attempts,
        }) catch return;
    }

    std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return;
    h.respondJson(conn, .ok, list.items);
}

/// POST /admin/hosts/block — mark a host "blocked" so the slurper stops pulling it.
/// body: {"hostname":"..."}. creates the host row if it does not exist yet.
pub fn handleAdminBlockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void {
    if (!checkAdmin(conn, headers)) return;

    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}");
        return;
    };
    defer parsed.deinit();

    const host_info = persist.getOrCreateHost(parsed.value.hostname) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}");
        return;
    };

    persist.updateHostStatus(host_info.id, "blocked") catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}");
        return;
    };

    log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// POST /admin/hosts/unblock — return a host to "active" and clear its failure
/// counter (best-effort; a failed reset is not reported to the caller).
/// body: {"hostname":"..."}.
pub fn handleAdminUnblockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void {
    if (!checkAdmin(conn, headers)) return;

    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}");
        return;
    };
    defer parsed.deinit();

    const host_info = persist.getOrCreateHost(parsed.value.hostname) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}");
        return;
    };

    persist.updateHostStatus(host_info.id, "active") catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}");
        return;
    };
    persist.resetHostFailures(host_info.id) catch {};

    log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// POST /admin/backfill-collections — kick off a collection backfill.
/// optional ?source=<host> selects the upstream (default bsky.network).
/// responds 409 when a backfill is already running.
pub fn handleAdminBackfillTrigger(conn: *h.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void {
    if (!checkAdmin(conn, headers)) return;

    const source = h.queryParam(query, "source") orelse "bsky.network";

    backfiller.start(source) catch |err| {
        switch (err) {
            error.AlreadyRunning => {
                h.respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}");
            },
            else => {
                h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}");
            },
        }
        return;
    };

    var buf: [256]u8 = undefined;
    // fall back to a source-less body if the hostname somehow overflows buf
    const resp_body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch {
        h.respondJson(conn, .ok, "{\"status\":\"started\"}");
        return;
    };
    h.respondJson(conn, .ok, resp_body);
}

/// GET /admin/backfill-collections — report current backfill progress as JSON.
pub fn handleAdminBackfillStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void {
    if (!checkAdmin(conn, headers)) return;

    const body = backfiller.getStatus(backfiller.allocator) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}");
        return;
    };
    defer backfiller.allocator.free(body);

    h.respondJson(conn, .ok, body);
}

// --- protocol helpers (used only by handleBan) ---

/// build a CBOR #account frame for a takedown event. returns null on any
/// allocation or encoding failure; on success the caller owns the bytes.
/// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"}
/// seq is written as 0 here; handleBan re-stamps the real sequence via
/// broadcaster.resequenceFrame after persisting.
fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 {
    const zat = @import("zat");
    const cbor = zat.cbor;

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#account" } },
    } };

    var time_buf: [24]u8 = undefined;
    const time_str = formatTimestamp(&time_buf);

    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 0 } },
        .{ .key = "did", .value = .{ .text = did } },
        .{ .key = "time", .value = .{ .text = time_str } },
        .{ .key = "active", .value = .{ .boolean = false } },
        .{ .key = "status", .value = .{ .text = "takendown" } },
    } };

    // encode the two CBOR values, then concatenate into a single frame.
    // defer-based cleanup replaces the previous hand-rolled free-on-every-
    // error-path bookkeeping; the intermediate buffers are always released.
    const header_bytes = cbor.encodeAlloc(allocator, header) catch return null;
    defer allocator.free(header_bytes);
    const payload_bytes = cbor.encodeAlloc(allocator, payload) catch return null;
    defer allocator.free(payload_bytes);

    const frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch return null;
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);
    return frame;
}

/// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ).
/// writes into buf; falls back to the epoch string if formatting fails.
fn formatTimestamp(buf: *[24]u8) []const u8 {
    const ts: u64 = @intCast(std.time.timestamp());
    const es = std.time.epoch.EpochSeconds{ .secs = ts };
    const day = es.getEpochDay();
    const yd = day.calculateYearDay();
    const md = yd.calculateMonthDay();
    const ds = es.getDaySeconds();

    return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
        yd.year,
        // BUGFIX: std.time.epoch.Month is already 1-based (jan = 1); the
        // previous `@intFromEnum(md.month) + 1` emitted months 2..13.
        // day_index IS 0-based, so its + 1 below is correct.
        md.month.numeric(),
        @as(u32, md.day_index) + 1,
        ds.getHoursIntoDay(),
        ds.getMinutesIntoHour(),
        ds.getSecondsIntoMinute(),
    }) catch "1970-01-01T00:00:00Z";
}
+107
src/api/http.zig
//! HTTP response helpers and query string parsing.
//!
//! pure utility module — no domain dependencies. used by all API handler modules
//! to write raw HTTP responses to websocket connections and parse query parameters.

const std = @import("std");
const http = std.http;
const websocket = @import("websocket");

pub const Conn = websocket.Conn;

/// write a complete raw HTTP/1.1 response (status line, headers, body) to the
/// connection. always advertises Connection: close. write errors are dropped:
/// there is nothing useful to do once the socket is gone.
pub fn httpRespond(conn: *Conn, status: http.Status, content_type: []const u8, body: []const u8) void {
    var header_buf: [512]u8 = undefined;
    const header = std.fmt.bufPrint(
        &header_buf,
        "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nConnection: close\r\nServer: zlay\r\n\r\n",
        .{ httpStatusLine(status), content_type, body.len },
    ) catch return;
    conn.writeFramed(header) catch return;
    if (body.len == 0) return;
    conn.writeFramed(body) catch return;
}

/// send `body` as application/json with the given status.
pub fn respondJson(conn: *Conn, status: http.Status, body: []const u8) void {
    httpRespond(conn, status, "application/json", body);
}

/// send `body` as text/plain with the given status.
pub fn respondText(conn: *Conn, status: http.Status, body: []const u8) void {
    httpRespond(conn, status, "text/plain", body);
}

/// map an std.http.Status to its HTTP/1.1 status-line text.
/// any status not explicitly handled collapses to 500.
pub fn httpStatusLine(status: http.Status) []const u8 {
    return switch (status) {
        .ok => "200 OK",
        .bad_request => "400 Bad Request",
        .unauthorized => "401 Unauthorized",
        .forbidden => "403 Forbidden",
        .not_found => "404 Not Found",
        .method_not_allowed => "405 Method Not Allowed",
        .conflict => "409 Conflict",
        else => "500 Internal Server Error",
    };
}

// --- query string helpers ---

/// return the raw (still percent-encoded) value of parameter `name` in
/// `query`, or null if absent. pairs without an '=' are skipped.
pub fn queryParam(query: []const u8, name: []const u8) ?[]const u8 {
    if (query.len == 0) return null;
    var pairs = std.mem.tokenizeScalar(u8, query, '&');
    while (pairs.next()) |pair| {
        const eq_idx = std.mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (!std.mem.eql(u8, pair[0..eq_idx], name)) continue;
        return pair[eq_idx + 1 ..];
    }
    return null;
}

/// like queryParam but percent-decodes the value into buf.
/// returns null if the param is missing or the decoded value does not fit in
/// buf; otherwise a slice into buf with the decoded value. '+' decodes to a
/// space; a '%' not followed by two hex digits is copied through literally.
pub fn queryParamDecoded(query: []const u8, name: []const u8, buf: []u8) ?[]const u8 {
    const encoded = queryParam(query, name) orelse return null;
    var read_idx: usize = 0;
    var write_idx: usize = 0;
    while (read_idx < encoded.len) {
        // default: pass the byte through unchanged, consuming one input byte
        var decoded: u8 = encoded[read_idx];
        var consumed: usize = 1;
        if (encoded[read_idx] == '%' and read_idx + 2 < encoded.len) {
            if (hexDigit(encoded[read_idx + 1])) |hi| {
                if (hexDigit(encoded[read_idx + 2])) |lo| {
                    decoded = (@as(u8, hi) << 4) | @as(u8, lo);
                    consumed = 3;
                }
            }
        } else if (encoded[read_idx] == '+') {
            decoded = ' ';
        }
        if (write_idx >= buf.len) return null;
        buf[write_idx] = decoded;
        write_idx += 1;
        read_idx += consumed;
    }
    return buf[0..write_idx];
}

/// decode one ASCII hex digit (either case), or null for anything else.
fn hexDigit(c: u8) ?u4 {
    const lower = std.ascii.toLower(c);
    return switch (lower) {
        '0'...'9' => @intCast(lower - '0'),
        'a'...'f' => @intCast(lower - 'a' + 10),
        else => null,
    };
}
+117
src/api/router.zig
//! HTTP route dispatch and static content.
//!
//! defines HttpContext (the shared context plumbed from main.zig) and the
//! top-level request router that delegates to xrpc and admin handler modules.

const std = @import("std");
const websocket = @import("websocket");
const broadcaster = @import("../broadcaster.zig");
const validator_mod = @import("../validator.zig");
const slurper_mod = @import("../slurper.zig");
const event_log_mod = @import("../event_log.zig");
const collection_index_mod = @import("../collection_index.zig");
const backfill_mod = @import("../backfill.zig");
const h = @import("http.zig");
const xrpc = @import("xrpc.zig");
const admin = @import("admin.zig");

/// context for HTTP fallback handlers (passed as opaque pointer through broadcaster)
/// all fields are non-owning pointers to long-lived singletons wired up in main.zig.
pub const HttpContext = struct {
    stats: *broadcaster.Stats,
    persist: *event_log_mod.DiskPersist,
    slurper: *slurper_mod.Slurper,
    collection_index: *collection_index_mod.CollectionIndex,
    backfiller: *backfill_mod.Backfiller,
    bc: *broadcaster.Broadcaster,
    validator: *validator_mod.Validator,
};

/// top-level HTTP request router — installed as bc.http_fallback
///
/// splits `url` into path and raw query string (individual handlers do their
/// own per-param percent-decoding), recovers the typed context from the opaque
/// pointer, and dispatches on method. anything but GET/POST gets a 405.
pub fn handleHttpRequest(
    conn: *websocket.Conn,
    method: []const u8,
    url: []const u8,
    body: []const u8,
    headers: *const websocket.Handshake.KeyValue,
    opaque_ctx: ?*anyopaque,
) void {
    // a null context means the fallback was installed without one; drop the request
    const ctx: *HttpContext = @ptrCast(@alignCast(opaque_ctx orelse return));

    const qmark = std.mem.indexOfScalar(u8, url, '?');
    const path = url[0..(qmark orelse url.len)];
    const query = if (qmark) |q| url[q + 1 ..] else "";

    if (std.mem.eql(u8, method, "GET")) {
        handleGet(conn, path, query, headers, ctx);
    } else if (std.mem.eql(u8, method, "POST")) {
        handlePost(conn, path, query, headers, ctx);
    } else {
        h.respondText(conn, .method_not_allowed, "method not allowed");
    }
}

/// GET route table: health/stats, the public com.atproto.sync.* read endpoints,
/// auth-gated admin reads, and static content (root banner, favicon).
/// unknown paths get a plain-text 404. path matching is exact (no prefixes).
fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) {
        h.respondJson(conn, .ok, "{\"status\":\"ok\"}");
    } else if (std.mem.eql(u8, path, "/_stats")) {
        // formatStatsResponse writes into the stack buffer and returns the slice it used
        var stats_buf: [4096]u8 = undefined;
        const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf);
        h.respondJson(conn, .ok, body);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) {
        xrpc.handleListRepos(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) {
        xrpc.handleGetRepoStatus(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) {
        xrpc.handleGetLatestCommit(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) {
        xrpc.handleListReposByCollection(conn, query, ctx.collection_index);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) {
        xrpc.handleListHosts(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) {
        xrpc.handleGetHostStatus(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/admin/hosts")) {
        // admin handlers do their own Bearer-token auth via the headers
        admin.handleAdminListHosts(conn, headers, ctx);
    } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
        admin.handleAdminBackfillStatus(conn, headers, ctx.backfiller);
    } else if (std.mem.eql(u8, path, "/")) {
        // root banner: ASCII-art "zlay" plus pointers to the project and firehose path
        h.respondText(conn, .ok,
            \\     _
            \\ ___| | __ _ _   _
            \\|_  / |/ _` | | | |
            \\ / /| | (_| | |_| |
            \\/___|_|\__,_|\__, |
            \\              |___/
            \\
            \\This is an atproto [https://atproto.com] relay instance,
            \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay]
            \\
            \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
            \\
        );
    } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) {
        // same inline SVG is served for both favicon paths (browsers accept SVG for .ico requests)
        h.httpRespond(conn, .ok, "image/svg+xml",
            \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32">
            \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/>
            \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text>
            \\</svg>
        );
    } else {
        h.respondText(conn, .not_found, "not found");
    }
}

/// POST route table: requestCrawl (public) plus the auth-gated admin mutations.
/// `query` is only consumed by the backfill trigger; `body` carries JSON for
/// the ban/block/unblock endpoints. unknown paths get a plain-text 404.
fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (std.mem.eql(u8, path, "/admin/repo/ban")) {
        admin.handleBan(conn, body, headers, ctx);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) {
        xrpc.handleRequestCrawl(conn, body, ctx.slurper);
    } else if (std.mem.eql(u8, path, "/admin/hosts/block")) {
        admin.handleAdminBlockHost(conn, body, headers, ctx.persist);
    } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) {
        admin.handleAdminUnblockHost(conn, body, headers, ctx.persist);
    } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
        admin.handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller);
    } else {
        h.respondText(conn, .not_found, "not found");
    }
}
+467
src/api/xrpc.zig
//! XRPC endpoint handlers for the AT Protocol sync API.
//!
//! implements com.atproto.sync.* lexicon endpoints:
//! listRepos, getRepoStatus, getLatestCommit, listReposByCollection,
//! listHosts, getHostStatus, requestCrawl

const std = @import("std");
const h = @import("http.zig");
const event_log_mod = @import("../event_log.zig");
const collection_index_mod = @import("../collection_index.zig");
const slurper_mod = @import("../slurper.zig");

const log = std.log.scoped(.relay);

/// GET com.atproto.sync.listRepos — page through all known repos, cursored
/// by account UID. query params: cursor (default "0"), limit (1..1000,
/// default 500). emits a cursor whenever a full page was returned.
pub fn handleListRepos(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void {
    const cursor_str = h.queryParam(query, "cursor") orelse "0";
    const limit_str = h.queryParam(query, "limit") orelse "500";

    const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}");
        return;
    };
    if (cursor_val < 0) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}");
        return;
    }

    const limit = std.fmt.parseInt(i64, limit_str, 10) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}");
        return;
    };
    if (limit < 1 or limit > 1000) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}");
        return;
    }

    // query accounts with repo state, paginated by UID
    // includes both local status and upstream_status for combined active check
    var result = persist.db.query(
        \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '')
        \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid
        \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2
    , .{ cursor_val, limit }) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    };
    defer result.deinit();

    // build JSON response into a fixed buffer.
    // NOTE(review): every write is `catch return`, so if the 64 KiB buffer
    // fills (or a row errors mid-iteration via `catch null`) the connection
    // is dropped without any response — consider a 500 fallback.
    var buf: [65536]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    var count: i64 = 0;
    var last_uid: i64 = 0;

    w.writeAll("{\"repos\":[") catch return;

    while (result.nextUnsafe() catch null) |row| {
        if (count > 0) w.writeByte(',') catch return;

        const uid = row.get(i64, 0);
        const did = row.get([]const u8, 1);
        const local_status = row.get([]const u8, 2);
        const upstream_status = row.get([]const u8, 3);
        const rev = row.get([]const u8, 4);
        const head = row.get([]const u8, 5);

        // Go relay: Account.IsActive() — both local AND upstream must be active
        const local_ok = std.mem.eql(u8, local_status, "active");
        const upstream_ok = std.mem.eql(u8, upstream_status, "active");
        const active = local_ok and upstream_ok;
        // Go relay: Account.AccountStatus() — local takes priority
        const status = if (!local_ok) local_status else upstream_status;

        w.writeAll("{\"did\":\"") catch return;
        w.writeAll(did) catch return;
        w.writeAll("\"") catch return;

        if (head.len > 0) {
            w.writeAll(",\"head\":\"") catch return;
            w.writeAll(head) catch return;
            w.writeAll("\"") catch return;
        }
        if (rev.len > 0) {
            w.writeAll(",\"rev\":\"") catch return;
            w.writeAll(rev) catch return;
            w.writeAll("\"") catch return;
        }

        if (active) {
            w.writeAll(",\"active\":true") catch return;
        } else {
            w.writeAll(",\"active\":false,\"status\":\"") catch return;
            w.writeAll(status) catch return;
            w.writeAll("\"") catch return;
        }

        w.writeByte('}') catch return;
        last_uid = uid;
        count += 1;
    }

    w.writeByte(']') catch return;

    // include cursor if we got a full page.
    // FIX: previously `count >= limit and count >= 2`, which never emitted a
    // cursor for limit == 1 (a full page of one row), stranding clients on the
    // first row forever. limit >= 1 is validated above, so count >= limit
    // already guarantees at least one row and a valid last_uid.
    if (count >= limit) {
        std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_uid}) catch return;
    }

    w.writeByte('}') catch return;

    h.respondJson(conn, .ok, fbs.getWritten());
}

/// GET com.atproto.sync.getRepoStatus — report active/status/rev for one DID.
/// combines local and upstream account status (local takes priority).
pub fn handleGetRepoStatus(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void {
    var did_buf: [256]u8 = undefined;
    const did = h.queryParamDecoded(query, "did", &did_buf) orelse {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}");
        return;
    };

    // basic DID syntax check
    if (!std.mem.startsWith(u8, did, "did:")) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}");
        return;
    }

    // look up account (includes both local and upstream status)
    var row = (persist.db.rowUnsafe(
        "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1",
        .{did},
    ) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    }) orelse {
        h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}");
        return;
    };
    defer row.deinit() catch {};

    const local_status = row.get([]const u8, 1);
    const upstream_status = row.get([]const u8, 2);
    const rev = row.get([]const u8, 3);
    // Go relay: Account.IsActive() / AccountStatus()
    const local_ok = std.mem.eql(u8, local_status, "active");
    const upstream_ok = std.mem.eql(u8, upstream_status, "active");
    const active = local_ok and upstream_ok;
    const status = if (!local_ok) local_status else upstream_status;

    var buf: [4096]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    w.writeAll("{\"did\":\"") catch return;
    w.writeAll(did) catch return;
    w.writeAll("\"") catch return;

    if (active) {
        w.writeAll(",\"active\":true") catch return;
    } else {
        w.writeAll(",\"active\":false,\"status\":\"") catch return;
        w.writeAll(status) catch return;
        w.writeAll("\"") catch return;
    }

    if (rev.len > 0) {
        w.writeAll(",\"rev\":\"") catch return;
        w.writeAll(rev) catch return;
        w.writeAll("\"") catch return;
    }

    w.writeByte('}') catch return;
    h.respondJson(conn, .ok, fbs.getWritten());
}

/// GET com.atproto.sync.getLatestCommit — return {cid, rev} for an active
/// account. inactive accounts get a 403 with a status-specific error code;
/// accounts with no synced repo data get RepoNotSynchronized.
pub fn handleGetLatestCommit(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void {
    var did_buf: [256]u8 = undefined;
    const did = h.queryParamDecoded(query, "did", &did_buf) orelse {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}");
        return;
    };

    if (!std.mem.startsWith(u8, did, "did:")) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}");
        return;
    }

    // look up account + repo state (includes both local and upstream status)
    var row = (persist.db.rowUnsafe(
        "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1",
        .{did},
    ) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    }) orelse {
        h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}");
        return;
    };
    defer row.deinit() catch {};

    const local_status = row.get([]const u8, 0);
    const upstream_status = row.get([]const u8, 1);
    const rev = row.get([]const u8, 2);
    const cid = row.get([]const u8, 3);

    // combined status: local takes priority (Go relay: AccountStatus())
    const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status;

    // check account status (match Go relay behavior)
    if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) {
        h.respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}");
        return;
    } else if (std.mem.eql(u8, status, "deactivated")) {
        h.respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}");
        return;
    } else if (std.mem.eql(u8, status, "deleted")) {
        h.respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}");
        return;
    } else if (!std.mem.eql(u8, status, "active")) {
        h.respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}");
        return;
    }

    if (rev.len == 0 or cid.len == 0) {
        h.respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}");
        return;
    }

    var buf: [4096]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    w.writeAll("{\"cid\":\"") catch return;
    w.writeAll(cid) catch return;
    w.writeAll("\",\"rev\":\"") catch return;
    w.writeAll(rev) catch return;
    w.writeAll("\"}") catch return;

    h.respondJson(conn, .ok, fbs.getWritten());
}

/// GET com.atproto.sync.listReposByCollection — list DIDs that have records
/// in a collection NSID, cursored by DID via the collection index.
pub fn handleListReposByCollection(conn: *h.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void {
    const collection = h.queryParam(query, "collection") orelse {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}");
        return;
    };

    // minimal NSID sanity check: non-empty and contains at least one dot
    if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}");
        return;
    }

    const limit_str = h.queryParam(query, "limit") orelse "500";
    const limit = std.fmt.parseInt(usize, limit_str, 10) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}");
        return;
    };
    if (limit < 1 or limit > 1000) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}");
        return;
    }

    var cursor_buf: [256]u8 = undefined;
    const cursor_did = h.queryParamDecoded(query, "cursor", &cursor_buf);

    // scan collection index
    var did_buf: [65536]u8 = undefined;
    const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}");
        return;
    };

    // build JSON response
    var buf: [65536]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    w.writeAll("{\"repos\":[") catch return;
    for (0..ci_result.count) |i| {
        if (i > 0) w.writeByte(',') catch return;
        w.writeAll("{\"did\":\"") catch return;
        w.writeAll(ci_result.getDid(i)) catch return;
        w.writeAll("\"}") catch return;
    }
    w.writeByte(']') catch return;

    // emit a cursor only when the page was full (more results may exist)
    if (ci_result.last_did) |last| {
        if (ci_result.count >= limit) {
            w.writeAll(",\"cursor\":\"") catch return;
            w.writeAll(last) catch return;
            w.writeAll("\"") catch return;
        }
    }

    w.writeByte('}') catch return;
    h.respondJson(conn, .ok, fbs.getWritten());
}

/// GET com.atproto.sync.listHosts — page through upstream hosts that have
/// emitted events (last_seq > 0), cursored by host id. query params:
/// cursor (default "0"), limit (1..1000, default 200).
pub fn handleListHosts(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void {
    const cursor_str = h.queryParam(query, "cursor") orelse "0";
    const limit_str = h.queryParam(query, "limit") orelse "200";

    const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}");
        return;
    };
    if (cursor_val < 0) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}");
        return;
    }

    const limit = std.fmt.parseInt(i64, limit_str, 10) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}");
        return;
    };
    if (limit < 1 or limit > 1000) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}");
        return;
    }

    var result = persist.db.query(
        "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2",
        .{ cursor_val, limit },
    ) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    };
    defer result.deinit();

    var buf: [65536]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    var count: i64 = 0;
    var last_id: i64 = 0;

    w.writeAll("{\"hosts\":[") catch return;

    while (result.nextUnsafe() catch null) |row| {
        if (count > 0) w.writeByte(',') catch return;

        const id = row.get(i64, 0);
        const hostname = row.get([]const u8, 1);
        const status = row.get([]const u8, 2);
        const seq = row.get(i64, 3);

        w.writeAll("{\"hostname\":\"") catch return;
        w.writeAll(hostname) catch return;
        w.writeAll("\"") catch return;
        std.fmt.format(w, ",\"seq\":{d}", .{seq}) catch return;
        w.writeAll(",\"status\":\"") catch return;
        w.writeAll(status) catch return;
        w.writeAll("\"}") catch return;

        last_id = id;
        count += 1;
    }

    w.writeByte(']') catch return;

    // FIX: previously `count >= limit and count > 1`, which never emitted a
    // cursor for limit == 1 and so broke pagination at that page size; a full
    // page (count >= limit >= 1) always has a valid last_id.
    if (count >= limit) {
        std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_id}) catch return;
    }

    w.writeByte('}') catch return;
    h.respondJson(conn, .ok, fbs.getWritten());
}

/// GET com.atproto.sync.getHostStatus — status, seq, and account count for one
/// upstream host. internal statuses are mapped to lexicon hostStatus values.
pub fn handleGetHostStatus(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void {
    var hostname_buf: [256]u8 = undefined;
    const hostname = h.queryParamDecoded(query, "hostname", &hostname_buf) orelse {
        h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}");
        return;
    };

    // look up host
    // NOTE(review): unknown hosts answer 400 ("HostNotFound") rather than 404 —
    // confirm this matches the reference relay before changing it.
    var row = (persist.db.rowUnsafe(
        "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1",
        .{hostname},
    ) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    }) orelse {
        h.respondJson(conn, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}");
        return;
    };
    defer row.deinit() catch {};

    const host_id = row.get(i64, 0);
    const host_name = row.get([]const u8, 1);
    const raw_status = row.get([]const u8, 2);
    const seq = row.get(i64, 3);

    // map internal status to lexicon hostStatus values
    const status = if (std.mem.eql(u8, raw_status, "blocked"))
        "banned"
    else if (std.mem.eql(u8, raw_status, "exhausted"))
        "offline"
    else
        raw_status; // active, idle pass through

    // count accounts on this host (best-effort: 0 on query error)
    const account_count: i64 = if (persist.db.rowUnsafe(
        "SELECT COUNT(*) FROM account WHERE host_id = $1",
        .{host_id},
    ) catch null) |cnt_row| blk: {
        var r = cnt_row;
        defer r.deinit() catch {};
        break :blk r.get(i64, 0);
    } else 0;

    var buf: [4096]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    w.writeAll("{\"hostname\":\"") catch return;
    w.writeAll(host_name) catch return;
    w.writeAll("\"") catch return;
    std.fmt.format(w, ",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return;
    w.writeAll(",\"status\":\"") catch return;
    w.writeAll(status) catch return;
    w.writeAll("\"}") catch return;

    h.respondJson(conn, .ok, fbs.getWritten());
}

/// POST com.atproto.sync.requestCrawl — validate a hostname from the JSON body
/// and enqueue it for async crawling; the describeServer check runs later in
/// the crawl processor.
pub fn handleRequestCrawl(conn: *h.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void {
    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();

    // fast validation: hostname format (Go relay does this synchronously in handler)
    const hostname = slurper_mod.validateHostname(slurper.allocator, parsed.value.hostname) catch |err| {
        log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) });
        h.respondJson(conn, .bad_request, switch (err) {
            error.EmptyHostname => "{\"error\":\"empty hostname\"}",
            error.InvalidCharacter => "{\"error\":\"hostname contains invalid characters\"}",
            error.InvalidLabel => "{\"error\":\"hostname has invalid label\"}",
            error.TooFewLabels => "{\"error\":\"hostname must have at least two labels (e.g. pds.example.com)\"}",
            error.LooksLikeIpAddress => "{\"error\":\"IP addresses not allowed, use a hostname\"}",
            error.PortNotAllowed => "{\"error\":\"port numbers not allowed\"}",
            error.LocalhostNotAllowed => "{\"error\":\"localhost not allowed\"}",
            else => "{\"error\":\"invalid hostname\"}",
        });
        return;
    };
    defer slurper.allocator.free(hostname);

    // fast validation: domain ban check
    if (slurper.persist.isDomainBanned(hostname)) {
        log.warn("requestCrawl rejected '{s}': domain banned", .{hostname});
        h.respondJson(conn, .bad_request, "{\"error\":\"domain is banned\"}");
        return;
    }

    // enqueue for async processing (describeServer check happens in crawl processor)
    slurper.addCrawlRequest(hostname) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}");
        return;
    };

    log.info("crawl requested: {s}", .{hostname});
    h.respondJson(conn, .ok, "{\"success\":true}");
}