search for standard sites pub-search.waow.tech
search zig blog atproto

feat: add reconciler to detect and clean stale documents

background worker verifies documents still exist at their source PDS
via com.atproto.repo.getRecord. catches deletions missed while the tap
was down (firehose delete events are ephemeral and never replayed).

also fixes the forward path: firehose deletes now clean turbopuffer
vectors in addition to turso records.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+390
+3
backend/src/db/schema.zig
··· 235 235 // DiskANN vector index (documents_embedding_idx) is managed via scripts/rebuild-documents-table 236 236 // DO NOT add CREATE INDEX here — it hangs on startup when the index already exists 237 237 238 + // verified_at: tracks when reconciler last verified document exists at source PDS 239 + client.exec("ALTER TABLE documents ADD COLUMN verified_at TEXT", &.{}) catch {}; 240 + 238 241 // indexed_at: tracks when a document was inserted/updated in Turso 239 242 // used by incremental sync (created_at is publication date, not insertion time, 240 243 // so resynced documents with old created_at were missed by incremental sync)
+4
backend/src/ingest/tap.zig
··· 8 8 const logfire = @import("logfire"); 9 9 const indexer = @import("indexer.zig"); 10 10 const extractor = @import("extractor.zig"); 11 + const tpuf = @import("../tpuf.zig"); 11 12 12 13 // leaflet-specific collections 13 14 const LEAFLET_DOCUMENT = "pub.leaflet.document"; ··· 444 445 } else if (rec.isDelete()) { 445 446 if (isDocumentCollection(rec.collection)) { 446 447 indexer.deleteDocument(uri); 448 + // also clean up turbopuffer vector (deleteDocument only handles turso) 449 + const hashed = tpuf.hashId(uri); 450 + tpuf.delete(allocator, &.{&hashed}) catch {}; 447 451 } else if (isPublicationCollection(rec.collection)) { 448 452 indexer.deletePublication(uri); 449 453 }
+4
backend/src/main.zig
··· 8 8 const metrics = @import("metrics.zig"); 9 9 const server = @import("server.zig"); 10 10 const ingest = @import("ingest.zig"); 11 + const reconcile = @import("reconcile.zig"); 11 12 12 13 const MAX_HTTP_WORKERS = 16; 13 14 const SOCKET_TIMEOUT_SECS = 5; ··· 85 86 86 87 // init vector store (reads TURBOPUFFER_API_KEY from env) 87 88 tpuf.init(); 89 + 90 + // start reconciler (verifies documents still exist at source PDS) 91 + reconcile.start(allocator); 88 92 89 93 // start embedder (voyage-4-lite, 1024 dims, 1 worker) 90 94 ingest.embedder.start(allocator);
+379
backend/src/reconcile.zig
//! Background worker for reconciling stale documents.
//!
//! Periodically verifies documents still exist at their source PDS.
//! Documents that return 400/404 from com.atproto.repo.getRecord are
//! deleted from turso and turbopuffer.
//!
//! This catches deletions missed while the tap was down — the firehose
//! is ephemeral and delete events are never replayed.

const std = @import("std");
const http = std.http;
const json = std.json;
const mem = std.mem;
const posix = std.posix;
const Allocator = mem.Allocator;
const logfire = @import("logfire");
const db = @import("db/mod.zig");
const tpuf = @import("tpuf.zig");
const indexer = @import("ingest/indexer.zig");

// config (env vars with defaults)

/// Seconds to sleep between reconcile cycles. Default 1800 (30 min).
fn getIntervalSecs() u64 {
    const val = posix.getenv("RECONCILE_INTERVAL_SECS") orelse "1800";
    return std.fmt.parseInt(u64, val, 10) catch 1800;
}

/// Max documents fetched and verified per cycle. Default 50.
fn getBatchSize() usize {
    const val = posix.getenv("RECONCILE_BATCH_SIZE") orelse "50";
    return std.fmt.parseInt(usize, val, 10) catch 50;
}

/// Days before an already-verified document becomes due for re-verification.
/// Default 7.
fn getReverifyDays() u64 {
    const val = posix.getenv("RECONCILE_REVERIFY_DAYS") orelse "7";
    return std.fmt.parseInt(u64, val, 10) catch 7;
}

/// Enabled unless RECONCILE_ENABLED is exactly "false" or "0".
fn isEnabled() bool {
    const val = posix.getenv("RECONCILE_ENABLED") orelse "true";
    return !mem.eql(u8, val, "false") and !mem.eql(u8, val, "0");
}

/// AT-URI components parsed from "at://{did}/{collection}/{rkey}"
const UriParts = struct {
    did: []const u8,
    collection: []const u8,
    rkey: []const u8,
};

