//! admin endpoint handlers for relay management.
//!
//! all handlers require Bearer token auth against RELAY_ADMIN_PASSWORD.
//! includes host blocking/unblocking, account bans, and backfill control.

const std = @import("std");
const h = @import("http.zig");
const router = @import("router.zig");
const websocket = @import("websocket");
const broadcaster = @import("../broadcaster.zig");
const event_log_mod = @import("../event_log.zig");
const backfill_mod = @import("../backfill.zig");

const log = std.log.scoped(.relay);

const HttpContext = router.HttpContext;

/// check admin auth via headers, send error response if not authorized.
/// returns true if authorized.
///
/// on failure a JSON error has already been written to `conn`:
///   - 403 when RELAY_ADMIN_PASSWORD is unset or empty (an empty password
///     would otherwise accept the header value "Bearer ")
///   - 401 when the header is missing, is not a Bearer credential, or the
///     token does not match
pub fn checkAdmin(conn: *h.Conn, headers: ?*const websocket.Handshake.KeyValue) bool {
    const admin_pw: []const u8 = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse "";
    if (admin_pw.len == 0) {
        h.respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}");
        return false;
    }

    const kv = headers orelse {
        h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}");
        return false;
    };

    // handshake parser lowercases all header names
    const auth_value = kv.get("authorization") orelse {
        h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}");
        return false;
    };

    // auth scheme names are case-insensitive, so accept "bearer " as well
    const bearer_prefix = "Bearer ";
    if (!std.ascii.startsWithIgnoreCase(auth_value, bearer_prefix)) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}");
        return false;
    }

    const token = auth_value[bearer_prefix.len..];
    if (!constantTimeEql(token, admin_pw)) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}");
        return false;
    }

    return true;
}

/// compare two byte strings without exiting at the first mismatching byte, so
/// response timing does not reveal how much of a guessed token was correct.
/// only the length of the shorter input influences the running time.
fn constantTimeEql(a: []const u8, b: []const u8) bool {
    var diff: usize = a.len ^ b.len;
    const n = @min(a.len, b.len);
    for (a[0..n], b[0..n]) |x, y| {
        diff |= x ^ y;
    }
    return diff == 0;
}

/// write `s` to `w` as a double-quoted JSON string, escaping quotes,
/// backslashes and control characters. bytes >= 0x20 other than `"` and `\`
/// are passed through unchanged.
fn writeJsonString(w: anytype, s: []const u8) !void {
    try w.writeByte('"');
    for (s) |c| {
        switch (c) {
            '"' => try w.writeAll("\\\""),
            '\\' => try w.writeAll("\\\\"),
            '\n' => try w.writeAll("\\n"),
            '\r' => try w.writeAll("\\r"),
            '\t' => try w.writeAll("\\t"),
            0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => try std.fmt.format(w, "\\u{x:0>4}", .{c}),
            else => try w.writeByte(c),
        }
    }
    try w.writeByte('"');
}

/// serialize the host listing response body:
/// {"hosts":[{"id":..,"hostname":"..","status":"..","last_seq":..,"failed_attempts":..},...],"active_workers":N}
fn writeHostsJson(w: anytype, hosts: anytype, active_workers: anytype) !void {
    try w.writeAll("{\"hosts\":[");
    for (hosts, 0..) |host, i| {
        if (i > 0) try w.writeByte(',');
        try std.fmt.format(w, "{{\"id\":{d},\"hostname\":", .{host.id});
        try writeJsonString(w, host.hostname);
        try w.writeAll(",\"status\":");
        try writeJsonString(w, host.status);
        try std.fmt.format(w, ",\"last_seq\":{d},\"failed_attempts\":{d}}}", .{ host.last_seq, host.failed_attempts });
    }
    try std.fmt.format(w, "],\"active_workers\":{d}}}", .{active_workers});
}

