#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Reset the turbopuffer vector index and trigger a full re-embedding.

This script:
1. Deletes the turbopuffer namespace (old vectors with wrong dimensions)
2. Clears embedded_at in turso so the backend embedder re-processes all docs

The backend embedder will automatically pick up unembedded docs and re-embed
them with the current model on its next poll cycle (~60s).

Usage:
    ./scripts/rebuild-vector-index           # delete namespace + clear embedded_at
    ./scripts/rebuild-vector-index --check   # show current state without changing anything
"""

import argparse
import os
import subprocess
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict

TPUF_NAMESPACE = "leaflet-search"
FLY_APP = "leaflet-search-backend"


class Settings(BaseSettings):
    """Turso connection settings, read from the environment or an env file.

    The env file path comes from the ENV_FILE environment variable and
    defaults to ".env"; settings not declared here are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )

    # database URL; may carry a "libsql://" scheme prefix
    turso_url: str
    # bearer token sent with every pipeline request
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return turso_url with a leading "libsql://" removed, if present."""
        return self.turso_url.removeprefix("libsql://")


def pipeline(settings: Settings, statements: list[str]) -> list[dict]:
    """Run SQL statements through the turso HTTP pipeline endpoint.

    Each statement is sent as an "execute" request, followed by one "close"
    request, in a single POST to ``https://<turso_host>/v2/pipeline``.

    Returns:
        One ``response.result`` object per statement, in order.

    Raises:
        httpx.HTTPStatusError: if the HTTP response status is an error.
        Exception: if any statement's result has type "error".
    """
    requests = [{"type": "execute", "stmt": {"sql": sql}} for sql in statements]
    requests.append({"type": "close"})

    resp = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": requests},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()

    results = []
    # the final entry answers the trailing "close" request, so skip it
    for i, result in enumerate(data["results"][:-1]):
        if result["type"] == "error":
            raise Exception(f"statement {i} failed: {result['error']}")
        results.append(result["response"]["result"])
    return results


def scalar(settings: Settings, sql: str) -> int:
    """Run a single query and return the first cell of its first row as int."""
    results = pipeline(settings, [sql])
    cell = results[0]["rows"][0][0]
    # a cell may arrive wrapped as a dict with a "value" key, or as a bare value
    return int(cell["value"] if isinstance(cell, dict) else cell)


def get_tpuf_key() -> str:
    """Read TURBOPUFFER_API_KEY from the fly app via ``fly ssh console``.

    The key is taken from the last non-blank line of the command's stdout.

    Raises:
        Exception: if the fly command exits non-zero, prints nothing, or the
            value found does not start with "tpuf_".
    """
    result = subprocess.run(
        ["fly", "-a", FLY_APP, "ssh", "console", "-C", "printenv TURBOPUFFER_API_KEY"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise Exception(f"fly ssh failed: {result.stderr.strip()}")

    output_lines = result.stdout.strip().splitlines()
    if not output_lines:
        # exit code 0 with no output would otherwise surface as an IndexError
        raise Exception("fly ssh failed: command produced no output")

    key = output_lines[-1].strip()
    if not key.startswith("tpuf_"):
        raise Exception(f"unexpected key format: {key[:10]}...")
    return key


def delete_tpuf_namespace(api_key: str) -> str:
    """Delete the TPUF_NAMESPACE namespace from turbopuffer.

    Returns:
        "deleted" on HTTP 200, or "already gone" when the error message in
        the response body contains "not found".

    Raises:
        Exception: for any other response, including a non-200 response
            whose body is not valid JSON.
    """
    resp = httpx.delete(
        f"https://api.turbopuffer.com/v2/namespaces/{TPUF_NAMESPACE}",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    if resp.status_code == 200:
        return "deleted"

    try:
        data = resp.json()
    except ValueError:
        # error bodies are not guaranteed to be JSON; report what came back
        raise Exception(
            f"tpuf delete failed: HTTP {resp.status_code}: {resp.text[:200]}"
        ) from None

    error = data.get("error", "") if isinstance(data, dict) else ""
    if isinstance(error, str) and "not found" in error:
        return "already gone"
    raise Exception(f"tpuf delete failed: {data}")


def main():
    """Print document counts, then (unless --check) reset the vector index.

    Without --check: fetches the turbopuffer key from fly, deletes the
    namespace, and sets embedded_at to NULL on every row of documents.
    Exits with status 1 if the turso settings cannot be loaded.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--check", action="store_true", help="show state without changing anything")
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error: {e}", file=sys.stderr)
        print("required: TURSO_URL, TURSO_TOKEN (or .env file)", file=sys.stderr)
        sys.exit(1)

    total = scalar(settings, "SELECT COUNT(*) FROM documents")
    embedded = scalar(settings, "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL")
    print(f"documents: {total}, embedded: {embedded}")

    if args.check:
        return

    # step 1: get tpuf key
    print("getting tpuf key from fly...", end="", flush=True)
    tpuf_key = get_tpuf_key()
    # only a short prefix of the key is echoed as confirmation
    print(f" ok ({tpuf_key[:10]}...)")

    # step 2: delete namespace
    print(f"deleting tpuf namespace '{TPUF_NAMESPACE}'...", end="", flush=True)
    status = delete_tpuf_namespace(tpuf_key)
    print(f" ok ({status})")

    # step 3: clear embedded_at
    print("clearing embedded_at...", end="", flush=True)
    pipeline(settings, ["UPDATE documents SET embedded_at = NULL"])
    remaining = scalar(settings, "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL")
    print(f" ok ({remaining} remaining)")

    print(f"\ndone. embedder will re-embed {total} docs on next poll (~60s).")


if __name__ == "__main__":
    main()