/// Parse an AT-URI of the form "at://{did}/{collection}/{rkey}".
/// Returns null on malformed input (missing "at://" prefix, missing
/// slashes, or any empty component). The returned slices alias `uri`
/// — no allocation, valid only as long as `uri` is.
fn parseAtUri(uri: []const u8) ?UriParts {
    const prefix = "at://";
    if (!mem.startsWith(u8, uri, prefix)) return null;
    const rest = uri[prefix.len..];

    const first_slash = mem.indexOf(u8, rest, "/") orelse return null;
    const did = rest[0..first_slash];
    const after_did = rest[first_slash + 1 ..];

    const second_slash = mem.indexOf(u8, after_did, "/") orelse return null;
    const collection = after_did[0..second_slash];
    const rkey = after_did[second_slash + 1 ..];

    if (did.len == 0 or collection.len == 0 or rkey.len == 0) return null;
    return .{ .did = did, .collection = collection, .rkey = rkey };
}

/// Start the reconciler background worker.
/// No-op when disabled via RECONCILE_ENABLED. Thread spawn failure is
/// logged and swallowed — the service keeps running without reconciliation.
pub fn start(allocator: Allocator) void {
    if (!isEnabled()) {
        logfire.info("reconcile: disabled via RECONCILE_ENABLED", .{});
        return;
    }

    const thread = std.Thread.spawn(.{}, worker, .{allocator}) catch |err| {
        logfire.err("reconcile: failed to start thread: {}", .{err});
        return;
    };
    thread.detach();
    logfire.info("reconcile: background worker started", .{});
}

/// Worker loop: run one cycle, sleep, repeat forever.
/// Backs off linearly (interval × consecutive errors, capped at 1h) when
/// cycles fail.
fn worker(allocator: Allocator) void {
    // wait for db to be ready
    std.Thread.sleep(10 * std.time.ns_per_s);

    // PDS cache: DID → PDS endpoint URL (persists across cycles)
    var pds_cache = std.StringHashMap([]const u8).init(allocator);
    defer {
        var it = pds_cache.iterator();
        while (it.next()) |entry| {
            allocator.free(entry.key_ptr.*);
            allocator.free(entry.value_ptr.*);
        }
        pds_cache.deinit();
    }

    var consecutive_errors: u32 = 0;

    while (true) {
        const result = runCycle(allocator, &pds_cache);
        if (result) |counts| {
            consecutive_errors = 0;
            if (counts.verified > 0 or counts.deleted > 0) {
                logfire.info("reconcile: verified {d} documents, deleted {d}", .{ counts.verified, counts.deleted });
            }
        } else |err| {
            consecutive_errors += 1;
            logfire.warn("reconcile: cycle error: {}, consecutive: {d}", .{ err, consecutive_errors });
        }

        const interval = getIntervalSecs();
        const backoff: u64 = if (consecutive_errors > 0)
            @min(interval * consecutive_errors, 3600)
        else
            interval;
        std.Thread.sleep(backoff * std.time.ns_per_s);
    }
}

/// Per-cycle outcome: how many documents were confirmed alive vs removed.
const CycleCounts = struct {
    verified: usize,
    deleted: usize,
};

