atproto relay implementation in zig zlay.waow.tech

feat: admin-triggered collection index backfill from source relay

discovers collections from lexicon garden llms.txt + RBC scan,
paginates through source relay's listReposByCollection, tracks
progress in postgres for resumability.

POST /admin/backfill-collections?source=bsky.network — trigger
GET /admin/backfill-collections — status

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+542 -6
+1
build.zig
··· 57 57 "src/event_log.zig", 58 58 "src/slurper.zig", 59 59 "src/collection_index.zig", 60 + "src/backfill.zig", 60 61 }; 61 62 inline for (test_files) |file| { 62 63 const t = b.addTest(.{
+396
src/backfill.zig
··· 1 + //! collection index backfill — discovers collections and imports DIDs from a source relay 2 + //! 3 + //! two collection discovery sources (unioned): 4 + //! 1. lexicon garden llms.txt — ~700 known NSIDs 5 + //! 2. RBC column family scan — firehose-observed collections already in the index 6 + //! 7 + //! progress tracked in postgres (backfill_progress table) for resumability. 8 + //! triggered via POST /admin/backfill-collections, status via GET. 9 + 10 + const std = @import("std"); 11 + const http = std.http; 12 + const pg = @import("pg"); 13 + const collection_index_mod = @import("collection_index.zig"); 14 + 15 + const Allocator = std.mem.Allocator; 16 + const log = std.log.scoped(.backfill); 17 + 18 + const FetchResult = struct { 19 + dids: [][]const u8, 20 + next_cursor: ?[]const u8, 21 + }; 22 + 23 + pub const Backfiller = struct { 24 + allocator: Allocator, 25 + collection_index: *collection_index_mod.CollectionIndex, 26 + db: *pg.Pool, 27 + running: std.atomic.Value(bool), 28 + thread: ?std.Thread, 29 + source: []const u8, 30 + 31 + pub fn init( 32 + allocator: Allocator, 33 + collection_index: *collection_index_mod.CollectionIndex, 34 + db: *pg.Pool, 35 + ) Backfiller { 36 + return .{ 37 + .allocator = allocator, 38 + .collection_index = collection_index, 39 + .db = db, 40 + .running = .{ .raw = false }, 41 + .thread = null, 42 + .source = "", 43 + }; 44 + } 45 + 46 + pub fn isRunning(self: *Backfiller) bool { 47 + return self.running.load(.acquire); 48 + } 49 + 50 + /// start a backfill from the given source relay. returns error if already running. 51 + pub fn start(self: *Backfiller, source: []const u8) !void { 52 + if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) { 53 + return error.AlreadyRunning; 54 + } 55 + errdefer self.running.store(false, .release); 56 + 57 + self.source = try self.allocator.dupe(u8, source); 58 + self.thread = try std.Thread.spawn(.{}, run, .{self}); 59 + } 60 + 61 + fn run(self: *Backfiller) void { 62 + defer { 63 + self.allocator.free(self.source); 64 + self.source = ""; 65 + self.thread = null; 66 + self.running.store(false, .release); 67 + } 68 + 69 + // discover collections 70 + const collections = self.discoverCollections() catch |err| { 71 + log.err("collection discovery failed: {s}", .{@errorName(err)}); 72 + return; 73 + }; 74 + defer { 75 + for (collections) |c| self.allocator.free(c); 76 + self.allocator.free(collections); 77 + } 78 + 79 + log.info("discovered {d} collections to backfill from {s}", .{ collections.len, self.source }); 80 + 81 + // insert progress rows (skip existing) 82 + for (collections) |collection| { 83 + _ = self.db.exec( 84 + "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection) DO NOTHING", 85 + .{ collection, self.source }, 86 + ) catch |err| { 87 + log.warn("failed to insert progress for {s}: {s}", .{ collection, @errorName(err) }); 88 + }; 89 + } 90 + 91 + // backfill each collection 92 + for (collections) |collection| { 93 + self.backfillCollection(collection) catch |err| { 94 + log.warn("backfill failed for {s}: {s}", .{ collection, @errorName(err) }); 95 + }; 96 + } 97 + 98 + log.info("backfill complete", .{}); 99 + } 100 + 101 + fn discoverCollections(self: *Backfiller) ![][]const u8 { 102 + var seen: std.StringHashMapUnmanaged(void) = .{}; 103 + defer seen.deinit(self.allocator); 104 + 105 + // source 1: lexicon garden 106 + const garden = self.fetchLexiconGarden() catch |err| blk: { 107 + log.warn("lexicon garden fetch failed: {s}", .{@errorName(err)}); 108 + break :blk &[_][]const u8{}; 109 + }; 110 + defer { 111 + for (garden) |c| self.allocator.free(c); 112 + self.allocator.free(garden); 113 + } 114 + for (garden) |c| { 115 + if (!seen.contains(c)) { 116 + const duped = try self.allocator.dupe(u8, c); 117 + errdefer self.allocator.free(duped); 118 + try seen.put(self.allocator, duped, {}); 119 + } 120 + } 121 + 122 + // source 2: observed collections from RBC scan 123 + const observed = self.collection_index.listKnownCollections(self.allocator) catch |err| blk: { 124 + log.warn("RBC scan failed: {s}", .{@errorName(err)}); 125 + break :blk &[_][]const u8{}; 126 + }; 127 + defer { 128 + for (observed) |c| self.allocator.free(c); 129 + self.allocator.free(observed); 130 + } 131 + for (observed) |c| { 132 + if (!seen.contains(c)) { 133 + const duped = try self.allocator.dupe(u8, c); 134 + errdefer self.allocator.free(duped); 135 + try seen.put(self.allocator, duped, {}); 136 + } 137 + } 138 + 139 + // collect into result 140 + const result = try self.allocator.alloc([]const u8, seen.count()); 141 + var i: usize = 0; 142 + var key_iter = seen.keyIterator(); 143 + while (key_iter.next()) |key| { 144 + result[i] = key.*; 145 + i += 1; 146 + } 147 + 148 + return result; 149 + } 150 + 151 + /// fetch NSIDs from https://lexicon.garden/llms.txt 152 + /// parses lines matching `- [`<nsid>`](` 153 + fn fetchLexiconGarden(self: *Backfiller) ![][]const u8 { 154 + var client: http.Client = .{ .allocator = self.allocator }; 155 + defer client.deinit(); 156 + 157 + var aw: std.Io.Writer.Allocating = .init(self.allocator); 158 + defer aw.deinit(); 159 + 160 + const result = client.fetch(.{ 161 + .location = .{ .url = "https://lexicon.garden/llms.txt" }, 162 + .response_writer = &aw.writer, 163 + .method = .GET, 164 + }) catch return error.FetchFailed; 165 + 166 + if (result.status != .ok) return error.FetchFailed; 167 + 168 + const body = aw.toArrayList().items; 169 + 170 + var nsids: std.ArrayListUnmanaged([]const u8) = .{}; 171 + defer nsids.deinit(self.allocator); 172 + 173 + // parse lines like: - [`app.bsky.feed.post`]( 174 + var lines = std.mem.splitScalar(u8, body, '\n'); 175 + while (lines.next()) |line| { 176 + // look for pattern: - [`<nsid>`]( 177 + const backtick_start = std.mem.indexOf(u8, line, "- [`") orelse continue; 178 + const nsid_start = backtick_start + 4; // skip "- [`" 179 + const rest = line[nsid_start..]; 180 + const backtick_end = std.mem.indexOf(u8, rest, "`](") orelse continue; 181 + const nsid = rest[0..backtick_end]; 182 + 183 + // basic NSID validation: must contain at least one dot 184 + if (!std.mem.containsAtLeast(u8, nsid, 1, ".")) continue; 185 + // skip obviously non-record types (but include them anyway — they'll return empty) 186 + 187 + const duped = try self.allocator.dupe(u8, nsid); 188 + errdefer self.allocator.free(duped); 189 + try nsids.append(self.allocator, duped); 190 + } 191 + 192 + log.info("lexicon garden: found {d} NSIDs", .{nsids.items.len}); 193 + return try nsids.toOwnedSlice(self.allocator); 194 + } 195 + 196 + fn backfillCollection(self: *Backfiller, collection: []const u8) !void { 197 + // single query: check completion, get cursor + count for resume 198 + var cursor: ?[]const u8 = null; 199 + defer if (cursor) |c| self.allocator.free(c); 200 + var imported: i64 = 0; 201 + { 202 + var row = (self.db.rowUnsafe( 203 + "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1", 204 + .{collection}, 205 + ) catch return error.DatabaseError) orelse return; 206 + defer row.deinit() catch {}; 207 + 208 + if (row.get(bool, 0)) return; // already done 209 + 210 + const saved_cursor = row.get([]const u8, 1); 211 + imported = row.get(i64, 2); 212 + 213 + if (saved_cursor.len > 0) { 214 + cursor = try self.allocator.dupe(u8, saved_cursor); 215 + log.info("{s}: resuming from cursor (imported {d} so far)", .{ collection, imported }); 216 + } 217 + } 218 + 219 + // reuse one HTTP client across all pages for this collection 220 + var client: http.Client = .{ .allocator = self.allocator }; 221 + defer client.deinit(); 222 + 223 + var page_count: usize = 0; 224 + while (true) { 225 + const fetch_result = self.fetchPage(&client, collection, cursor) catch |err| { 226 + log.warn("{s}: fetch page failed: {s}", .{ collection, @errorName(err) }); 227 + break; 228 + }; 229 + defer { 230 + for (fetch_result.dids) |d| self.allocator.free(d); 231 + self.allocator.free(fetch_result.dids); 232 + if (fetch_result.next_cursor) |nc| self.allocator.free(nc); 233 + } 234 + 235 + // add each DID to collection index 236 + for (fetch_result.dids) |did| { 237 + self.collection_index.addCollection(did, collection) catch {}; 238 + imported += 1; 239 + } 240 + 241 + page_count += 1; 242 + 243 + // update cursor in progress table 244 + const new_cursor = fetch_result.next_cursor orelse ""; 245 + _ = self.db.exec( 246 + "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3", 247 + .{ new_cursor, imported, collection }, 248 + ) catch {}; 249 + 250 + if (fetch_result.next_cursor) |nc| { 251 + // advance cursor 252 + if (cursor) |old| self.allocator.free(old); 253 + cursor = self.allocator.dupe(u8, nc) catch break; 254 + 255 + // brief pause between pages 256 + std.posix.nanosleep(0, 100 * std.time.ns_per_ms); 257 + } else { 258 + // no more pages — mark complete 259 + _ = self.db.exec( 260 + "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2", 261 + .{ imported, collection }, 262 + ) catch {}; 263 + log.info("{s}: complete ({d} DIDs, {d} pages)", .{ collection, imported, page_count }); 264 + break; 265 + } 266 + } 267 + } 268 + 269 + fn fetchPage(self: *Backfiller, client: *http.Client, collection: []const u8, cursor: ?[]const u8) !FetchResult { 270 + // build URL 271 + var url_buf: [1024]u8 = undefined; 272 + const url = if (cursor) |c| 273 + std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000&cursor={s}", .{ self.source, collection, c }) catch return error.UrlTooLong 274 + else 275 + std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000", .{ self.source, collection }) catch return error.UrlTooLong; 276 + 277 + var aw: std.Io.Writer.Allocating = .init(self.allocator); 278 + defer aw.deinit(); 279 + 280 + const result = client.fetch(.{ 281 + .location = .{ .url = url }, 282 + .response_writer = &aw.writer, 283 + .method = .GET, 284 + }) catch return error.FetchFailed; 285 + 286 + if (result.status != .ok) return error.FetchFailed; 287 + 288 + const body = aw.toArrayList().items; 289 + 290 + // parse: {"repos": [{"did": "..."}, ...], "cursor": "..."} 291 + const parsed = std.json.parseFromSlice(ListReposResponse, self.allocator, body, .{ .ignore_unknown_fields = true }) catch return error.ParseFailed; 292 + defer parsed.deinit(); 293 + 294 + const repos = parsed.value.repos orelse return .{ 295 + .dids = try self.allocator.alloc([]const u8, 0), 296 + .next_cursor = null, 297 + }; 298 + 299 + var dids: std.ArrayListUnmanaged([]const u8) = .{}; 300 + defer dids.deinit(self.allocator); 301 + 302 + for (repos) |repo| { 303 + const duped = self.allocator.dupe(u8, repo.did) catch continue; 304 + dids.append(self.allocator, duped) catch { 305 + self.allocator.free(duped); 306 + continue; 307 + }; 308 + } 309 + 310 + const next_cursor = if (parsed.value.cursor) |c| 311 + self.allocator.dupe(u8, c) catch null 312 + else 313 + null; 314 + 315 + return .{ 316 + .dids = dids.toOwnedSlice(self.allocator) catch return error.OutOfMemory, 317 + .next_cursor = next_cursor, 318 + }; 319 + } 320 + 321 + const ListReposResponse = struct { 322 + repos: ?[]const RepoEntry = null, 323 + cursor: ?[]const u8 = null, 324 + }; 325 + 326 + const RepoEntry = struct { 327 + did: []const u8, 328 + }; 329 + 330 + /// return status summary for the admin endpoint 331 + pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 { 332 + var list: std.ArrayListUnmanaged(u8) = .{}; 333 + defer list.deinit(allocator); 334 + const w = list.writer(allocator); 335 + 336 + // query aggregate stats 337 + var total: i64 = 0; 338 + var completed: i64 = 0; 339 + var total_imported: i64 = 0; 340 + { 341 + var row = (self.db.rowUnsafe( 342 + "SELECT COUNT(*), COUNT(completed_at), COALESCE(SUM(imported_count), 0) FROM backfill_progress", 343 + .{}, 344 + ) catch null) orelse null; 345 + if (row) |*r| { 346 + total = r.get(i64, 0); 347 + completed = r.get(i64, 1); 348 + total_imported = r.get(i64, 2); 349 + r.deinit() catch {}; 350 + } 351 + } 352 + 353 + std.fmt.format(w, "{{\"running\":{},\"total\":{d},\"completed\":{d},\"in_progress\":{d},\"total_imported\":{d},\"collections\":[", .{ 354 + self.isRunning(), 355 + total, 356 + completed, 357 + total - completed, 358 + total_imported, 359 + }) catch return error.FormatError; 360 + 361 + // per-collection detail 362 + var result = self.db.query( 363 + "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection", 364 + .{}, 365 + ) catch return error.DatabaseError; 366 + defer result.deinit(); 367 + 368 + var first = true; 369 + while (result.nextUnsafe() catch null) |dbrow| { 370 + if (!first) w.writeByte(',') catch {}; 371 + first = false; 372 + 373 + const collection = dbrow.get([]const u8, 0); 374 + const source = dbrow.get([]const u8, 1); 375 + const cursor_val = dbrow.get([]const u8, 2); 376 + const count = dbrow.get(i64, 3); 377 + const is_completed = dbrow.get(bool, 4); 378 + 379 + std.fmt.format(w, "{{\"collection\":\"{s}\",\"source\":\"{s}\",\"imported\":{d},\"completed\":{}", .{ 380 + collection, 381 + source, 382 + count, 383 + is_completed, 384 + }) catch {}; 385 + 386 + if (cursor_val.len > 0 and !is_completed) { 387 + std.fmt.format(w, ",\"cursor\":\"{s}\"", .{cursor_val}) catch {}; 388 + } 389 + 390 + w.writeByte('}') catch {}; 391 + } 392 + 393 + w.writeAll("]}") catch {}; 394 + return try list.toOwnedSlice(allocator); 395 + } 396 + };
+79
src/collection_index.zig
··· 271 271 }; 272 272 } 273 273 274 + /// scan the RBC column family for unique collection prefixes. 275 + /// returns a deduplicated list of collection names (caller owns the slice and each string). 276 + pub fn listKnownCollections(self: *CollectionIndex, allocator: Allocator) ![][]const u8 { 277 + var err_str: ?rocksdb.Data = null; 278 + 279 + var seen: std.StringHashMapUnmanaged(void) = .{}; 280 + defer seen.deinit(allocator); 281 + 282 + // full scan of RBC — keys are collection\0did 283 + var iter = self.db.iterator(self.rbc, .forward, ""); 284 + defer iter.deinit(); 285 + 286 + while (try iter.next(&err_str)) |entry| { 287 + const key_data = entry[0].data; 288 + const sep_pos = std.mem.indexOfScalar(u8, key_data, separator) orelse continue; 289 + const collection = key_data[0..sep_pos]; 290 + if (collection.len == 0) continue; 291 + 292 + if (!seen.contains(collection)) { 293 + const duped = try allocator.dupe(u8, collection); 294 + errdefer allocator.free(duped); 295 + try seen.put(allocator, duped, {}); 296 + } 297 + } 298 + 299 + // collect into a slice 300 + const result = try allocator.alloc([]const u8, seen.count()); 301 + var i: usize = 0; 302 + var key_iter = seen.keyIterator(); 303 + while (key_iter.next()) |key| { 304 + result[i] = key.*; 305 + i += 1; 306 + } 307 + 308 + return result; 309 + } 310 + 274 311 /// check if a DID has any collection entries 275 312 pub fn hasCollections(self: *CollectionIndex, did: []const u8) bool { 276 313 var err_str: ?rocksdb.Data = null; ··· 417 454 const result = try ci.listReposByCollection("app.bsky.feed.post", 100, null, &buf); 418 455 try testing.expectEqual(@as(usize, 1), result.count); 419 456 try testing.expectEqualStrings("did:plc:bob", result.getDid(0)); 457 + } 458 + 459 + test "collection index: listKnownCollections" { 460 + const testing = std.testing; 461 + const allocator = testing.allocator; 462 + 463 + var dir = std.testing.tmpDir(.{}); 464 + defer dir.cleanup(); 465 + const path = try dir.dir.realpathAlloc(allocator, "."); 466 + defer allocator.free(path); 467 + 468 + var ci = CollectionIndex.open(allocator, path) catch |err| { 469 + log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)}); 470 + return error.SkipZigTest; 471 + }; 472 + defer ci.deinit(); 473 + 474 + try ci.addCollection("did:plc:alice", "app.bsky.feed.post"); 475 + try ci.addCollection("did:plc:bob", "app.bsky.feed.post"); 476 + try ci.addCollection("did:plc:alice", "app.bsky.feed.like"); 477 + try ci.addCollection("did:plc:carol", "app.bsky.graph.follow"); 478 + 479 + const collections = try ci.listKnownCollections(allocator); 480 + defer { 481 + for (collections) |c| allocator.free(c); 482 + allocator.free(collections); 483 + } 484 + 485 + try testing.expectEqual(@as(usize, 3), collections.len); 486 + 487 + // verify all three collections are present (order not guaranteed) 488 + var found_post = false; 489 + var found_like = false; 490 + var found_follow = false; 491 + for (collections) |c| { 492 + if (std.mem.eql(u8, c, "app.bsky.feed.post")) found_post = true; 493 + if (std.mem.eql(u8, c, "app.bsky.feed.like")) found_like = true; 494 + if (std.mem.eql(u8, c, "app.bsky.graph.follow")) found_follow = true; 495 + } 496 + try testing.expect(found_post); 497 + try testing.expect(found_like); 498 + try testing.expect(found_follow); 420 499 } 421 500 422 501 test "collection index: idempotent add" {
+11
src/event_log.zig
··· 174 174 \\) 175 175 , .{}); 176 176 177 + _ = try pool.exec( 178 + \\CREATE TABLE IF NOT EXISTS backfill_progress ( 179 + \\ collection TEXT PRIMARY KEY, 180 + \\ source TEXT NOT NULL, 181 + \\ cursor TEXT NOT NULL DEFAULT '', 182 + \\ imported_count BIGINT NOT NULL DEFAULT 0, 183 + \\ completed_at TIMESTAMPTZ, 184 + \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now() 185 + \\) 186 + , .{}); 187 + 177 188 var self = DiskPersist{ 178 189 .allocator = allocator, 179 190 .dir_path = try allocator.dupe(u8, dir_path),
+55 -6
src/main.zig
··· 26 26 const slurper_mod = @import("slurper.zig"); 27 27 const event_log_mod = @import("event_log.zig"); 28 28 const collection_index_mod = @import("collection_index.zig"); 29 + const backfill_mod = @import("backfill.zig"); 29 30 30 31 const log = std.log.scoped(.relay); 31 32 ··· 38 39 persist: *event_log_mod.DiskPersist, 39 40 slurper: *slurper_mod.Slurper, 40 41 collection_index: *collection_index_mod.CollectionIndex, 42 + backfiller: *backfill_mod.Backfiller, 41 43 42 44 fn run(self: *HttpServer) void { 43 45 while (!shutdown_flag.load(.acquire)) { ··· 46 48 log.debug("http accept error: {s}", .{@errorName(err)}); 47 49 continue; 48 50 }; 49 - handleHttpConn(conn.stream, self.stats, self.persist, self.slurper, self.collection_index); 51 + handleHttpConn(conn.stream, self.stats, self.persist, self.slurper, self.collection_index, self.backfiller); 50 52 } 51 53 } 52 54 }; ··· 101 103 }; 102 104 defer ci.deinit(); 103 105 106 + // init backfiller (collection index backfill from source relay) 107 + var backfiller = backfill_mod.Backfiller.init(allocator, &ci, dp.db); 108 + 104 109 // init slurper (multi-host crawl manager) 105 110 var slurper = slurper_mod.Slurper.init( 106 111 allocator, ··· 133 138 .persist = &dp, 134 139 .slurper = &slurper, 135 140 .collection_index = &ci, 141 + .backfiller = &backfiller, 136 142 }; 137 143 const http_thread = try std.Thread.spawn(.{}, HttpServer.run, .{&http_srv}); 138 144 ··· 212 218 std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null); 213 219 } 214 220 215 - fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex) void { 221 + fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller) void { 216 222 defer stream.close(); 217 223 218 224 var recv_buf: [8192]u8 = undefined; ··· 230 236 const query = if (qmark) |q| target[q + 1 ..] else ""; 231 237 232 238 if (request.head.method == .GET) { 233 - handleGet(&request, path, query, stats, persist, slurper, ci); 239 + handleGet(&request, path, query, stats, persist, slurper, ci, backfiller); 234 240 } else if (request.head.method == .POST) { 235 - handlePost(&request, path, persist, slurper); 241 + handlePost(&request, path, query, persist, slurper, backfiller); 236 242 } else { 237 243 respondText(&request, .method_not_allowed, "method not allowed"); 238 244 } 239 245 } 240 246 241 - fn handleGet(request: *http.Server.Request, path: []const u8, query: []const u8, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex) void { 247 + fn handleGet(request: *http.Server.Request, path: []const u8, query: []const u8, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller) void { 242 248 if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 243 249 respondJson(request, .ok, "{\"status\":\"ok\"}"); 244 250 } else if (std.mem.eql(u8, path, "/_stats")) { ··· 264 270 handleListHosts(request, query, persist); 265 271 } else if (std.mem.eql(u8, path, "/admin/hosts")) { 266 272 handleAdminListHosts(request, persist, slurper); 273 + } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 274 + handleAdminBackfillStatus(request, backfiller); 267 275 } else if (std.mem.eql(u8, path, "/")) { 268 276 respondText(request, .ok, 269 277 \\ _ ··· 294 302 } 295 303 } 296 304 297 - fn handlePost(request: *http.Server.Request, path: []const u8, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper) void { 305 + fn handlePost(request: *http.Server.Request, path: []const u8, query: []const u8, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, backfiller: *backfill_mod.Backfiller) void { 298 306 if (std.mem.eql(u8, path, "/admin/repo/ban")) { 299 307 handleBan(request, persist); 300 308 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { ··· 303 311 handleAdminBlockHost(request, persist); 304 312 } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 305 313 handleAdminUnblockHost(request, persist); 314 + } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 315 + handleAdminBackfillTrigger(request, query, backfiller); 306 316 } else { 307 317 respondText(request, .not_found, "not found"); 308 318 } ··· 879 889 880 890 w.writeByte('}') catch return; 881 891 respondJson(request, .ok, fbs.getWritten()); 892 + } 893 + 894 + // --- backfill handlers --- 895 + 896 + fn handleAdminBackfillTrigger(request: *http.Server.Request, query: []const u8, backfiller: *backfill_mod.Backfiller) void { 897 + if (!checkAdmin(request)) return; 898 + 899 + const source = queryParam(query, "source") orelse "bsky.network"; 900 + 901 + backfiller.start(source) catch |err| { 902 + switch (err) { 903 + error.AlreadyRunning => { 904 + respondJson(request, .conflict, "{\"error\":\"backfill already in progress\"}"); 905 + }, 906 + else => { 907 + respondJson(request, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); 908 + }, 909 + } 910 + return; 911 + }; 912 + 913 + var buf: [256]u8 = undefined; 914 + const body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { 915 + respondJson(request, .ok, "{\"status\":\"started\"}"); 916 + return; 917 + }; 918 + respondJson(request, .ok, body); 919 + } 920 + 921 + fn handleAdminBackfillStatus(request: *http.Server.Request, backfiller: *backfill_mod.Backfiller) void { 922 + if (!checkAdmin(request)) return; 923 + 924 + const body = backfiller.getStatus(backfiller.allocator) catch { 925 + respondJson(request, .internal_server_error, "{\"error\":\"failed to query backfill status\"}"); 926 + return; 927 + }; 928 + defer backfiller.allocator.free(body); 929 + 930 + respondJson(request, .ok, body); 882 931 } 883 932 884 933 // --- query string helpers ---