search for standard sites
pub-search.waow.tech
search
zig
blog
atproto
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "pydantic-settings"]
5# ///
6"""
7Reset the turbopuffer vector index and trigger a full re-embedding.
8
9This script:
10 1. Deletes the turbopuffer namespace (old vectors with wrong dimensions)
11 2. Clears embedded_at in turso so the backend embedder re-processes all docs
12
13The backend embedder will automatically pick up unembedded docs and
14re-embed them with the current model on its next poll cycle (~60s).
15
16Usage:
17 ./scripts/rebuild-vector-index # delete namespace + clear embedded_at
18 ./scripts/rebuild-vector-index --check # show current state without changing anything
19"""
20
21import argparse
22import os
23import subprocess
24import sys
25
26import httpx
27from pydantic_settings import BaseSettings, SettingsConfigDict
28
29
# Name of the turbopuffer namespace that delete_tpuf_namespace() removes.
TPUF_NAMESPACE = "leaflet-search"
# Fly app that get_tpuf_key() connects to (`fly -a ... ssh console`) to read
# the turbopuffer API key from the app's environment.
FLY_APP = "leaflet-search-backend"
32
33
class Settings(BaseSettings):
    """Turso connection settings loaded through pydantic-settings.

    The env file path is taken from the ``ENV_FILE`` environment variable and
    falls back to ``.env``. ``extra="ignore"`` is passed so that entries not
    declared on this class do not cause a validation failure.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"),
        extra="ignore",
    )

    # Database URL; a leading "libsql://" scheme is accepted (see turso_host).
    turso_url: str
    # Bearer token sent with every pipeline request.
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return ``turso_url`` with a leading ``libsql://`` scheme removed."""
        return self.turso_url.removeprefix("libsql://")
47
48
def pipeline(settings: Settings, statements: list[str]) -> list[dict]:
    """Run SQL statements through the Turso ``/v2/pipeline`` HTTP endpoint.

    Every statement is sent as an ``execute`` request, followed by a single
    ``close`` request, all in one POST.

    Returns:
        The ``result`` payload of each statement, in the order given.

    Raises:
        httpx.HTTPStatusError: if the HTTP response has an error status.
        Exception: if the response entry for a statement has type ``error``.
    """
    payload = {
        "requests": [
            *({"type": "execute", "stmt": {"sql": sql}} for sql in statements),
            {"type": "close"},
        ]
    }
    endpoint = f"https://{settings.turso_host}/v2/pipeline"
    request_headers = {
        "Authorization": f"Bearer {settings.turso_token}",
        "Content-Type": "application/json",
    }

    response = httpx.post(endpoint, headers=request_headers, json=payload, timeout=60)
    response.raise_for_status()

    # Drop the final entry: it lines up with the trailing "close" request
    # and carries no statement result.
    statement_entries = response.json()["results"][:-1]

    collected = []
    for index, entry in enumerate(statement_entries):
        if entry["type"] == "error":
            raise Exception(f"statement {index} failed: {entry['error']}")
        collected.append(entry["response"]["result"])
    return collected
71
72
def scalar(settings: Settings, sql: str) -> int:
    """Run a single query and return row 0, column 0 of its result as an int.

    The cell may be either a dict holding the value under ``"value"`` or the
    bare value itself; both forms are converted with ``int()``.
    """
    first_row = pipeline(settings, [sql])[0]["rows"][0]
    value = first_row[0]
    if isinstance(value, dict):
        value = value["value"]
    return int(value)
