atproto relay implementation in zig — zlay.waow.tech

feat: add listRepos, getRepoStatus, getLatestCommit XRPC endpoints

implements the three high-priority sync endpoints matching the Go indigo
relay's response format. listRepos supports cursor/limit pagination,
getRepoStatus returns account status + rev, getLatestCommit checks
account status before returning CID/rev.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+250 -5
+250 -5
src/main.zig
··· 5 5 //! numbers, and rebroadcasts to downstream consumers over WebSocket. 6 6 //! 7 7 //! endpoints: 8 - //! /xrpc/com.atproto.sync.subscribeRepos — firehose WebSocket (supports ?cursor=N) 9 - //! /_health — HTTP 200 JSON health check 10 - //! /_stats — HTTP 200 JSON with relay statistics 8 + //! /xrpc/com.atproto.sync.subscribeRepos — firehose WebSocket (supports ?cursor=N) 9 + //! /xrpc/com.atproto.sync.listRepos — paginated account listing 10 + //! /xrpc/com.atproto.sync.getRepoStatus — single account status 11 + //! /xrpc/com.atproto.sync.getLatestCommit — latest commit CID + rev 12 + //! /xrpc/com.atproto.sync.requestCrawl — request PDS crawl (POST) 13 + //! /_health, /_stats, /metrics — health, stats, prometheus 11 14 12 15 const std = @import("std"); 13 16 const websocket = @import("websocket"); ··· 214 217 const body: []const u8 = if (header_end) |he| request[he + 4 ..] else ""; 215 218 216 219 if (std.mem.eql(u8, method, "GET")) { 217 - handleGet(stream, path, stats); 220 + handleGet(stream, path, stats, persist); 218 221 } else if (std.mem.eql(u8, method, "POST")) { 219 222 handlePost(stream, path, request[0 .. header_end orelse n], body, persist, sub); 220 223 } else { ··· 222 225 } 223 226 } 224 227 225 - fn handleGet(stream: std.net.Stream, path: []const u8, stats: *broadcaster.Stats) void { 228 + fn handleGet(stream: std.net.Stream, full_path: []const u8, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist) void { 229 + // split path from query string 230 + const qmark = std.mem.indexOfScalar(u8, full_path, '?'); 231 + const path = full_path[0..(qmark orelse full_path.len)]; 232 + const query = if (qmark) |q| full_path[q + 1 ..] 
else ""; 233 + 226 234 if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 227 235 httpRespond(stream, "200 OK", "application/json", "{\"status\":\"ok\"}"); 228 236 } else if (std.mem.eql(u8, path, "/_stats")) { ··· 235 243 var resp_buf: [8192]u8 = undefined; 236 244 const response = std.fmt.bufPrint(&resp_buf, "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4; charset=utf-8\r\nContent-Length: {d}\r\nConnection: close\r\n\r\n{s}", .{ body.len, body }) catch return; 237 245 _ = stream.write(response) catch {}; 246 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 247 + handleListRepos(stream, query, persist); 248 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 249 + handleGetRepoStatus(stream, query, persist); 250 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 251 + handleGetLatestCommit(stream, query, persist); 238 252 } else if (std.mem.eql(u8, path, "/")) { 239 253 httpRespond(stream, "200 OK", "text/plain", 240 254 \\ _ ··· 331 345 332 346 log.info("crawl requested: {s}", .{parsed.value.hostname}); 333 347 httpRespond(stream, "200 OK", "application/json", "{\"success\":true}"); 348 + } 349 + 350 + // --- XRPC endpoint handlers --- 351 + 352 + fn handleListRepos(stream: std.net.Stream, query: []const u8, persist: *event_log_mod.DiskPersist) void { 353 + const cursor_str = queryParam(query, "cursor") orelse "0"; 354 + const limit_str = queryParam(query, "limit") orelse "500"; 355 + 356 + const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 357 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 358 + return; 359 + }; 360 + if (cursor_val < 0) { 361 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 362 + return; 363 + } 364 + 365 + const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 366 + 
httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 367 + return; 368 + }; 369 + if (limit < 1 or limit > 1000) { 370 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 371 + return; 372 + } 373 + 374 + // query accounts with repo state, paginated by UID 375 + var result = persist.db.query( 376 + \\SELECT a.uid, a.did, a.status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 377 + \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 378 + \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 379 + , .{ cursor_val, limit }) catch { 380 + httpRespondJson(stream, "500 Internal Server Error", "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 381 + return; 382 + }; 383 + defer result.deinit(); 384 + 385 + // build JSON response into a buffer 386 + var buf: [65536]u8 = undefined; 387 + var fbs = std.io.fixedBufferStream(&buf); 388 + const w = fbs.writer(); 389 + 390 + var count: i64 = 0; 391 + var last_uid: i64 = 0; 392 + 393 + w.writeAll("{\"repos\":[") catch return; 394 + 395 + while (result.nextUnsafe() catch null) |row| { 396 + if (count > 0) w.writeByte(',') catch return; 397 + 398 + const uid = row.get(i64, 0); 399 + const did = row.get([]const u8, 1); 400 + const status = row.get([]const u8, 2); 401 + const rev = row.get([]const u8, 3); 402 + const head = row.get([]const u8, 4); 403 + 404 + const active = std.mem.eql(u8, status, "active"); 405 + 406 + w.writeAll("{\"did\":\"") catch return; 407 + w.writeAll(did) catch return; 408 + w.writeAll("\"") catch return; 409 + 410 + if (head.len > 0) { 411 + w.writeAll(",\"head\":\"") catch return; 412 + w.writeAll(head) catch return; 413 + w.writeAll("\"") catch return; 414 + } 415 + if (rev.len > 0) { 416 + w.writeAll(",\"rev\":\"") catch return; 417 + w.writeAll(rev) catch return; 418 + w.writeAll("\"") catch return; 419 + } 420 + 421 + if (active) { 422 + w.writeAll(",\"active\":true") 
catch return; 423 + } else { 424 + w.writeAll(",\"active\":false,\"status\":\"") catch return; 425 + w.writeAll(status) catch return; 426 + w.writeAll("\"") catch return; 427 + } 428 + 429 + w.writeByte('}') catch return; 430 + last_uid = uid; 431 + count += 1; 432 + } 433 + 434 + w.writeByte(']') catch return; 435 + 436 + // include cursor if we got a full page 437 + if (count >= limit and count >= 2) { 438 + std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_uid}) catch return; 439 + } 440 + 441 + w.writeByte('}') catch return; 442 + 443 + const body = fbs.getWritten(); 444 + httpRespondJson(stream, "200 OK", body); 445 + } 446 + 447 + fn handleGetRepoStatus(stream: std.net.Stream, query: []const u8, persist: *event_log_mod.DiskPersist) void { 448 + const did = queryParam(query, "did") orelse { 449 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 450 + return; 451 + }; 452 + 453 + // basic DID syntax check 454 + if (!std.mem.startsWith(u8, did, "did:")) { 455 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 456 + return; 457 + } 458 + 459 + // look up account 460 + var row = (persist.db.rowUnsafe( 461 + "SELECT a.uid, a.status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 462 + .{did}, 463 + ) catch { 464 + httpRespondJson(stream, "500 Internal Server Error", "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 465 + return; 466 + }) orelse { 467 + httpRespondJson(stream, "404 Not Found", "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 468 + return; 469 + }; 470 + defer row.deinit() catch {}; 471 + 472 + const status = row.get([]const u8, 1); 473 + const rev = row.get([]const u8, 2); 474 + const active = std.mem.eql(u8, status, "active"); 475 + 476 + var buf: [4096]u8 = undefined; 477 + var fbs = std.io.fixedBufferStream(&buf); 478 + const w = fbs.writer(); 479 + 480 + 
w.writeAll("{\"did\":\"") catch return; 481 + w.writeAll(did) catch return; 482 + w.writeAll("\"") catch return; 483 + 484 + if (active) { 485 + w.writeAll(",\"active\":true") catch return; 486 + } else { 487 + w.writeAll(",\"active\":false,\"status\":\"") catch return; 488 + w.writeAll(status) catch return; 489 + w.writeAll("\"") catch return; 490 + } 491 + 492 + if (rev.len > 0) { 493 + w.writeAll(",\"rev\":\"") catch return; 494 + w.writeAll(rev) catch return; 495 + w.writeAll("\"") catch return; 496 + } 497 + 498 + w.writeByte('}') catch return; 499 + httpRespondJson(stream, "200 OK", fbs.getWritten()); 500 + } 501 + 502 + fn handleGetLatestCommit(stream: std.net.Stream, query: []const u8, persist: *event_log_mod.DiskPersist) void { 503 + const did = queryParam(query, "did") orelse { 504 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 505 + return; 506 + }; 507 + 508 + if (!std.mem.startsWith(u8, did, "did:")) { 509 + httpRespondJson(stream, "400 Bad Request", "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 510 + return; 511 + } 512 + 513 + // look up account + repo state 514 + var row = (persist.db.rowUnsafe( 515 + "SELECT a.status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 516 + .{did}, 517 + ) catch { 518 + httpRespondJson(stream, "500 Internal Server Error", "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 519 + return; 520 + }) orelse { 521 + httpRespondJson(stream, "404 Not Found", "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 522 + return; 523 + }; 524 + defer row.deinit() catch {}; 525 + 526 + const status = row.get([]const u8, 0); 527 + const rev = row.get([]const u8, 1); 528 + const cid = row.get([]const u8, 2); 529 + 530 + // check account status (match Go relay behavior) 531 + if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, 
"suspended")) { 532 + httpRespondJson(stream, "403 Forbidden", "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 533 + return; 534 + } else if (std.mem.eql(u8, status, "deactivated")) { 535 + httpRespondJson(stream, "403 Forbidden", "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 536 + return; 537 + } else if (std.mem.eql(u8, status, "deleted")) { 538 + httpRespondJson(stream, "403 Forbidden", "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 539 + return; 540 + } else if (!std.mem.eql(u8, status, "active")) { 541 + httpRespondJson(stream, "403 Forbidden", "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 542 + return; 543 + } 544 + 545 + if (rev.len == 0 or cid.len == 0) { 546 + httpRespondJson(stream, "404 Not Found", "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 547 + return; 548 + } 549 + 550 + var buf: [4096]u8 = undefined; 551 + var fbs = std.io.fixedBufferStream(&buf); 552 + const w = fbs.writer(); 553 + 554 + w.writeAll("{\"cid\":\"") catch return; 555 + w.writeAll(cid) catch return; 556 + w.writeAll("\",\"rev\":\"") catch return; 557 + w.writeAll(rev) catch return; 558 + w.writeAll("\"}") catch return; 559 + 560 + httpRespondJson(stream, "200 OK", fbs.getWritten()); 561 + } 562 + 563 + // --- query string helpers --- 564 + 565 + fn queryParam(query: []const u8, name: []const u8) ?[]const u8 { 566 + if (query.len == 0) return null; 567 + var iter = std.mem.splitScalar(u8, query, '&'); 568 + while (iter.next()) |pair| { 569 + const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue; 570 + if (std.mem.eql(u8, pair[0..eq], name)) { 571 + return pair[eq + 1 ..]; 572 + } 573 + } 574 + return null; 575 + } 576 + 577 + fn httpRespondJson(stream: std.net.Stream, status: []const u8, body: []const u8) void { 578 + httpRespond(stream, status, "application/json", body); 334 579 } 335 580 336 581 fn findHeader(headers: 
[]const u8, name: []const u8) ?[]const u8 {