/// Run one reconcile cycle: fetch a batch of documents due for verification,
/// check each against its source PDS, delete the stale ones from turso and
/// (in a single batch at the end) from turbopuffer.
/// Errors.NoClient when the db client is unavailable; query errors propagate.
fn runCycle(allocator: Allocator, pds_cache: *std.StringHashMap([]const u8)) !CycleCounts {
    const span = logfire.span("reconcile.cycle", .{});
    defer span.end();

    const client = db.getClient() orelse return error.NoClient;
    const batch_size = getBatchSize();
    const reverify_days = getReverifyDays();

    // fetch docs ordered by verified_at (NULLs first = never verified = highest priority)
    // re-verify docs older than RECONCILE_REVERIFY_DAYS
    // compute cutoff timestamp in Zig (avoids strftime with parameterized modifiers)
    var batch_str: [10]u8 = undefined;
    const batch_str_val = std.fmt.bufPrint(&batch_str, "{d}", .{batch_size}) catch "50";

    const cutoff_ts = formatTimestamp(std.time.timestamp() - @as(i64, @intCast(reverify_days * 86400)));
    const cutoff = cutoff_ts.slice();

    var result = try client.query(
        \\SELECT uri, did FROM documents
        \\WHERE verified_at IS NULL
        \\  OR verified_at < ?
        \\ORDER BY verified_at ASC NULLS FIRST
        \\LIMIT ?
    ,
        &.{ cutoff, batch_str_val },
    );
    defer result.deinit();

    if (result.rows.len == 0) return .{ .verified = 0, .deleted = 0 };

    // collect URIs and DIDs from the result (copy since result owns the memory)
    const DocInfo = struct { uri: []const u8, did: []const u8 };
    var docs: std.ArrayList(DocInfo) = .empty;
    defer {
        for (docs.items) |doc| {
            allocator.free(doc.uri);
            allocator.free(doc.did);
        }
        docs.deinit(allocator);
    }

    for (result.rows) |row| {
        const uri = allocator.dupe(u8, row.text(0)) catch continue;
        const did = allocator.dupe(u8, row.text(1)) catch {
            allocator.free(uri);
            continue;
        };
        docs.append(allocator, .{ .uri = uri, .did = did }) catch {
            allocator.free(uri);
            allocator.free(did);
            continue;
        };
    }

    var verified: usize = 0;
    var deleted: usize = 0;

    // collect hashed IDs of stale docs for batch tpuf delete
    var stale_ids: std.ArrayList([32]u8) = .empty;
    defer stale_ids.deinit(allocator);

    for (docs.items) |doc| {
        const parts = parseAtUri(doc.uri) orelse {
            logfire.warn("reconcile: invalid AT-URI: {s}", .{doc.uri});
            continue;
        };

        // resolve PDS for this DID
        const pds = resolvePds(allocator, parts.did, pds_cache) orelse {
            // PDS unknown or DID deactivated — skip, don't delete
            continue;
        };

        // check if record still exists at source
        const status = checkRecord(allocator, pds, parts.did, parts.collection, parts.rkey);

        switch (status) {
            .exists => {
                // update verified_at
                updateVerifiedAt(client, doc.uri);
                verified += 1;
            },
            .deleted => {
                // record gone — delete from turso + queue for tpuf batch delete
                indexer.deleteDocument(doc.uri);
                const hashed = tpuf.hashId(doc.uri);
                // on OOM the vector is orphaned in tpuf — log so it's traceable
                stale_ids.append(allocator, hashed) catch |err| {
                    logfire.warn("reconcile: failed to queue tpuf delete for {s}: {}", .{ doc.uri, err });
                };
                deleted += 1;
                logfire.info("reconcile: deleted stale document: {s}", .{doc.uri});
            },
            .error_skip => {
                // 5xx / timeout / network error — don't update verified_at, retry next cycle
            },
        }

        // rate limit: 200ms between PDS requests
        std.Thread.sleep(200 * std.time.ns_per_ms);
    }

    // batch delete from tpuf
    if (stale_ids.items.len > 0 and tpuf.isEnabled()) {
        // build slice of pointers to the hashed IDs
        // (const: the slice itself is never reassigned, only its elements are set)
        const id_ptrs = allocator.alloc([]const u8, stale_ids.items.len) catch {
            logfire.warn("reconcile: alloc failed for tpuf delete batch", .{});
            return .{ .verified = verified, .deleted = deleted };
        };
        defer allocator.free(id_ptrs);

        for (stale_ids.items, 0..) |*id, i| {
            // *[32]u8 coerces to []const u8
            id_ptrs[i] = id;
        }

        tpuf.delete(allocator, id_ptrs) catch |err| {
            logfire.warn("reconcile: tpuf batch delete failed: {}", .{err});
        };
    }

    return .{ .verified = verified, .deleted = deleted };
}

/// Stamp a document's verified_at with the current time.
/// Failures are logged, not propagated — the doc simply stays due for
/// verification next cycle.
fn updateVerifiedAt(client: *db.Client, uri: []const u8) void {
    const now = formatTimestamp(std.time.timestamp());
    client.exec(
        "UPDATE documents SET verified_at = ? WHERE uri = ?",
        &.{ now.slice(), uri },
    ) catch |err| {
        logfire.warn("reconcile: failed to update verified_at for {s}: {}", .{ uri, err });
    };
}

/// Format a unix timestamp as ISO 8601 string (same approach as embedder.zig).
/// Fixed-size buffer holding "YYYY-MM-DDTHH:MM:SS" (19 chars, UTC,
/// no timezone suffix) — returned by value so no allocation is needed.
const TimestampBuf = struct {
    buf: [20]u8,
    len: usize,

    fn slice(self: *const TimestampBuf) []const u8 {
        return self.buf[0..self.len];
    }
};

/// Convert a unix timestamp to an ISO-8601-style string.
/// Negative timestamps clamp to the epoch; formatting failure falls back
/// to the epoch string, which safely forces a re-verify.
fn formatTimestamp(ts: i64) TimestampBuf {
    const epoch_secs: u64 = @intCast(@max(ts, 0));
    const epoch = std.time.epoch.EpochSeconds{ .secs = epoch_secs };
    const day = epoch.getDaySeconds();
    const yd = epoch.getEpochDay().calculateYearDay();
    const md = yd.calculateMonthDay();
    var result: TimestampBuf = undefined;
    const formatted = std.fmt.bufPrint(&result.buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}", .{
        yd.year, md.month.numeric(), @as(u32, md.day_index) + 1,
        day.getHoursIntoDay(), day.getMinutesIntoHour(), day.getSecondsIntoMinute(),
    }) catch {
        // fallback: epoch (will cause re-verify, which is safe)
        const fallback = "1970-01-01T00:00:00";
        @memcpy(result.buf[0..fallback.len], fallback);
        result.len = fallback.len;
        return result;
    };
    result.len = formatted.len;
    return result;
}

