search for standard sites pub-search.waow.tech
search zig blog atproto
at ca756a01806bc76bc6514afb7ba67f4baa3b5491 90 lines 2.7 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""Wait for embedder to finish, then create the DiskANN index.

Polls the ``documents`` table through the Turso HTTP pipeline endpoint until
every row counted at startup has a non-NULL ``embedding`` (or until progress
stalls), then creates ``documents_embedding_idx`` and prints how many rows a
``vector_top_k`` lookup against that index returns.
"""

import os
import sys
import time

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict

# Seconds to sleep between two progress checks.
POLL_INTERVAL_SECONDS = 30
# Consecutive checks without progress before the embedder is treated as stalled.
MAX_STALLED_POLLS = 5


class Settings(BaseSettings):
    """Connection settings loaded from the environment or an env file.

    The env file path is taken from ``ENV_FILE`` (default ``.env``); keys the
    model does not declare are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return ``turso_url`` without a leading ``libsql://`` scheme."""
        return self.turso_url.removeprefix("libsql://")


def query(settings: Settings, sql: str, timeout: float = 30) -> dict:
    """Execute a single SQL statement via the ``/v2/pipeline`` endpoint.

    Args:
        settings: Provides the host and bearer token for the request.
        sql: The statement to execute (sent as-is, without parameters).
        timeout: Timeout passed to ``httpx.post``.

    Returns:
        The ``result`` object of the first pipeline response.

    Raises:
        httpx.HTTPStatusError: If the HTTP response status is an error.
        RuntimeError: If the first pipeline result has type ``"error"``; the
            server's error payload is the exception argument.
    """
    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        # One "execute" request followed by "close" to end the pipeline.
        json={"requests": [{"type": "execute", "stmt": {"sql": sql}}, {"type": "close"}]},
        timeout=timeout,
    )
    response.raise_for_status()
    result = response.json()["results"][0]
    if result["type"] == "error":
        raise RuntimeError(result["error"])
    return result["response"]["result"]


def scalar(settings: Settings, sql: str) -> int:
    """Run ``sql`` and return the first column of its first row as an int.

    The cell may arrive either as a bare value or as a dict carrying the
    value under ``"value"``; both forms are accepted.
    """
    cell = query(settings, sql)["rows"][0][0]
    return int(cell["value"] if isinstance(cell, dict) else cell)


settings = Settings()  # type: ignore
total = scalar(settings, "SELECT count(*) FROM documents")
prev = 0
stall_count = 0

print(f"waiting for {total} documents to be embedded...", flush=True)

while True:
    embedded = scalar(settings, "SELECT count(*) FROM documents WHERE embedding IS NOT NULL")
    remaining = total - embedded
    rate = embedded - prev
    prev = embedded

    print(f"  {embedded}/{total} ({remaining} left, +{rate} since last check)", flush=True)

    # `total` is sampled once at startup, so rows added (and embedded) while
    # waiting can push `embedded` past it; treat that as finished too instead
    # of looping until the stall detector fires.
    if remaining <= 0:
        break

    if rate == 0:
        stall_count += 1
        if stall_count >= MAX_STALLED_POLLS:
            print(f"embedder appears stalled at {embedded}/{total}", flush=True)
            print("creating index with what we have", flush=True)
            break
    else:
        # Any progress resets the stall counter.
        stall_count = 0

    time.sleep(POLL_INTERVAL_SECONDS)

print("creating DiskANN index...", flush=True)
# NOTE(review): presumably removes leftover vector-index metadata from an
# earlier index so the CREATE below starts clean — confirm.
query(settings, "DROP TABLE IF EXISTS libsql_vector_meta_shadow", timeout=60)
query(settings, "CREATE INDEX documents_embedding_idx ON documents(libsql_vector_idx(embedding))", timeout=300)

# Sanity check: look up neighbours of one existing embedding through the new
# index. The lookup asks for at most 10000 rows, so the reported count is
# capped at that number.
indexed = scalar(settings,
    "SELECT count(*) FROM vector_top_k('documents_embedding_idx', "
    "(SELECT embedding FROM documents WHERE embedding IS NOT NULL LIMIT 1), 10000)"
)
print(f"done — {indexed} documents indexed", flush=True)