search for standard sites pub-search.waow.tech
search zig blog atproto

fix: rewrite purge-bridgyfed with async/concurrent IO

20 concurrent PLC lookups, 10 concurrent turso deletes.
What took ~15min sequentially now finishes in ~40s.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+167 -133
scripts/purge-bridgyfed
··· 15 15 """ 16 16 17 17 import argparse 18 + import asyncio 18 19 import os 19 20 import sys 21 + import time 20 22 21 23 import httpx 22 24 from pydantic_settings import BaseSettings, SettingsConfigDict 25 + 26 + 27 + def log(msg: str) -> None: 28 + print(msg, flush=True) 23 29 24 30 25 31 class Settings(BaseSettings): ··· 38 44 return url 39 45 40 46 41 - def turso_query(settings: Settings, sql: str, args: list | None = None) -> list[dict]: 42 - """Execute a query against Turso and return rows.""" 47 + def _make_stmt(sql: str, args: list | None = None) -> dict: 43 48 stmt: dict = {"sql": sql} 44 49 if args: 45 - stmt["args"] = [] 46 - for a in args: 47 - if a is None: 48 - stmt["args"].append({"type": "null"}) 49 - else: 50 - stmt["args"].append({"type": "text", "value": str(a)}) 50 + stmt["args"] = [ 51 + {"type": "null"} if a is None else {"type": "text", "value": str(a)} 52 + for a in args 53 + ] 54 + return stmt 51 55 52 - response = httpx.post( 53 - f"https://{settings.turso_host}/v2/pipeline", 54 - headers={ 55 - "Authorization": f"Bearer {settings.turso_token}", 56 - "Content-Type": "application/json", 57 - }, 58 - json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}, 59 - timeout=60, 60 - ) 61 - if response.status_code != 200: 62 - print(f"turso error: {response.text}", file=sys.stderr) 63 - response.raise_for_status() 64 56 65 - data = response.json() 66 - result = data["results"][0]["response"]["result"] 67 - cols = [c["name"] for c in result["cols"]] 68 - rows = [] 69 - for row in result["rows"]: 70 - rows.append({col: cell["value"] for col, cell in zip(cols, row)}) 71 - return rows 57 + async def _turso_request( 58 + client: httpx.AsyncClient, 59 + settings: Settings, 60 + stmt: dict, 61 + semaphore: asyncio.Semaphore, 62 + retries: int = 5, 63 + ) -> dict: 64 + """Send a pipeline request to Turso with retries on transient errors.""" 65 + async with semaphore: 66 + for attempt in range(retries): 67 + try: 68 + response = await 
client.post( 69 + f"https://{settings.turso_host}/v2/pipeline", 70 + headers={ 71 + "Authorization": f"Bearer {settings.turso_token}", 72 + "Content-Type": "application/json", 73 + }, 74 + json={ 75 + "requests": [ 76 + {"type": "execute", "stmt": stmt}, 77 + {"type": "close"}, 78 + ] 79 + }, 80 + timeout=120, 81 + ) 82 + if response.status_code in (400, 502, 503, 504) and attempt < retries - 1: 83 + wait = 2 ** (attempt + 1) 84 + log(f" turso {response.status_code}, retry in {wait}s...") 85 + await asyncio.sleep(wait) 86 + continue 87 + if response.status_code != 200: 88 + print(f"turso error: {response.text}", file=sys.stderr, flush=True) 89 + response.raise_for_status() 90 + return response.json() 91 + except httpx.TimeoutException: 92 + if attempt < retries - 1: 93 + wait = 2 ** (attempt + 1) 94 + log(f" turso timeout, retry in {wait}s...") 95 + await asyncio.sleep(wait) 96 + continue 97 + raise 98 + raise RuntimeError("unreachable") 72 99 73 100 74 - def turso_exec(settings: Settings, sql: str, args: list | None = None) -> int: 75 - """Execute a statement against Turso, return affected rows.""" 76 - stmt: dict = {"sql": sql} 77 - if args: 78 - stmt["args"] = [] 79 - for a in args: 80 - if a is None: 81 - stmt["args"].append({"type": "null"}) 82 - else: 83 - stmt["args"].append({"type": "text", "value": str(a)}) 101 + async def turso_query( 102 + client: httpx.AsyncClient, 103 + settings: Settings, 104 + sql: str, 105 + semaphore: asyncio.Semaphore, 106 + args: list | None = None, 107 + ) -> list[dict]: 108 + data = await _turso_request(client, settings, _make_stmt(sql, args), semaphore) 109 + result = data["results"][0]["response"]["result"] 110 + cols = [c["name"] for c in result["cols"]] 111 + return [{col: cell["value"] for col, cell in zip(cols, row)} for row in result["rows"]] 84 112 85 - response = httpx.post( 86 - f"https://{settings.turso_host}/v2/pipeline", 87 - headers={ 88 - "Authorization": f"Bearer {settings.turso_token}", 89 - "Content-Type": 
"application/json", 90 - }, 91 - json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}, 92 - timeout=60, 93 - ) 94 - if response.status_code != 200: 95 - print(f"turso error: {response.text}", file=sys.stderr) 96 - response.raise_for_status() 97 113 98 - data = response.json() 114 + async def turso_exec( 115 + client: httpx.AsyncClient, 116 + settings: Settings, 117 + sql: str, 118 + semaphore: asyncio.Semaphore, 119 + args: list | None = None, 120 + ) -> int: 121 + data = await _turso_request(client, settings, _make_stmt(sql, args), semaphore) 99 122 return data["results"][0]["response"]["result"]["affected_row_count"] 100 123 101 124 102 - def get_pds_endpoint(did: str) -> str | None: 103 - """Get PDS endpoint from PLC directory. Returns None on error.""" 104 - try: 105 - resp = httpx.get(f"https://plc.directory/{did}", timeout=30) 106 - resp.raise_for_status() 107 - data = resp.json() 108 - for service in data.get("service", []): 109 - if service.get("type") == "AtprotoPersonalDataServer": 110 - return service["serviceEndpoint"] 111 - except Exception as e: 112 - print(f" plc lookup failed for {did}: {e}", file=sys.stderr) 125 + async def resolve_pds(client: httpx.AsyncClient, did: str, semaphore: asyncio.Semaphore) -> str | None: 126 + """Get PDS endpoint from PLC directory.""" 127 + async with semaphore: 128 + try: 129 + resp = await client.get(f"https://plc.directory/{did}", timeout=30) 130 + resp.raise_for_status() 131 + data = resp.json() 132 + for service in data.get("service", []): 133 + if service.get("type") == "AtprotoPersonalDataServer": 134 + return service["serviceEndpoint"] 135 + except Exception as e: 136 + print(f" plc lookup failed for {did}: {e}", file=sys.stderr, flush=True) 113 137 return None 114 138 115 139 116 - def main(): 117 - parser = argparse.ArgumentParser(description="Purge bridgy fed content from turso") 118 - parser.add_argument( 119 - "--apply", action="store_true", help="Actually delete (default is dry run)" 
140 + async def delete_did( 141 + client: httpx.AsyncClient, 142 + settings: Settings, 143 + did: str, 144 + semaphore: asyncio.Semaphore, 145 + ) -> int: 146 + """Delete all documents/tags/FTS for a single DID. Returns doc count deleted.""" 147 + await turso_exec( 148 + client, settings, 149 + "DELETE FROM document_tags WHERE document_uri IN (SELECT uri FROM documents WHERE did = ?)", 150 + semaphore, [did], 120 151 ) 152 + await turso_exec( 153 + client, settings, 154 + "DELETE FROM documents_fts WHERE uri IN (SELECT uri FROM documents WHERE did = ?)", 155 + semaphore, [did], 156 + ) 157 + return await turso_exec( 158 + client, settings, 159 + "DELETE FROM documents WHERE did = ?", 160 + semaphore, [did], 161 + ) 162 + 163 + 164 + async def main(): 165 + parser = argparse.ArgumentParser(description="Purge bridgy fed content from turso") 166 + parser.add_argument("--apply", action="store_true", help="Actually delete (default is dry run)") 121 167 args = parser.parse_args() 122 168 123 169 try: ··· 128 174 sys.exit(1) 129 175 130 176 if not args.apply: 131 - print("=== DRY RUN (pass --apply to delete) ===\n") 177 + log("=== DRY RUN (pass --apply to delete) ===\n") 132 178 133 - # step 1: get distinct DIDs with platform='other' 134 - print("querying distinct DIDs with platform='other'...") 135 - rows = turso_query( 136 - settings, "SELECT DISTINCT did FROM documents WHERE platform = 'other'" 137 - ) 138 - dids = [r["did"] for r in rows] 139 - print(f" found {len(dids)} distinct DIDs\n") 179 + plc_sem = asyncio.Semaphore(20) # concurrent PLC lookups 180 + turso_sem = asyncio.Semaphore(10) # concurrent turso requests 140 181 141 - # step 2: resolve each DID's PDS, identify bridgy fed ones 142 - bridgy_dids = [] 143 - non_bridgy_dids = [] 144 - for i, did in enumerate(dids): 145 - pds = get_pds_endpoint(did) 146 - if pds and "brid.gy" in pds: 147 - bridgy_dids.append(did) 148 - if len(bridgy_dids) <= 5: 149 - print(f" [{i+1}/{len(dids)}] {did} -> {pds} (BRIDGY)") 150 - 
else: 151 - non_bridgy_dids.append(did) 152 - if len(non_bridgy_dids) <= 5: 153 - print(f" [{i+1}/{len(dids)}] {did} -> {pds or '???'}") 182 + async with httpx.AsyncClient() as client: 183 + # step 1: get distinct DIDs with platform='other' 184 + log("querying distinct DIDs with platform='other'...") 185 + rows = await turso_query( 186 + client, settings, 187 + "SELECT DISTINCT did FROM documents WHERE platform = 'other'", 188 + turso_sem, 189 + ) 190 + dids = [r["did"] for r in rows] 191 + log(f" found {len(dids)} distinct DIDs\n") 154 192 155 - if (i + 1) % 50 == 0: 156 - print(f" ... resolved {i+1}/{len(dids)} DIDs") 193 + # step 2: resolve PDS concurrently 194 + log("resolving PDS endpoints (20 concurrent)...") 195 + t0 = time.monotonic() 157 196 158 - print(f"\n bridgy fed: {len(bridgy_dids)}") 159 - print(f" non-bridgy: {len(non_bridgy_dids)}\n") 197 + async def resolve_one(did: str) -> tuple[str, str | None]: 198 + pds = await resolve_pds(client, did, plc_sem) 199 + return did, pds 160 200 161 - if not bridgy_dids: 162 - print("nothing to purge!") 163 - return 201 + results = await asyncio.gather(*[resolve_one(d) for d in dids]) 202 + elapsed = time.monotonic() - t0 164 203 165 - # step 3: count docs to be deleted 166 - total_deleted = 0 167 - for did in bridgy_dids: 168 - count_rows = turso_query( 169 - settings, 170 - "SELECT COUNT(*) as cnt FROM documents WHERE did = ?", 171 - [did], 172 - ) 173 - count = int(count_rows[0]["cnt"]) if count_rows else 0 174 - total_deleted += count 204 + bridgy_dids = [] 205 + non_bridgy_dids = [] 206 + for did, pds in results: 207 + if pds and "brid.gy" in pds: 208 + bridgy_dids.append(did) 209 + else: 210 + non_bridgy_dids.append(did) 175 211 176 - print(f"total documents to delete: {total_deleted}") 212 + log(f" resolved {len(dids)} DIDs in {elapsed:.1f}s") 213 + log(f" bridgy fed: {len(bridgy_dids)}") 214 + log(f" non-bridgy: {len(non_bridgy_dids)}\n") 215 + 216 + if not bridgy_dids: 217 + log("nothing to purge!") 218 + 
return 219 + 220 + if not args.apply: 221 + log("pass --apply to actually delete these documents") 222 + return 177 223 178 - if not args.apply: 179 - print("\npass --apply to actually delete these documents") 180 - return 224 + # step 3: delete concurrently (10 concurrent turso connections) 225 + log(f"deleting {len(bridgy_dids)} bridgy fed DIDs (10 concurrent)...") 226 + t0 = time.monotonic() 227 + deleted = 0 228 + done = 0 181 229 182 - # step 4: delete in batches per DID 183 - print("\ndeleting...") 184 - deleted_docs = 0 185 - for i, did in enumerate(bridgy_dids): 186 - tags = turso_exec( 187 - settings, 188 - "DELETE FROM document_tags WHERE document_uri IN (SELECT uri FROM documents WHERE did = ?)", 189 - [did], 190 - ) 191 - fts = turso_exec( 192 - settings, 193 - "DELETE FROM documents_fts WHERE uri IN (SELECT uri FROM documents WHERE did = ?)", 194 - [did], 195 - ) 196 - docs = turso_exec( 197 - settings, 198 - "DELETE FROM documents WHERE did = ?", 199 - [did], 200 - ) 201 - deleted_docs += docs 230 + async def delete_one(did: str) -> int: 231 + return await delete_did(client, settings, did, turso_sem) 202 232 203 - if (i + 1) % 10 == 0 or (i + 1) == len(bridgy_dids): 204 - print( 205 - f" [{i+1}/{len(bridgy_dids)}] deleted {deleted_docs} docs so far" 206 - ) 233 + # process in chunks of 50 for progress reporting 234 + for chunk_start in range(0, len(bridgy_dids), 50): 235 + chunk = bridgy_dids[chunk_start : chunk_start + 50] 236 + counts = await asyncio.gather(*[delete_one(d) for d in chunk]) 237 + deleted += sum(counts) 238 + done += len(chunk) 239 + log(f" [{done}/{len(bridgy_dids)}] deleted {deleted} docs so far") 207 240 208 - print(f"\ndone! deleted {deleted_docs} documents from {len(bridgy_dids)} bridgy fed DIDs") 241 + elapsed = time.monotonic() - t0 242 + log(f"\ndone! deleted {deleted} documents from {len(bridgy_dids)} DIDs in {elapsed:.1f}s") 209 243 210 244 211 245 if __name__ == "__main__": 212 - main() 246 + asyncio.run(main())