//! XRPC endpoint handlers for the AT Protocol sync API. //! //! implements com.atproto.sync.* lexicon endpoints: //! listRepos, getRepoStatus, getLatestCommit, listReposByCollection, //! listHosts, getHostStatus, requestCrawl const std = @import("std"); const h = @import("http.zig"); const event_log_mod = @import("../event_log.zig"); const collection_index_mod = @import("../collection_index.zig"); const slurper_mod = @import("../slurper.zig"); const log = std.log.scoped(.relay); pub fn handleListRepos(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { const cursor_str = h.queryParam(query, "cursor") orelse "0"; const limit_str = h.queryParam(query, "limit") orelse "500"; const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); return; }; if (cursor_val < 0) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); return; } const limit = std.fmt.parseInt(i64, limit_str, 10) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); return; }; if (limit < 1 or limit > 1000) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); return; } // query accounts with repo state, paginated by UID // includes both local status and upstream_status for combined active check var result = persist.db.query( \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 , .{ cursor_val, limit }) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); return; }; defer result.deinit(); // build JSON response into a buffer var buf: [65536]u8 = undefined; var fbs = std.io.fixedBufferStream(&buf); const w = fbs.writer(); var count: i64 = 0; var last_uid: i64 = 0; w.writeAll("{\"repos\":[") catch return; while (result.nextUnsafe() catch null) |row| { if (count > 0) w.writeByte(',') catch return; const uid = row.get(i64, 0); const did = row.get([]const u8, 1); const local_status = row.get([]const u8, 2); const upstream_status = row.get([]const u8, 3); const rev = row.get([]const u8, 4); const head = row.get([]const u8, 5); // Go relay: Account.IsActive() — both local AND upstream must be active const local_ok = std.mem.eql(u8, local_status, "active"); const upstream_ok = std.mem.eql(u8, upstream_status, "active"); const active = local_ok and upstream_ok; // Go relay: Account.AccountStatus() — local takes priority const status = if (!local_ok) local_status else upstream_status; w.writeAll("{\"did\":\"") catch return; w.writeAll(did) catch return; w.writeAll("\"") catch return; w.writeAll(",\"head\":\"") catch return; w.writeAll(head) catch return; w.writeAll("\",\"rev\":\"") catch return; w.writeAll(rev) catch return; w.writeAll("\"") catch return; if (active) { w.writeAll(",\"active\":true") catch return; } else { w.writeAll(",\"active\":false,\"status\":\"") catch return; w.writeAll(status) catch return; w.writeAll("\"") catch return; } w.writeByte('}') catch return; last_uid = uid; count += 1; } w.writeByte(']') catch return; // include cursor if we got a full page if (count >= limit and count >= 2) { std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_uid}) catch return; } w.writeByte('}') catch return; h.respondJson(conn, .ok, fbs.getWritten()); } pub fn handleGetRepoStatus(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { var did_buf: [256]u8 = undefined; const did = h.queryParamDecoded(query, "did", &did_buf) orelse { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); return; }; // basic DID syntax check if (!std.mem.startsWith(u8, did, "did:")) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); return; } // look up account (includes both local and upstream status) var row = (persist.db.rowUnsafe( "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", .{did}, ) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); return; }) orelse { h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); return; }; defer row.deinit() catch {}; const local_status = row.get([]const u8, 1); const upstream_status = row.get([]const u8, 2); const rev = row.get([]const u8, 3); // Go relay: Account.IsActive() / AccountStatus() const local_ok = std.mem.eql(u8, local_status, "active"); const upstream_ok = std.mem.eql(u8, upstream_status, "active"); const active = local_ok and upstream_ok; const status = if (!local_ok) local_status else upstream_status; var buf: [4096]u8 = undefined; var fbs = std.io.fixedBufferStream(&buf); const w = fbs.writer(); w.writeAll("{\"did\":\"") catch return; w.writeAll(did) catch return; w.writeAll("\"") catch return; if (active) { w.writeAll(",\"active\":true") catch return; } else { w.writeAll(",\"active\":false,\"status\":\"") catch return; w.writeAll(status) catch return; w.writeAll("\"") catch return; } if (rev.len > 0) { w.writeAll(",\"rev\":\"") catch return; w.writeAll(rev) catch return; w.writeAll("\"") catch return; } w.writeByte('}') catch return; h.respondJson(conn, .ok, fbs.getWritten()); } pub fn handleGetLatestCommit(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { var did_buf: [256]u8 = undefined; const did = h.queryParamDecoded(query, "did", &did_buf) orelse { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); return; }; if (!std.mem.startsWith(u8, did, "did:")) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); return; } // look up account + repo state (includes both local and upstream status) var row = (persist.db.rowUnsafe( "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", .{did}, ) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); return; }) orelse { h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); return; }; defer row.deinit() catch {}; const local_status = row.get([]const u8, 0); const upstream_status = row.get([]const u8, 1); const rev = row.get([]const u8, 2); const cid = row.get([]const u8, 3); // combined status: local takes priority (Go relay: AccountStatus()) const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status; // check account status (match Go relay behavior) if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) { h.respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); return; } else if (std.mem.eql(u8, status, "deactivated")) { h.respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); return; } else if (std.mem.eql(u8, status, "deleted")) { h.respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); return; } else if (!std.mem.eql(u8, status, "active")) { h.respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); return; } if (rev.len == 0 or cid.len == 0) { h.respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); return; } var buf: [4096]u8 = undefined; var fbs = std.io.fixedBufferStream(&buf); const w = fbs.writer(); w.writeAll("{\"cid\":\"") catch return; w.writeAll(cid) catch return; w.writeAll("\",\"rev\":\"") catch return; w.writeAll(rev) catch return; w.writeAll("\"}") catch return; h.respondJson(conn, .ok, fbs.getWritten()); } pub fn handleListReposByCollection(conn: *h.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { const collection = h.queryParam(query, "collection") orelse { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); return; }; if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); return; } const limit_str = h.queryParam(query, "limit") orelse "500"; const limit = std.fmt.parseInt(usize, limit_str, 10) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); return; }; if (limit < 1 or limit > 2000) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..2000\"}"); return; } var cursor_buf: [256]u8 = undefined; const cursor_did = h.queryParamDecoded(query, "cursor", &cursor_buf); // scan collection index var did_buf: [65536]u8 = undefined; const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); return; }; // build JSON response var buf: [65536]u8 = undefined; var fbs = std.io.fixedBufferStream(&buf); const w = fbs.writer(); w.writeAll("{\"repos\":[") catch return; for (0..ci_result.count) |i| { if (i > 0) w.writeByte(',') catch return; w.writeAll("{\"did\":\"") catch return; w.writeAll(ci_result.getDid(i)) catch return; w.writeAll("\"}") catch return; } w.writeByte(']') catch return; if (ci_result.last_did) |last| { if (ci_result.count >= limit) { w.writeAll(",\"cursor\":\"") catch return; w.writeAll(last) catch return; w.writeAll("\"") catch return; } } w.writeByte('}') catch return; h.respondJson(conn, .ok, fbs.getWritten()); } pub fn handleListHosts(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { const cursor_str = h.queryParam(query, "cursor") orelse "0"; const limit_str = h.queryParam(query, "limit") orelse "200"; const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); return; }; if (cursor_val < 0) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); return; } const limit = std.fmt.parseInt(i64, limit_str, 10) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); return; }; if (limit < 1 or limit > 1000) { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); return; } var result = persist.db.query( "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", .{ cursor_val, limit }, ) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); return; }; defer result.deinit(); var buf: [65536]u8 = undefined; var fbs = std.io.fixedBufferStream(&buf); const w = fbs.writer(); var count: i64 = 0; var last_id: i64 = 0; w.writeAll("{\"hosts\":[") catch return; while (result.nextUnsafe() catch null) |row| { if (count > 0) w.writeByte(',') catch return; const id = row.get(i64, 0); const hostname = row.get([]const u8, 1); const status = row.get([]const u8, 2); const seq = row.get(i64, 3); w.writeAll("{\"hostname\":\"") catch return; w.writeAll(hostname) catch return; w.writeAll("\"") catch return; std.fmt.format(w, ",\"seq\":{d}", .{seq}) catch return; w.writeAll(",\"status\":\"") catch return; w.writeAll(status) catch return; w.writeAll("\"}") catch return; last_id = id; count += 1; } w.writeByte(']') catch return; if (count >= limit and count > 1) { std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_id}) catch return; } w.writeByte('}') catch return; h.respondJson(conn, .ok, fbs.getWritten()); } pub fn handleGetHostStatus(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { var hostname_buf: [256]u8 = undefined; const hostname = h.queryParamDecoded(query, "hostname", &hostname_buf) orelse { h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); return; }; // look up host var row = (persist.db.rowUnsafe( "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", .{hostname}, ) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); return; }) orelse { h.respondJson(conn, .not_found, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); return; }; defer row.deinit() catch {}; const host_id = row.get(i64, 0); const host_name = row.get([]const u8, 1); const raw_status = row.get([]const u8, 2); const seq = row.get(i64, 3); // map internal status to lexicon hostStatus values const status = if (std.mem.eql(u8, raw_status, "blocked")) "banned" else if (std.mem.eql(u8, raw_status, "exhausted")) "offline" else raw_status; // active, idle pass through // count accounts on this host const account_count: i64 = if (persist.db.rowUnsafe( "SELECT COUNT(*) FROM account WHERE host_id = $1", .{host_id}, ) catch null) |cnt_row| blk: { var r = cnt_row; defer r.deinit() catch {}; break :blk r.get(i64, 0); } else 0; var buf: [4096]u8 = undefined; var fbs = std.io.fixedBufferStream(&buf); const w = fbs.writer(); w.writeAll("{\"hostname\":\"") catch return; w.writeAll(host_name) catch return; w.writeAll("\"") catch return; std.fmt.format(w, ",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; w.writeAll(",\"status\":\"") catch return; w.writeAll(status) catch return; w.writeAll("\"}") catch return; h.respondJson(conn, .ok, fbs.getWritten()); } pub fn handleRequestCrawl(conn: *h.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void { const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch { h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); return; }; defer parsed.deinit(); // fast validation: hostname format (Go relay does this synchronously in handler) const hostname = slurper_mod.validateHostname(slurper.allocator, parsed.value.hostname) catch |err| { log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) }); h.respondJson(conn, .bad_request, switch (err) { error.EmptyHostname => "{\"error\":\"InvalidRequest\",\"message\":\"empty hostname\"}", error.InvalidCharacter => "{\"error\":\"InvalidRequest\",\"message\":\"hostname contains invalid characters\"}", error.InvalidLabel => "{\"error\":\"InvalidRequest\",\"message\":\"hostname has invalid label\"}", error.TooFewLabels => "{\"error\":\"InvalidRequest\",\"message\":\"hostname must have at least two labels (e.g. pds.example.com)\"}", error.LooksLikeIpAddress => "{\"error\":\"InvalidRequest\",\"message\":\"IP addresses not allowed, use a hostname\"}", error.PortNotAllowed => "{\"error\":\"InvalidRequest\",\"message\":\"port numbers not allowed\"}", error.LocalhostNotAllowed => "{\"error\":\"InvalidRequest\",\"message\":\"localhost not allowed\"}", else => "{\"error\":\"InvalidRequest\",\"message\":\"invalid hostname\"}", }); return; }; defer slurper.allocator.free(hostname); // fast validation: domain ban check if (slurper.persist.isDomainBanned(hostname)) { log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"domain is banned\"}"); return; } // enqueue for async processing (describeServer check happens in crawl processor) slurper.addCrawlRequest(hostname) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); return; }; log.info("crawl requested: {s}", .{hostname}); h.respondJson(conn, .ok, "{\"success\":true}"); }