/// ban an account: body is JSON `{"did":"..."}`.
/// resolves the DID to a UID, takes the user down, then (best effort) persists
/// and broadcasts an #account takedown event. a failure to emit the event is
/// logged but does not fail the request, since the takedown already happened.
pub fn handleBan(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;

    const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();

    const did = parsed.value.did;

    // reject obviously malformed identifiers before touching the database
    if (!std.mem.startsWith(u8, did, "did:")) {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid DID\"}");
        return;
    }

    // resolve DID → UID and take down
    const uid = ctx.persist.uidForDid(did) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}");
        return;
    };

    ctx.persist.takeDownUser(uid) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}");
        return;
    };

    // emit #account event so downstream consumers see the takedown
    // NOTE(review): frame_bytes (and the resequenced copy, when one is returned)
    // are never freed here. whether broadcast() takes ownership or copies is not
    // visible from this file — confirm, and free after broadcast if it copies.
    if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| {
        if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| {
            ctx.bc.stats.relay_seq.store(relay_seq, .release);
            const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes;
            ctx.bc.broadcast(relay_seq, broadcast_data);
            log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq });
        } else |err| {
            log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)});
        }
    } else {
        log.warn("admin: failed to build #account takedown frame for {s}", .{did});
    }

    log.info("admin: banned {s} (uid={d})", .{ did, uid });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// list every known host with its status, plus the slurper's worker count.
/// hostname and status are JSON-escaped; if the response cannot be built a 500
/// is sent rather than leaving the request unanswered.
pub fn handleAdminListHosts(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;

    const persist = ctx.persist;
    const hosts = persist.listAllHosts(persist.allocator) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    };
    defer {
        for (hosts) |host| {
            persist.allocator.free(host.hostname);
            persist.allocator.free(host.status);
        }
        persist.allocator.free(hosts);
    }

    var list: std.ArrayListUnmanaged(u8) = .{};
    defer list.deinit(persist.allocator);
    const w = list.writer(persist.allocator);

    writeHostsJson(w, hosts, ctx.slurper.workerCount()) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"failed to encode response\"}");
        return;
    };

    h.respondJson(conn, .ok, list.items);
}

/// mark a host as "blocked". body is JSON `{"hostname":"..."}`.
/// the host row is looked up via getOrCreateHost before its status is updated.
pub fn handleAdminBlockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void {
    if (!checkAdmin(conn, headers)) return;

    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}");
        return;
    };
    defer parsed.deinit();

    const hostname = parsed.value.hostname;
    if (hostname.len == 0) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"hostname must not be empty\"}");
        return;
    }

    const host_info = persist.getOrCreateHost(hostname) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}");
        return;
    };

    persist.updateHostStatus(host_info.id, "blocked") catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}");
        return;
    };

    log.info("admin: blocked host {s} (id={d})", .{ hostname, host_info.id });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// mark a host as "active" again and reset its failure counter.
/// body is JSON `{"hostname":"..."}`. a failed counter reset is logged but
/// does not fail the request, because the status change already succeeded.
pub fn handleAdminUnblockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void {
    if (!checkAdmin(conn, headers)) return;

    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}");
        return;
    };
    defer parsed.deinit();

    const hostname = parsed.value.hostname;
    if (hostname.len == 0) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"hostname must not be empty\"}");
        return;
    }

    const host_info = persist.getOrCreateHost(hostname) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}");
        return;
    };

    persist.updateHostStatus(host_info.id, "active") catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}");
        return;
    };

    persist.resetHostFailures(host_info.id) catch |err| {
        log.warn("admin: failed to reset failure count for host {s} (id={d}): {s}", .{ hostname, host_info.id, @errorName(err) });
    };

    log.info("admin: unblocked host {s} (id={d})", .{ hostname, host_info.id });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// start a backfill from the `source` query parameter (default "bsky.network").
