search for standard sites pub-search.waow.tech
search zig blog atproto

chore: add vector index and bridgy-fed purge scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+498
+184
scripts/populate-vector-index
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Populate the DiskANN vector index for semantic search.

The index only tracks writes that happen after it's created, so existing
embeddings need to be touched (UPDATE SET embedding = embedding) to get
indexed. This script does that in batches to avoid turso timeouts.

Usage:
    ./scripts/populate-vector-index                   # populate index
    ./scripts/populate-vector-index --batch-size 200  # custom batch size
    ./scripts/populate-vector-index --check           # just check index status
"""

import argparse
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, loaded from the environment / .env file."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Hostname for the HTTP API (strips the libsql:// scheme if present)."""
        url = self.turso_url
        if url.startswith("libsql://"):
            url = url[len("libsql://") :]
        return url


def query(settings: Settings, sql: str, timeout: int = 30) -> dict:
    """Execute one SQL statement via turso's v2 pipeline HTTP API.

    Returns the raw result dict; raises on HTTP errors or turso-level errors.
    """
    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={
            "requests": [
                {"type": "execute", "stmt": {"sql": sql}},
                {"type": "close"},
            ]
        },
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    result = data["results"][0]
    if result["type"] == "error":
        raise Exception(f"turso error: {result['error']}")
    return result["response"]["result"]


def exec_sql(settings: Settings, sql: str, timeout: int = 120) -> int:
    """Execute a write statement; returns the number of affected rows."""
    result = query(settings, sql, timeout=timeout)
    return result.get("affected_row_count", 0)


def get_scalar(settings: Settings, sql: str) -> int:
    """Run a query expected to produce a single integer cell."""
    result = query(settings, sql)
    row = result["rows"][0]
    cell = row[0]
    # cells come back either as {"type": ..., "value": ...} objects or bare values
    return int(cell["value"] if isinstance(cell, dict) else cell)


def check_status(settings: Settings) -> tuple[int, int]:
    """Returns (total_embeddings, indexed_count); indexed is -1 if no index exists."""
    total = get_scalar(
        settings, "SELECT COUNT(*) FROM documents WHERE embedding IS NOT NULL"
    )

    try:
        # use a dummy vector to count indexed docs
        indexed = get_scalar(
            settings,
            "SELECT count(*) FROM vector_top_k('documents_embedding_idx', "
            "(SELECT embedding FROM documents WHERE embedding IS NOT NULL LIMIT 1), 10000)",
        )
    except Exception as e:
        if "not found" in str(e).lower():
            indexed = -1  # index doesn't exist
        else:
            raise

    return total, indexed


def main():
    parser = argparse.ArgumentParser(
        description="populate DiskANN vector index for semantic search"
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="rows per UPDATE batch (default: 500)",
    )
    parser.add_argument(
        "--workers", type=int, default=8, help="concurrent workers (default: 8)"
    )
    parser.add_argument(
        "--check", action="store_true", help="just check index status and exit"
    )
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error loading settings: {e}", file=sys.stderr)
        print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr)
        sys.exit(1)

    total, indexed = check_status(settings)

    if indexed == -1:
        print("vector index does not exist — creating it...")
        exec_sql(
            settings,
            "CREATE INDEX IF NOT EXISTS documents_embedding_idx "
            "ON documents(libsql_vector_idx(embedding))",
        )
        print("index created")
        indexed = 0

    print(f"embeddings: {total}, indexed: {indexed}")

    if args.check:
        return

    remaining = total - indexed
    if remaining <= 0:
        print("index is fully populated")
        return

    # We can't tell WHICH rows are missing from the index, so every embedded
    # row gets touched. Batch count and progress are therefore based on
    # `total` (the rows actually touched), not `remaining` — the original
    # mixed the two, so the printed batch count and ETA were wrong.
    batch_size = args.batch_size
    batches = list(range(0, total, batch_size))
    num_batches = len(batches)
    workers = min(args.workers, num_batches)
    print(
        f"touching {total} rows ({remaining} unindexed): "
        f"{num_batches} batches of {batch_size}, {workers} workers",
        flush=True,
    )

    def touch_batch(offset: int) -> int:
        # no-op UPDATE forces the vector index to pick up existing embeddings
        sql = (
            f"UPDATE documents SET embedding = embedding "
            f"WHERE rowid IN ("
            f"  SELECT rowid FROM documents WHERE embedding IS NOT NULL "
            f"  ORDER BY rowid LIMIT {batch_size} OFFSET {offset}"
            f")"
        )
        return exec_sql(settings, sql, timeout=300)

    touched = 0
    start = time.time()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(touch_batch, o): o for o in batches}
        for future in as_completed(futures):
            affected = future.result()
            touched += affected
            elapsed = time.time() - start
            rate = touched / elapsed if elapsed > 0 else 0
            eta = (total - touched) / rate if rate > 0 else 0
            print(
                f"  {touched}/{total} ({rate:.0f} rows/s, ~{eta:.0f}s left)",
                flush=True,
            )

    total_after, indexed_after = check_status(settings)
    print(f"done — embeddings: {total_after}, indexed: {indexed_after}")


