search for standard sites pub-search.waow.tech
search zig blog atproto
at main 148 lines 4.6 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Reset the turbopuffer vector index and trigger a full re-embedding.

This script:
  1. Deletes the turbopuffer namespace (old vectors with wrong dimensions)
  2. Clears embedded_at in turso so the backend embedder re-processes all docs

The backend embedder will automatically pick up unembedded docs and
re-embed them with the current model on its next poll cycle (~60s).

Usage:
    ./scripts/rebuild-vector-index          # delete namespace + clear embedded_at
    ./scripts/rebuild-vector-index --check  # show current state without changing anything
"""

import argparse
import os
import subprocess
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


TPUF_NAMESPACE = "leaflet-search"
FLY_APP = "leaflet-search-backend"


class Settings(BaseSettings):
    """Turso connection settings read from the environment or an env file.

    The env file path is taken from the ``ENV_FILE`` environment variable
    (default ``.env``); keys the model does not declare are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return ``turso_url`` with its scheme and trailing slashes removed.

        ``pipeline`` prepends ``https://`` itself, so both the ``libsql://``
        form and an already-``https://`` URL must be reduced to a bare host;
        otherwise the request URL would contain two schemes.
        """
        url = self.turso_url
        for scheme in ("libsql://", "https://"):
            if url.startswith(scheme):
                url = url.removeprefix(scheme)
                break
        return url.rstrip("/")


def pipeline(settings: Settings, statements: list[str]) -> list[dict]:
    """Execute SQL statements through the Turso ``/v2/pipeline`` HTTP endpoint.

    Args:
        settings: Provides the target host and the bearer token.
        statements: SQL strings; each is sent as one ``execute`` request.

    Returns:
        The ``result`` object of every statement, in statement order.

    Raises:
        httpx.HTTPStatusError: If the endpoint responds with an error status.
        RuntimeError: If a statement's result is reported with type ``error``.
    """
    requests = [{"type": "execute", "stmt": {"sql": sql}} for sql in statements]
    # The pipeline is terminated with a "close" request; its result is the
    # last entry of the response and is skipped when collecting results.
    requests.append({"type": "close"})

    resp = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": requests},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()

    results = []
    for i, result in enumerate(data["results"][:-1]):
        if result["type"] == "error":
            raise RuntimeError(f"statement {i} failed: {result['error']}")
        results.append(result["response"]["result"])
    return results


def scalar(settings: Settings, sql: str) -> int:
    """Run a single query and return row 0, column 0 as an ``int``.

    The cell is accepted either as a bare value or as a dict carrying the
    value under the ``"value"`` key.
    """
    results = pipeline(settings, [sql])
    cell = results[0]["rows"][0][0]
    return int(cell["value"] if isinstance(cell, dict) else cell)


def get_tpuf_key() -> str:
    """Fetch ``TURBOPUFFER_API_KEY`` from the Fly app via ``fly ssh console``.

    Only the last line of the command's output is used, so anything printed
    before it is ignored.

    Returns:
        The API key, which must start with ``tpuf_``.

    Raises:
        RuntimeError: If the ``fly`` command exits non-zero, prints nothing,
            or its last output line does not look like a turbopuffer key.
    """
    result = subprocess.run(
        ["fly", "-a", FLY_APP, "ssh", "console", "-C", "printenv TURBOPUFFER_API_KEY"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"fly ssh failed: {result.stderr.strip()}")

    lines = result.stdout.strip().splitlines()
    if not lines:
        # Without this guard an empty stdout surfaces as a bare IndexError.
        raise RuntimeError("fly ssh returned no output for TURBOPUFFER_API_KEY")

    key = lines[-1].strip()
    if not key.startswith("tpuf_"):
        raise RuntimeError(f"unexpected key format: {key[:10]}...")
    return key


def delete_tpuf_namespace(api_key: str) -> str:
    """Delete the turbopuffer namespace named by ``TPUF_NAMESPACE``.

    Args:
        api_key: turbopuffer API key sent as a bearer token.

    Returns:
        ``"deleted"`` on HTTP 200, or ``"already gone"`` when the error
        message in the response says the namespace was not found.

    Raises:
        RuntimeError: For any other response, including error bodies that
            are not JSON; the message carries the HTTP status and the body.
    """
    resp = httpx.delete(
        f"https://api.turbopuffer.com/v2/namespaces/{TPUF_NAMESPACE}",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    if resp.status_code == 200:
        return "deleted"

    # Error bodies are not guaranteed to be JSON (or a JSON object), so parse
    # defensively instead of letting a decode error mask the real failure.
    try:
        data = resp.json()
    except ValueError:
        data = None
    error = data.get("error") if isinstance(data, dict) else None
    if "not found" in str(error or "").lower():
        return "already gone"

    detail = data if data is not None else resp.text.strip()
    raise RuntimeError(f"tpuf delete failed (HTTP {resp.status_code}): {detail}")


def main():
    """Report document counts, then reset the vector index unless ``--check``.

    Steps when not in check mode: fetch the turbopuffer key from Fly, delete
    the namespace, then set ``embedded_at`` to NULL on every document so the
    backend embedder picks them all up again.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--check", action="store_true", help="show state without changing anything")
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error: {e}", file=sys.stderr)
        print("required: TURSO_URL, TURSO_TOKEN (or .env file)", file=sys.stderr)
        sys.exit(1)

    total = scalar(settings, "SELECT COUNT(*) FROM documents")
    embedded = scalar(settings, "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL")
    print(f"documents: {total}, embedded: {embedded}")

    if args.check:
        return

    # step 1: get tpuf key
    print("getting tpuf key from fly...", end="", flush=True)
    tpuf_key = get_tpuf_key()
    print(f" ok ({tpuf_key[:10]}...)")

    # step 2: delete namespace
    print(f"deleting tpuf namespace '{TPUF_NAMESPACE}'...", end="", flush=True)
    status = delete_tpuf_namespace(tpuf_key)
    print(f" ok ({status})")

    # step 3: clear embedded_at
    print("clearing embedded_at...", end="", flush=True)
    pipeline(settings, ["UPDATE documents SET embedded_at = NULL"])
    remaining = scalar(settings, "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL")
    print(f" ok ({remaining} remaining)")

    print(f"\ndone. embedder will re-embed {total} docs on next poll (~60s).")


if __name__ == "__main__":
    main()