/// responds 409 if a backfill is already running, 500 on any other start error,
/// and 200 with the (JSON-escaped) source on success.
pub fn handleAdminBackfillTrigger(conn: *h.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void {
    if (!checkAdmin(conn, headers)) return;

    const source = h.queryParam(query, "source") orelse "bsky.network";

    backfiller.start(source) catch |err| {
        switch (err) {
            error.AlreadyRunning => {
                h.respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}");
            },
            else => {
                h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}");
            },
        }
        return;
    };

    // `source` comes straight from the query string, so escape it before echoing.
    var list: std.ArrayListUnmanaged(u8) = .{};
    defer list.deinit(backfiller.allocator);
    const w = list.writer(backfiller.allocator);

    const encoded = blk: {
        w.writeAll("{\"status\":\"started\",\"source\":") catch break :blk false;
        writeJsonString(w, source) catch break :blk false;
        w.writeByte('}') catch break :blk false;
        break :blk true;
    };

    // the backfill has started either way; fall back to a body without the source
    if (encoded) {
        h.respondJson(conn, .ok, list.items);
    } else {
        h.respondJson(conn, .ok, "{\"status\":\"started\"}");
    }
}

/// report backfill status: responds with the body produced by
/// `backfiller.getStatus`, which is freed after the response is written.
pub fn handleAdminBackfillStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void {
    if (!checkAdmin(conn, headers)) return;

    const body = backfiller.getStatus(backfiller.allocator) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}");
        return;
    };
    defer backfiller.allocator.free(body);

    h.respondJson(conn, .ok, body);
}

// --- protocol helpers (used only by handleBan) ---

/// build a CBOR #account frame for a takedown event.
/// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"}
///
/// the frame is the encoded header immediately followed by the encoded payload.
/// `seq` is written as 0 and `time` is the current UTC time.
/// returns null if encoding or allocation fails; otherwise the caller owns the
/// returned slice, which was allocated with `allocator`.
fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 {
    const zat = @import("zat");
    const cbor = zat.cbor;

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#account" } },
    } };

    var time_buf: [24]u8 = undefined;
    const time_str = formatTimestamp(&time_buf);

    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 0 } },
        .{ .key = "did", .value = .{ .text = did } },
        .{ .key = "time", .value = .{ .text = time_str } },
        .{ .key = "active", .value = .{ .boolean = false } },
        .{ .key = "status", .value = .{ .text = "takendown" } },
    } };

    // the two intermediate encodings are released on every exit path
    const header_bytes = cbor.encodeAlloc(allocator, header) catch return null;
    defer allocator.free(header_bytes);
    const payload_bytes = cbor.encodeAlloc(allocator, payload) catch return null;
    defer allocator.free(payload_bytes);

    return std.mem.concat(allocator, u8, &.{ header_bytes, payload_bytes }) catch null;
}

/// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ)
/// a clock reading before the Unix epoch is clamped to the epoch.
fn formatTimestamp(buf: *[24]u8) []const u8 {
    const now = std.time.timestamp();
    const secs: u64 = if (now < 0) 0 else @intCast(now);
    return formatTimestampAt(buf, secs);
}

/// format `secs` (seconds since the Unix epoch, UTC) as YYYY-MM-DDTHH:MM:SSZ
/// into `buf` and return the written slice. falls back to the epoch string if
/// the result does not fit in `buf`.
fn formatTimestampAt(buf: *[24]u8, secs: u64) []const u8 {
    const es = std.time.epoch.EpochSeconds{ .secs = secs };
    const yd = es.getEpochDay().calculateYearDay();
    const md = yd.calculateMonthDay();
    const ds = es.getDaySeconds();

    return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
        yd.year,
        // Month is already 1-based (jan = 1); only day_index is 0-based
        @as(u32, md.month.numeric()),
        @as(u32, md.day_index) + 1,
        ds.getHoursIntoDay(),
        ds.getMinutesIntoHour(),
        ds.getSecondsIntoMinute(),
    }) catch "1970-01-01T00:00:00Z";
}