#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""Wait for embedder to finish, then create the DiskANN index."""

import os
import sys
import time

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict

# Seconds to sleep between two progress checks.
POLL_INTERVAL_SECONDS = 30
# Consecutive checks without progress before the embedder is treated as stalled.
MAX_STALLED_CHECKS = 5


class Settings(BaseSettings):
    """Turso connection settings, read from the environment or an env file.

    The env file path comes from the ``ENV_FILE`` environment variable and
    defaults to ``.env``; unknown keys in it are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )

    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return ``turso_url`` with a leading ``libsql://`` scheme removed."""
        return self.turso_url.removeprefix("libsql://")


def query(settings: Settings, sql: str, timeout: float = 30) -> dict:
    """Execute one SQL statement through the database's HTTP pipeline endpoint.

    Args:
        settings: Connection settings providing the host and auth token.
        sql: The SQL statement to execute.
        timeout: HTTP timeout in seconds for the request.

    Returns:
        The ``result`` object of the first pipeline response.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
        RuntimeError: If the pipeline reports an error for the statement.
    """
    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={
            "requests": [
                {"type": "execute", "stmt": {"sql": sql}},
                {"type": "close"},
            ]
        },
        timeout=timeout,
    )
    response.raise_for_status()
    result = response.json()["results"][0]
    if result["type"] == "error":
        raise RuntimeError(result["error"])
    return result["response"]["result"]


def scalar(settings: Settings, sql: str) -> int:
    """Run *sql* and return the first column of its first row as an int."""
    cell = query(settings, sql)["rows"][0][0]
    # Cells may arrive wrapped as {"value": ...} or as a bare value.
    return int(cell["value"] if isinstance(cell, dict) else cell)


def main() -> None:
    """Poll until all documents are embedded (or progress stalls), then index."""
    settings = Settings()  # type: ignore

    total = scalar(settings, "SELECT count(*) FROM documents")
    prev = 0
    stall_count = 0

    print(f"waiting for {total} documents to be embedded...", flush=True)

    while True:
        embedded = scalar(
            settings, "SELECT count(*) FROM documents WHERE embedding IS NOT NULL"
        )
        # `total` is a snapshot taken before the loop; if rows were added in the
        # meantime `embedded` can exceed it, so clamp instead of going negative.
        remaining = max(total - embedded, 0)
        rate = embedded - prev
        prev = embedded

        print(
            f" {embedded}/{total} ({remaining} left, +{rate} since last check)",
            flush=True,
        )

        if remaining <= 0:
            break

        if rate == 0:
            stall_count += 1
            if stall_count >= MAX_STALLED_CHECKS:
                print(f"embedder appears stalled at {embedded}/{total}", flush=True)
                print("creating index with what we have", flush=True)
                break
        else:
            stall_count = 0

        time.sleep(POLL_INTERVAL_SECONDS)

    print("creating DiskANN index...", flush=True)
    # NOTE(review): presumably clears leftover vector-index metadata from an
    # earlier attempt before the index is (re)created; confirm.
    query(settings, "DROP TABLE IF EXISTS libsql_vector_meta_shadow", timeout=60)
    query(
        settings,
        "CREATE INDEX documents_embedding_idx ON documents(libsql_vector_idx(embedding))",
        timeout=300,
    )

    # Smoke-test the new index by searching with one existing embedding.
    # NOTE(review): the query asks for at most 10000 neighbours, so the number
    # reported below is capped at 10000 and is not a full count for larger sets.
    indexed = scalar(
        settings,
        "SELECT count(*) FROM vector_top_k('documents_embedding_idx', "
        "(SELECT embedding FROM documents WHERE embedding IS NOT NULL LIMIT 1), 10000)",
    )
    print(f"done — {indexed} documents indexed", flush=True)


if __name__ == "__main__":
    main()