/// Outcome of a getRecord probe against the source PDS.
const RecordStatus = enum { exists, deleted, error_skip };

/// Probe com.atproto.repo.getRecord on the given PDS.
/// 2xx → exists; 400/404 → deleted (the PDS reports RecordNotFound as 400);
/// anything else (5xx, rate limit, network error, URL too long) → error_skip,
/// meaning "don't trust this answer, retry next cycle".
fn checkRecord(allocator: Allocator, pds: []const u8, did: []const u8, collection: []const u8, rkey: []const u8) RecordStatus {
    // build URL: {pds}/xrpc/com.atproto.repo.getRecord?repo={did}&collection={collection}&rkey={rkey}
    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/xrpc/com.atproto.repo.getRecord?repo={s}&collection={s}&rkey={s}", .{ pds, did, collection, rkey }) catch {
        return .error_skip;
    };

    var http_client: http.Client = .{ .allocator = allocator };
    defer http_client.deinit();

    var response_body: std.Io.Writer.Allocating = .init(allocator);
    defer response_body.deinit();

    const res = http_client.fetch(.{
        .location = .{ .url = url },
        .method = .GET,
        .response_writer = &response_body.writer,
    }) catch {
        return .error_skip;
    };

    const status_int: u10 = @intFromEnum(res.status);
    if (status_int >= 200 and status_int < 300) return .exists;
    if (status_int == 400 or status_int == 404) return .deleted;
    // 5xx, rate limit, or unexpected status — skip
    return .error_skip;
}

/// Resolve a DID to its PDS endpoint URL via plc.directory.
/// Returns null if DID is deactivated or PDS cannot be determined.
/// Caches results in pds_cache (persists across cycles). The cache owns
/// both key and value; on cache-insert OOM the value is returned uncached
/// and intentionally leaked (freed never) — acceptable on the OOM path.
fn resolvePds(allocator: Allocator, did: []const u8, cache: *std.StringHashMap([]const u8)) ?[]const u8 {
    if (cache.get(did)) |pds| return pds;

    const pds = resolvePdsHttp(allocator, did) orelse return null;

    // cache with duped key + value; on failure the caller still gets `pds`
    // and the next lookup simply re-resolves
    const key = allocator.dupe(u8, did) catch return pds;
    cache.put(key, pds) catch {
        allocator.free(key);
    };

    return pds;
}

/// Fetch the DID document from plc.directory and extract the
/// "AtprotoPersonalDataServer" service endpoint.
/// Returns an allocator-owned copy of the endpoint URL, or null on any
/// HTTP / parse / shape failure (each is logged or silently skipped).
fn resolvePdsHttp(allocator: Allocator, did: []const u8) ?[]const u8 {
    var url_buf: [256]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "https://plc.directory/{s}", .{did}) catch return null;

    var http_client: http.Client = .{ .allocator = allocator };
    defer http_client.deinit();

    var response_body: std.Io.Writer.Allocating = .init(allocator);
    defer response_body.deinit();

    const res = http_client.fetch(.{
        .location = .{ .url = url },
        .method = .GET,
        .response_writer = &response_body.writer,
    }) catch |err| {
        logfire.warn("reconcile: PLC lookup failed for {s}: {}", .{ did, err });
        return null;
    };

    if (res.status != .ok) {
        logfire.warn("reconcile: PLC lookup {s} returned {}", .{ did, res.status });
        return null;
    }

    const body = response_body.toOwnedSlice() catch return null;
    defer allocator.free(body);

    const parsed = json.parseFromSlice(json.Value, allocator, body, .{}) catch return null;
    defer parsed.deinit();

    // look for service[].serviceEndpoint where type == "AtprotoPersonalDataServer"
    const services = parsed.value.object.get("service") orelse return null;
    if (services != .array) return null;

    for (services.array.items) |svc| {
        if (svc != .object) continue;
        const svc_type = svc.object.get("type") orelse continue;
        if (svc_type != .string) continue;
        if (!mem.eql(u8, svc_type.string, "AtprotoPersonalDataServer")) continue;
        const endpoint = svc.object.get("serviceEndpoint") orelse continue;
        if (endpoint != .string) continue;

        // dupe the endpoint — it's owned by the parsed json which we're about to free
        return allocator.dupe(u8, endpoint.string) catch null;
    }

    return null;
}