search for standard sites pub-search.waow.tech
search zig blog atproto

feat: block bridgy fed content at ingestion + purge script

Adds a PLC directory lookup in the tap worker to detect brid.gy-hosted
DIDs and skip indexing their documents and publications. Results are
cached for the worker's lifetime; lookups fail open (content is allowed
through) on HTTP or parse errors.

The purge script queries Turso for DIDs with platform='other', resolves
each DID's PDS via plc.directory, and batch-deletes bridgy fed content
(documents, tags, FTS entries).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+301 -2
+89 -2
backend/src/ingest/tap.zig
··· 120 120 } 121 121 }; 122 122 123 + /// Cache of DID → is_bridgy_fed results from PLC directory lookups. 124 + /// Single-threaded (owned by processWorker), no sync needed. 125 + const PdsCache = std.StringHashMap(bool); 126 + 123 127 fn processWorker(queue: *ProcessQueue) void { 124 128 logfire.info("tap: process worker started", .{}); 129 + var pds_cache = PdsCache.init(queue.allocator); 130 + defer { 131 + var it = pds_cache.iterator(); 132 + while (it.next()) |entry| queue.allocator.free(entry.key_ptr.*); 133 + pds_cache.deinit(); 134 + } 125 135 while (queue.pop()) |data| { 126 136 defer queue.allocator.free(data); 127 - processMessage(queue.allocator, data) catch |err| { 137 + processMessage(queue.allocator, data, &pds_cache) catch |err| { 128 138 logfire.err("message processing error: {}", .{err}); 129 139 }; 130 140 queue.mutex.lock(); ··· 291 301 base_path: ?[]const u8 = null, 292 302 }; 293 303 294 - fn processMessage(allocator: Allocator, payload: []const u8) !void { 304 + /// Check if a DID is hosted on brid.gy (bridged Mastodon/ActivityPub/Ghost content). 305 + /// Results are cached for the lifetime of the worker thread. 306 + /// Fails open: on HTTP/parse errors, returns false (allow through). 307 + fn isBridgyFed(allocator: Allocator, did: []const u8, cache: *PdsCache) bool { 308 + if (cache.get(did)) |is_bridgy| return is_bridgy; 309 + 310 + const result = resolvePdsIsBridgy(allocator, did); 311 + // cache with duped key (cache outlives the parsed message) 312 + const key = allocator.dupe(u8, did) catch return false; 313 + cache.put(key, result) catch { 314 + allocator.free(key); 315 + return result; 316 + }; 317 + if (result) { 318 + logfire.info("tap: blocked bridgy fed DID: {s}", .{did}); 319 + } 320 + return result; 321 + } 322 + 323 + /// HTTP GET plc.directory/{did}, check if PDS serviceEndpoint contains "brid.gy". 
324 + fn resolvePdsIsBridgy(allocator: Allocator, did: []const u8) bool { 325 + const http = std.http; 326 + 327 + var url_buf: [256]u8 = undefined; 328 + const url = std.fmt.bufPrint(&url_buf, "https://plc.directory/{s}", .{did}) catch return false; 329 + 330 + var client: http.Client = .{ .allocator = allocator }; 331 + defer client.deinit(); 332 + 333 + var response_body: std.Io.Writer.Allocating = .init(allocator); 334 + defer response_body.deinit(); 335 + 336 + const res = client.fetch(.{ 337 + .location = .{ .url = url }, 338 + .method = .GET, 339 + .response_writer = &response_body.writer, 340 + }) catch |err| { 341 + logfire.warn("tap: PLC lookup failed for {s}: {}", .{ did, err }); 342 + return false; 343 + }; 344 + 345 + if (res.status != .ok) { 346 + logfire.warn("tap: PLC lookup {s} returned {}", .{ did, res.status }); 347 + return false; 348 + } 349 + 350 + const body = response_body.toOwnedSlice() catch return false; 351 + defer allocator.free(body); 352 + 353 + const parsed = json.parseFromSlice(json.Value, allocator, body, .{}) catch return false; 354 + defer parsed.deinit(); 355 + 356 + // look for service[].serviceEndpoint where type == "AtprotoPersonalDataServer" 357 + const services = parsed.value.object.get("service") orelse return false; 358 + if (services != .array) return false; 359 + 360 + for (services.array.items) |svc| { 361 + if (svc != .object) continue; 362 + const svc_type = svc.object.get("type") orelse continue; 363 + if (svc_type != .string) continue; 364 + if (!mem.eql(u8, svc_type.string, "AtprotoPersonalDataServer")) continue; 365 + const endpoint = svc.object.get("serviceEndpoint") orelse continue; 366 + if (endpoint != .string) continue; 367 + if (mem.indexOf(u8, endpoint.string, "brid.gy") != null) return true; 368 + } 369 + 370 + return false; 371 + } 372 + 373 + fn processMessage(allocator: Allocator, payload: []const u8, pds_cache: *PdsCache) !void { 295 374 const parsed = json.parseFromSlice(json.Value, allocator, 
payload, .{}) catch { 296 375 logfire.err("tap: JSON parse failed, first 100 bytes: {s}", .{payload[0..@min(payload.len, 100)]}); 297 376 return; ··· 317 396 logfire.span("tap.dropped", .{ .reason = "invalid_did", .collection = rec.collection }).end(); 318 397 return; 319 398 }; 399 + 400 + // skip bridgy fed content (bridged Mastodon/ActivityPub/Ghost posts) 401 + if (isDocumentCollection(rec.collection) or isPublicationCollection(rec.collection)) { 402 + if (isBridgyFed(allocator, did.raw, pds_cache)) { 403 + logfire.span("tap.dropped", .{ .reason = "bridgy_fed", .collection = rec.collection }).end(); 404 + return; 405 + } 406 + } 320 407 321 408 // build AT-URI string (no allocation - uses stack buffer) 322 409 var uri_buf: [256]u8 = undefined;
+212
scripts/purge-bridgyfed
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx", "pydantic-settings"] 5 + # /// 6 + """ 7 + Purge bridgy fed content from turso. 8 + 9 + Finds all DIDs with platform='other', resolves their PDS via plc.directory, 10 + and deletes documents/tags/FTS entries for any DID hosted on brid.gy. 11 + 12 + Usage: 13 + ./scripts/purge-bridgyfed # dry run (default) 14 + ./scripts/purge-bridgyfed --apply # actually delete 15 + """ 16 + 17 + import argparse 18 + import os 19 + import sys 20 + 21 + import httpx 22 + from pydantic_settings import BaseSettings, SettingsConfigDict 23 + 24 + 25 + class Settings(BaseSettings): 26 + model_config = SettingsConfigDict( 27 + env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore" 28 + ) 29 + 30 + turso_url: str 31 + turso_token: str 32 + 33 + @property 34 + def turso_host(self) -> str: 35 + url = self.turso_url 36 + if url.startswith("libsql://"): 37 + url = url[len("libsql://") :] 38 + return url 39 + 40 + 41 + def turso_query(settings: Settings, sql: str, args: list | None = None) -> list[dict]: 42 + """Execute a query against Turso and return rows.""" 43 + stmt: dict = {"sql": sql} 44 + if args: 45 + stmt["args"] = [] 46 + for a in args: 47 + if a is None: 48 + stmt["args"].append({"type": "null"}) 49 + else: 50 + stmt["args"].append({"type": "text", "value": str(a)}) 51 + 52 + response = httpx.post( 53 + f"https://{settings.turso_host}/v2/pipeline", 54 + headers={ 55 + "Authorization": f"Bearer {settings.turso_token}", 56 + "Content-Type": "application/json", 57 + }, 58 + json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}, 59 + timeout=60, 60 + ) 61 + if response.status_code != 200: 62 + print(f"turso error: {response.text}", file=sys.stderr) 63 + response.raise_for_status() 64 + 65 + data = response.json() 66 + result = data["results"][0]["response"]["result"] 67 + cols = [c["name"] for c in result["cols"]] 68 + rows = [] 69 + 
for row in result["rows"]: 70 + rows.append({col: cell["value"] for col, cell in zip(cols, row)}) 71 + return rows 72 + 73 + 74 + def turso_exec(settings: Settings, sql: str, args: list | None = None) -> int: 75 + """Execute a statement against Turso, return affected rows.""" 76 + stmt: dict = {"sql": sql} 77 + if args: 78 + stmt["args"] = [] 79 + for a in args: 80 + if a is None: 81 + stmt["args"].append({"type": "null"}) 82 + else: 83 + stmt["args"].append({"type": "text", "value": str(a)}) 84 + 85 + response = httpx.post( 86 + f"https://{settings.turso_host}/v2/pipeline", 87 + headers={ 88 + "Authorization": f"Bearer {settings.turso_token}", 89 + "Content-Type": "application/json", 90 + }, 91 + json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}, 92 + timeout=60, 93 + ) 94 + if response.status_code != 200: 95 + print(f"turso error: {response.text}", file=sys.stderr) 96 + response.raise_for_status() 97 + 98 + data = response.json() 99 + return data["results"][0]["response"]["result"]["affected_row_count"] 100 + 101 + 102 + def get_pds_endpoint(did: str) -> str | None: 103 + """Get PDS endpoint from PLC directory. 
Returns None on error.""" 104 + try: 105 + resp = httpx.get(f"https://plc.directory/{did}", timeout=30) 106 + resp.raise_for_status() 107 + data = resp.json() 108 + for service in data.get("service", []): 109 + if service.get("type") == "AtprotoPersonalDataServer": 110 + return service["serviceEndpoint"] 111 + except Exception as e: 112 + print(f" plc lookup failed for {did}: {e}", file=sys.stderr) 113 + return None 114 + 115 + 116 + def main(): 117 + parser = argparse.ArgumentParser(description="Purge bridgy fed content from turso") 118 + parser.add_argument( 119 + "--apply", action="store_true", help="Actually delete (default is dry run)" 120 + ) 121 + args = parser.parse_args() 122 + 123 + try: 124 + settings = Settings() # type: ignore 125 + except Exception as e: 126 + print(f"error loading settings: {e}", file=sys.stderr) 127 + print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr) 128 + sys.exit(1) 129 + 130 + if not args.apply: 131 + print("=== DRY RUN (pass --apply to delete) ===\n") 132 + 133 + # step 1: get distinct DIDs with platform='other' 134 + print("querying distinct DIDs with platform='other'...") 135 + rows = turso_query( 136 + settings, "SELECT DISTINCT did FROM documents WHERE platform = 'other'" 137 + ) 138 + dids = [r["did"] for r in rows] 139 + print(f" found {len(dids)} distinct DIDs\n") 140 + 141 + # step 2: resolve each DID's PDS, identify bridgy fed ones 142 + bridgy_dids = [] 143 + non_bridgy_dids = [] 144 + for i, did in enumerate(dids): 145 + pds = get_pds_endpoint(did) 146 + if pds and "brid.gy" in pds: 147 + bridgy_dids.append(did) 148 + if len(bridgy_dids) <= 5: 149 + print(f" [{i+1}/{len(dids)}] {did} -> {pds} (BRIDGY)") 150 + else: 151 + non_bridgy_dids.append(did) 152 + if len(non_bridgy_dids) <= 5: 153 + print(f" [{i+1}/{len(dids)}] {did} -> {pds or '???'}") 154 + 155 + if (i + 1) % 50 == 0: 156 + print(f" ... 
resolved {i+1}/{len(dids)} DIDs") 157 + 158 + print(f"\n bridgy fed: {len(bridgy_dids)}") 159 + print(f" non-bridgy: {len(non_bridgy_dids)}\n") 160 + 161 + if not bridgy_dids: 162 + print("nothing to purge!") 163 + return 164 + 165 + # step 3: count docs to be deleted 166 + total_deleted = 0 167 + for did in bridgy_dids: 168 + count_rows = turso_query( 169 + settings, 170 + "SELECT COUNT(*) as cnt FROM documents WHERE did = ?", 171 + [did], 172 + ) 173 + count = int(count_rows[0]["cnt"]) if count_rows else 0 174 + total_deleted += count 175 + 176 + print(f"total documents to delete: {total_deleted}") 177 + 178 + if not args.apply: 179 + print("\npass --apply to actually delete these documents") 180 + return 181 + 182 + # step 4: delete in batches per DID 183 + print("\ndeleting...") 184 + deleted_docs = 0 185 + for i, did in enumerate(bridgy_dids): 186 + tags = turso_exec( 187 + settings, 188 + "DELETE FROM document_tags WHERE document_uri IN (SELECT uri FROM documents WHERE did = ?)", 189 + [did], 190 + ) 191 + fts = turso_exec( 192 + settings, 193 + "DELETE FROM documents_fts WHERE uri IN (SELECT uri FROM documents WHERE did = ?)", 194 + [did], 195 + ) 196 + docs = turso_exec( 197 + settings, 198 + "DELETE FROM documents WHERE did = ?", 199 + [did], 200 + ) 201 + deleted_docs += docs 202 + 203 + if (i + 1) % 10 == 0 or (i + 1) == len(bridgy_dids): 204 + print( 205 + f" [{i+1}/{len(bridgy_dids)}] deleted {deleted_docs} docs so far" 206 + ) 207 + 208 + print(f"\ndone! deleted {deleted_docs} documents from {len(bridgy_dids)} bridgy fed DIDs") 209 + 210 + 211 + if __name__ == "__main__": 212 + main()