search for standard sites
pub-search.waow.tech
search
zig
blog
atproto
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "pydantic-settings"]
5# ///
6"""Wait for embedder to finish, then create the DiskANN index."""
7
8import os
9import sys
10import time
11
12import httpx
13from pydantic_settings import BaseSettings, SettingsConfigDict
14
15
class Settings(BaseSettings):
    """Turso connection settings.

    The env file path comes from the ``ENV_FILE`` environment variable and
    falls back to ``.env``. Entries the class does not declare are ignored
    (``extra="ignore"``).
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"),
        extra="ignore",
    )

    # Database URL as configured; may carry a ``libsql://`` scheme.
    turso_url: str
    # Token sent as the HTTP bearer credential.
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return ``turso_url`` with a leading ``libsql://`` scheme removed.

        URLs that do not start with ``libsql://`` are returned unchanged.
        """
        return self.turso_url.removeprefix("libsql://")
29
30
def query(settings, sql, timeout=30):
    """Execute one SQL statement through the ``/v2/pipeline`` HTTP endpoint.

    Sends an ``execute`` request for ``sql`` followed by a ``close`` request
    to ``https://<settings.turso_host>/v2/pipeline``, authenticating with
    ``settings.turso_token`` as a bearer token.

    Args:
        settings: object exposing ``turso_host`` and ``turso_token``.
        sql: the SQL text to execute (sent as-is, without bound arguments).
        timeout: timeout passed to ``httpx.post``.

    Returns:
        The ``["response"]["result"]`` payload of the first pipeline result.

    Raises:
        httpx.HTTPStatusError: from ``raise_for_status`` on an error status.
        Exception: carrying the server's ``error`` payload when the first
            pipeline result has type ``"error"``.
    """
    endpoint = f"https://{settings.turso_host}/v2/pipeline"
    auth_headers = {
        "Authorization": f"Bearer {settings.turso_token}",
        "Content-Type": "application/json",
    }
    pipeline = {
        "requests": [
            {"type": "execute", "stmt": {"sql": sql}},
            {"type": "close"},
        ]
    }

    reply = httpx.post(endpoint, headers=auth_headers, json=pipeline, timeout=timeout)
    reply.raise_for_status()

    # Only the first entry matters: it answers the single "execute" request.
    outcome = reply.json()["results"][0]
    if outcome["type"] != "error":
        return outcome["response"]["result"]
    raise Exception(outcome["error"])
46
47
def scalar(settings, sql):
    """Run ``sql`` and return the first column of its first row as an ``int``.

    The cell may be a dict wrapping the value under ``"value"`` or a bare
    value; either form is converted with ``int``.
    """
    rows = query(settings, sql)["rows"]
    first_cell = rows[0][0]
    if isinstance(first_cell, dict):
        first_cell = first_cell["value"]
    return int(first_cell)
51
52
# Seconds to sleep between progress checks.
POLL_INTERVAL_SECONDS = 30
# Consecutive checks with no newly embedded documents before giving up waiting.
MAX_STALLED_CHECKS = 5

settings = Settings()  # type: ignore
total = scalar(settings, "SELECT count(*) FROM documents")
prev = 0
stall_count = 0

print(f"waiting for {total} documents to be embedded...", flush=True)

while True:
    embedded = scalar(settings, "SELECT count(*) FROM documents WHERE embedding IS NOT NULL")
    # `total` is a snapshot taken before the loop. Documents inserted and
    # embedded afterwards can push `embedded` past it, so clamp at zero;
    # otherwise the completion check below would never fire and the loop
    # could only end through the "stalled" path.
    remaining = max(total - embedded, 0)
    # On the first pass `prev` is 0, so `rate` is everything embedded so far.
    rate = embedded - prev
    prev = embedded

    print(f"  {embedded}/{total} ({remaining} left, +{rate} since last check)", flush=True)

    if remaining == 0:
        break

    if rate == 0:
        stall_count += 1
        if stall_count >= MAX_STALLED_CHECKS:
            # No progress for several checks in a row: stop waiting and index
            # whatever has been embedded.
            print(f"embedder appears stalled at {embedded}/{total}", flush=True)
            print("creating index with what we have", flush=True)
            break
    else:
        stall_count = 0

    time.sleep(POLL_INTERVAL_SECONDS)

print("creating DiskANN index...", flush=True)
# NOTE(review): presumably clears leftover vector-index metadata from an
# earlier index so the CREATE INDEX below starts clean — confirm.
query(settings, "DROP TABLE IF EXISTS libsql_vector_meta_shadow", timeout=60)
# Index creation gets a longer timeout (300) than the other statements.
query(
    settings,
    "CREATE INDEX documents_embedding_idx ON documents(libsql_vector_idx(embedding))",
    timeout=300,
)

# Sanity check: probe the new index with one existing embedding. The probe
# asks for at most 10000 neighbours, so the reported count is capped at 10000.
indexed = scalar(
    settings,
    "SELECT count(*) FROM vector_top_k('documents_embedding_idx', "
    "(SELECT embedding FROM documents WHERE embedding IS NOT NULL LIMIT 1), 10000)",
)
print(f"done — {indexed} documents indexed", flush=True)