if __name__ == "__main__":
    main()
+224
scripts/purge-bridgyfed-vectors
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Purge bridgy fed vectors from turbopuffer.

The turso purge script cleaned the database, but vectors in turbopuffer
were never removed. This script finds vectors with platform='other' and
empty base_path, resolves their DIDs via plc.directory, and deletes
confirmed bridgy fed vectors from turbopuffer.

Loops until all bridgy vectors are removed (tpuf caps queries at 10k).

Usage:
    ./scripts/purge-bridgyfed-vectors          # dry run (default)
    ./scripts/purge-bridgyfed-vectors --apply  # actually delete
"""

import argparse
import asyncio
import os
import sys
import time
from urllib.parse import unquote

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


def log(msg: str) -> None:
    """Print with an immediate flush so progress shows up in piped output."""
    print(msg, flush=True)


class Settings(BaseSettings):
    """Turbopuffer credentials, loaded from the environment / .env file."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )

    turbopuffer_api_key: str
    turbopuffer_namespace: str = "leaflet-search"


async def resolve_pds(
    client: httpx.AsyncClient, did: str, semaphore: asyncio.Semaphore
) -> str | None:
    """Get PDS endpoint from PLC directory (or did:web DID document).

    Returns the AtprotoPersonalDataServer serviceEndpoint, or None if the
    lookup fails or the DID document has no PDS service entry.
    """
    async with semaphore:
        try:
            if did.startswith("did:web:"):
                # Per the did:web spec, colons after the host are URL path
                # segments (did:web:ex.com:a:b -> https://ex.com/a/b/did.json)
                # and %3A percent-encodes a port in the host. A bare host
                # resolves to /.well-known/did.json. The original treated the
                # whole id as a hostname and built broken URLs for path DIDs.
                host, *path = did[len("did:web:"):].split(":")
                host = unquote(host)
                if path:
                    url = f"https://{host}/{'/'.join(path)}/did.json"
                else:
                    url = f"https://{host}/.well-known/did.json"
            else:
                url = f"https://plc.directory/{did}"
            resp = await client.get(url, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            for service in data.get("service", []):
                if service.get("type") == "AtprotoPersonalDataServer":
                    return service["serviceEndpoint"]
        except Exception as e:
            print(f"  lookup failed for {did}: {e}", file=sys.stderr, flush=True)
        return None


def tpuf_query(
    client: httpx.Client, settings: Settings, filters: list, top_k: int = 10000
) -> list[dict]:
    """Query turbopuffer with attribute filters; returns rows with id + did."""
    url = f"https://api.turbopuffer.com/v2/namespaces/{settings.turbopuffer_namespace}/query"
    resp = client.post(
        url,
        headers={
            "Authorization": f"Bearer {settings.turbopuffer_api_key}",
            "Content-Type": "application/json",
        },
        json={
            "top_k": top_k,
            "filters": filters,
            "include_attributes": ["did"],
        },
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json().get("rows", [])


def tpuf_delete(client: httpx.Client, settings: Settings, ids: list[str]) -> None:
    """Delete vectors by ID from turbopuffer."""
    url = f"https://api.turbopuffer.com/v2/namespaces/{settings.turbopuffer_namespace}"
    resp = client.post(
        url,
        headers={
            "Authorization": f"Bearer {settings.turbopuffer_api_key}",
            "Content-Type": "application/json",
        },
        json={"deletes": ids},
        timeout=60,
    )
    resp.raise_for_status()


async def resolve_new_dids(
    dids: list[str], cache: dict[str, bool]
) -> dict[str, bool]:
    """Resolve DIDs not already in cache. Returns updated cache.

    The cache maps DID -> "PDS is hosted on brid.gy".
    """
    new_dids = [d for d in dids if d not in cache]
    if not new_dids:
        return cache

    log(f"  resolving {len(new_dids)} new DIDs (20 concurrent)...")
    plc_sem = asyncio.Semaphore(20)

    async with httpx.AsyncClient() as client:

        async def resolve_one(did: str) -> tuple[str, bool]:
            pds = await resolve_pds(client, did, plc_sem)
            is_bridgy = pds is not None and "brid.gy" in pds
            return did, is_bridgy

        results = await asyncio.gather(*[resolve_one(d) for d in new_dids])

    for did, is_bridgy in results:
        cache[did] = is_bridgy

    bridgy = sum(1 for _, b in results if b)
    log(f"  {bridgy} bridgy, {len(new_dids) - bridgy} non-bridgy")
    return cache


async def main():
    parser = argparse.ArgumentParser(
        description="Purge bridgy fed vectors from turbopuffer"
    )
    parser.add_argument(
        "--apply", action="store_true", help="Actually delete (default is dry run)"
    )
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error loading settings: {e}", file=sys.stderr)
        print("required env vars: TURBOPUFFER_API_KEY", file=sys.stderr)
        sys.exit(1)

    if not args.apply:
        log("=== DRY RUN (pass --apply to delete) ===\n")

    sync_client = httpx.Client()
    filters = ["And", [["platform", "Eq", "other"], ["base_path", "Eq", ""]]]

    # cache: DID -> is_bridgy (persists across rounds)
    pds_cache: dict[str, bool] = {}
    total_deleted = 0
    round_num = 0

    while True:
        round_num += 1
        log(f"--- round {round_num} ---")
        log("querying turbopuffer for platform='other', base_path=''...")

        rows = tpuf_query(sync_client, settings, filters)
        log(f"  found {len(rows)} vectors")

        if not rows:
            break

        # collect unique DIDs and map DID -> vector IDs
        did_to_ids: dict[str, list[str]] = {}
        for row in rows:
            did_to_ids.setdefault(row["did"], []).append(row["id"])

        # resolve any new DIDs
        t0 = time.monotonic()
        pds_cache = await resolve_new_dids(list(did_to_ids.keys()), pds_cache)
        elapsed = time.monotonic() - t0
        if elapsed > 0.1:
            log(f"  resolved in {elapsed:.1f}s")

        # collect vector IDs for bridgy DIDs
        ids_to_delete = []
        for did, ids in did_to_ids.items():
            if pds_cache.get(did, False):
                ids_to_delete.extend(ids)

        bridgy_count = sum(1 for d in did_to_ids if pds_cache.get(d, False))
        log(f"  {bridgy_count} bridgy DIDs, {len(ids_to_delete)} vectors to delete\n")

        if not ids_to_delete:
            # NOTE(review): if the first 10k rows happen to be all non-bridgy,
            # bridgy vectors beyond the cap are missed — the query can't
            # paginate without deleting. Acceptable for this one-off purge.
            break

        if not args.apply:
            total_deleted += len(ids_to_delete)
            # dry run: single pass only (can't paginate without deleting)
            if len(rows) == 10000:
                log("  (tpuf capped at 10k — actual count may be higher)")
            break

        # batch delete
        for i in range(0, len(ids_to_delete), 500):
            batch = ids_to_delete[i : i + 500]
            tpuf_delete(sync_client, settings, batch)
            total_deleted += len(batch)
            log(f"  deleted {len(batch)} vectors (total: {total_deleted})")

        # if we got fewer than 10k, we've seen everything
        if len(rows) < 10000:
            break

    sync_client.close()

    bridgy_total = sum(1 for v in pds_cache.values() if v)
    if total_deleted:
        log(
            f"\n{'would delete' if not args.apply else 'done! deleted'} "
            f"{total_deleted} vectors from {bridgy_total} bridgy fed DIDs"
        )
    else:
        log("\nnothing to purge!")


if __name__ == "__main__":
    asyncio.run(main())
+90
scripts/wait-and-create-index
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""Wait for embedder to finish, then create the DiskANN index."""

import os
import sys
import time

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, loaded from the environment / .env file."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Hostname for the HTTP API (strips the libsql:// scheme if present)."""
        url = self.turso_url
        if url.startswith("libsql://"):
            url = url[len("libsql://") :]
        return url


def query(settings: Settings, sql: str, timeout: int = 30) -> dict:
    """Execute one SQL statement via turso's v2 pipeline HTTP API."""
    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": [{"type": "execute", "stmt": {"sql": sql}}, {"type": "close"}]},
        timeout=timeout,
    )
    response.raise_for_status()
    result = response.json()["results"][0]
    if result["type"] == "error":
        raise Exception(result["error"])
    return result["response"]["result"]


def scalar(settings: Settings, sql: str) -> int:
    """Run a query expected to produce a single integer cell."""
    cell = query(settings, sql)["rows"][0][0]
    return int(cell["value"] if isinstance(cell, dict) else cell)


def main():
    # Originally this ran at module level — importing the script would block
    # in the polling loop and mutate the database. Wrapped in main() + guard.
    settings = Settings()  # type: ignore
    total = scalar(settings, "SELECT count(*) FROM documents")
    prev = 0
    stall_count = 0

    print(f"waiting for {total} documents to be embedded...", flush=True)

    while True:
        embedded = scalar(
            settings, "SELECT count(*) FROM documents WHERE embedding IS NOT NULL"
        )
        remaining = total - embedded
        rate = embedded - prev
        prev = embedded

        print(f"  {embedded}/{total} ({remaining} left, +{rate} since last check)", flush=True)

        if remaining == 0:
            break

        # no progress for 5 consecutive checks (~2.5 min) => assume the
        # embedder is done or wedged, and index whatever exists
        if rate == 0:
            stall_count += 1
            if stall_count >= 5:
                print(f"embedder appears stalled at {embedded}/{total}", flush=True)
                print("creating index with what we have", flush=True)
                break
        else:
            stall_count = 0

        time.sleep(30)

    print("creating DiskANN index...", flush=True)
    query(settings, "DROP TABLE IF EXISTS libsql_vector_meta_shadow", timeout=60)
    # IF NOT EXISTS makes re-runs safe (matches scripts/populate-vector-index);
    # the original plain CREATE INDEX crashed after the wait if the index existed
    query(
        settings,
        "CREATE INDEX IF NOT EXISTS documents_embedding_idx "
        "ON documents(libsql_vector_idx(embedding))",
        timeout=300,
    )

    indexed = scalar(
        settings,
        "SELECT count(*) FROM vector_top_k('documents_embedding_idx', "
        "(SELECT embedding FROM documents WHERE embedding IS NOT NULL LIMIT 1), 10000)",
    )
    print(f"done — {indexed} documents indexed", flush=True)


if __name__ == "__main__":
    main()