77
78
def get_tpuf_key() -> str:
    """Read the turbopuffer API key from the Fly app's environment.

    Runs ``printenv TURBOPUFFER_API_KEY`` on ``FLY_APP`` through
    ``fly ssh console`` and uses the last line of the trimmed stdout as the
    key (presumably because the CLI may print other lines first).

    Returns:
        The key, which must start with ``tpuf_``.

    Raises:
        Exception: if the ``fly`` CLI is not installed, the command exits
            non-zero, the command prints nothing, or the value does not look
            like a turbopuffer key.
    """
    command = ["fly", "-a", FLY_APP, "ssh", "console", "-C", "printenv TURBOPUFFER_API_KEY"]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except FileNotFoundError as err:
        # subprocess raises this when the executable itself cannot be found.
        raise Exception("fly CLI not found on PATH") from err

    if result.returncode != 0:
        raise Exception(f"fly ssh failed: {result.stderr.strip()}")

    # An empty stdout would make `[-1]` raise a bare IndexError; report it
    # as a readable failure instead.
    lines = result.stdout.strip().splitlines()
    if not lines:
        raise Exception("fly ssh returned no output (is TURBOPUFFER_API_KEY set on the app?)")

    key = lines[-1].strip()
    if not key.startswith("tpuf_"):
        raise Exception(f"unexpected key format: {key[:10]}...")
    return key
90
91
def delete_tpuf_namespace(api_key: str) -> str:
    """Delete the turbopuffer namespace named by ``TPUF_NAMESPACE``.

    Args:
        api_key: turbopuffer API key, sent as a bearer token.

    Returns:
        ``"deleted"`` on HTTP 200, or ``"already gone"`` when the response's
        ``error`` message contains ``"not found"``.

    Raises:
        Exception: for any other response; the message includes the HTTP
            status code and the response body.
    """
    resp = httpx.delete(
        f"https://api.turbopuffer.com/v2/namespaces/{TPUF_NAMESPACE}",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    if resp.status_code == 200:
        return "deleted"

    # An error body is not guaranteed to be JSON (for example a gateway error
    # page or an empty body). Fall back to the raw text so the real HTTP
    # failure is reported instead of a JSON decode error.
    try:
        data = resp.json()
    except ValueError:
        data = resp.text

    # Only a string `error` field on a JSON object can signal "not found".
    message = data.get("error") if isinstance(data, dict) else None
    if isinstance(message, str) and "not found" in message:
        return "already gone"
    raise Exception(f"tpuf delete failed: HTTP {resp.status_code}: {data}")
107
108
def main():
    """Command-line entry point.

    Prints the total and embedded document counts. With ``--check`` it stops
    there. Otherwise it fetches the turbopuffer key, deletes the namespace,
    sets ``embedded_at`` to NULL on every document, and reports how many
    documents still have it set.

    Exits with status 1 if the settings cannot be loaded.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--check", action="store_true", help="show state without changing anything")
    options = cli.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error: {e}", file=sys.stderr)
        print("required: TURSO_URL, TURSO_TOKEN (or .env file)", file=sys.stderr)
        sys.exit(1)

    def announce(label: str) -> None:
        # Print the step label without a newline so its outcome is appended
        # to the same terminal line.
        print(label, end="", flush=True)

    # Used twice: once for the initial report, once after the reset.
    embedded_count_sql = "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL"

    total = scalar(settings, "SELECT COUNT(*) FROM documents")
    embedded = scalar(settings, embedded_count_sql)
    print(f"documents: {total}, embedded: {embedded}")

    if options.check:
        return

    # step 1: get tpuf key
    announce("getting tpuf key from fly...")
    tpuf_key = get_tpuf_key()
    print(f" ok ({tpuf_key[:10]}...)")

    # step 2: delete namespace
    announce(f"deleting tpuf namespace '{TPUF_NAMESPACE}'...")
    outcome = delete_tpuf_namespace(tpuf_key)
    print(f" ok ({outcome})")

    # step 3: clear embedded_at
    announce("clearing embedded_at...")
    pipeline(settings, ["UPDATE documents SET embedded_at = NULL"])
    remaining = scalar(settings, embedded_count_sql)
    print(f" ok ({remaining} remaining)")

    print(f"\ndone. embedder will re-embed {total} docs on next poll (~60s).")
145
146
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()