search for standard sites pub-search.waow.tech
search zig blog atproto

add vector similarity search and tombstone tracking

- add /similar endpoint for finding semantically related documents
- add tombstones table for tracking deleted records
- backfill script for embedding documents via Voyage AI (3470 docs done)
- show "related to [top result]" section at bottom of search results
- helper scripts for checking vector index and testing queries

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+564
+50
backend/src/db/mod.zig
// Diff excerpt for backend/src/db/mod.zig. "···" separates hunks; lines between hunks are not shown.
//
// deleteDocument / deletePublication (visible portion): both return immediately when `client` is
// null. Otherwise each formats std.time.timestamp() into a 20-byte stack buffer (using "0" if the
// formatting fails), runs INSERT OR REPLACE INTO tombstones with record_type 'document' or
// 'publication', and then issues the DELETE statements. Every exec error is discarded with `catch {}`.
// NOTE(review): the tombstone insert and the deletes are independent best-effort statements, so a
// tombstone can be written even when a later DELETE fails (and the reverse) - confirm that is acceptable.
//
// findSimilar: returns a JSON array built from the rows of a vector_top_k query against
// 'documents_embedding_idx', seeded with the embedding of the row whose uri equals `uri` and with
// that uri filtered out of the result. The index is asked for `limit + 1` rows to leave room for the
// source document itself; the count is bound as text and CAST to INTEGER in SQL.
// Returns error.NotInitialized when `client` is null. If the query itself fails, the function
// returns the string "[]" rather than an error. The result comes from output.toOwnedSlice() on a
// writer created with `alloc`.
// NOTE(review): `limit_buf` is 8 bytes; if `limit + 1` needs more digits, bufPrint fails and the
// literal "6" is used silently. The only caller visible in this diff passes 5.
··· 83 83 84 84 pub fn deleteDocument(uri: []const u8) void { 85 85 var c = &(client orelse return); 86 + // record tombstone 87 + var ts_buf: [20]u8 = undefined; 88 + const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 89 + c.exec( 90 + "INSERT OR REPLACE INTO tombstones (uri, record_type, deleted_at) VALUES (?, 'document', ?)", 91 + &.{ uri, ts }, 92 + ) catch {}; 93 + // delete record 86 94 c.exec("DELETE FROM documents WHERE uri = ?", &.{uri}) catch {}; 87 95 c.exec("DELETE FROM documents_fts WHERE uri = ?", &.{uri}) catch {}; 88 96 c.exec("DELETE FROM document_tags WHERE document_uri = ?", &.{uri}) catch {}; ··· 90 98 91 99 pub fn deletePublication(uri: []const u8) void { 92 100 var c = &(client orelse return); 101 + // record tombstone 102 + var ts_buf: [20]u8 = undefined; 103 + const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 104 + c.exec( 105 + "INSERT OR REPLACE INTO tombstones (uri, record_type, deleted_at) VALUES (?, 'publication', ?)", 106 + &.{ uri, ts }, 107 + ) catch {}; 108 + // delete record 93 109 c.exec("DELETE FROM publications WHERE uri = ?", &.{uri}) catch {}; 94 110 c.exec("DELETE FROM publications_fts WHERE uri = ?", &.{uri}) catch {}; 95 111 } ··· 378 394 379 395 return buf[0 .. 
trimmed_len + 1]; 380 396 } 397 + 398 + /// Find documents similar to a given document using vector similarity 399 + pub fn findSimilar(alloc: Allocator, uri: []const u8, limit: usize) ![]const u8 { 400 + var c = &(client orelse return error.NotInitialized); 401 + 402 + var output: std.Io.Writer.Allocating = .init(alloc); 403 + errdefer output.deinit(); 404 + 405 + var limit_buf: [8]u8 = undefined; 406 + const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit + 1}) catch "6"; // +1 to exclude self 407 + 408 + // vector similarity search using the document's embedding 409 + // note: CAST required because Hrana sends all values as text 410 + var res = c.query( 411 + \\SELECT d.uri, d.did, d.title, '' as snippet, 412 + \\ d.created_at, d.rkey, COALESCE(p.base_path, '') as base_path, 413 + \\ CASE WHEN d.publication_uri != '' THEN 1 ELSE 0 END as has_publication 414 + \\FROM vector_top_k('documents_embedding_idx', 415 + \\ (SELECT embedding FROM documents WHERE uri = ?), CAST(? AS INTEGER)) AS v 416 + \\JOIN documents d ON d.rowid = v.id 417 + \\LEFT JOIN publications p ON d.publication_uri = p.uri 418 + \\WHERE d.uri != ? 419 + , &.{ uri, limit_str, uri }) catch { 420 + try output.writer.writeAll("[]"); 421 + return try output.toOwnedSlice(); 422 + }; 423 + defer res.deinit(); 424 + 425 + var jw: json.Stringify = .{ .writer = &output.writer }; 426 + try jw.beginArray(); 427 + for (res.rows) |row| try jw.write(Doc.fromRow(row).toJson()); 428 + try jw.endArray(); 429 + return try output.toOwnedSlice(); 430 + }
+11
backend/src/db/schema.zig
// Diff excerpt for backend/src/db/schema.zig. "···" separates hunks; both enclosing functions are
// only partly visible.
//
// First hunk: adds a CREATE TABLE IF NOT EXISTS for `tombstones` (uri TEXT PRIMARY KEY,
// record_type TEXT NOT NULL, deleted_at INTEGER NOT NULL). A failure propagates through `try`.
//
// Second hunk (runMigrations): each ALTER TABLE ... ADD COLUMN discards its error with `catch {}`
// - presumably so that re-running against an already-migrated database is harmless; confirm.
// NOTE(review): nothing visible here creates the `embedding` column or the
// 'documents_embedding_idx' index that findSimilar queries. The added comment says the backfill
// script adds the column; a database that never ran that script would presumably lack both - verify.
··· 89 89 \\ count INTEGER DEFAULT 1 90 90 \\) 91 91 , &.{}); 92 + 93 + // tombstones for deleted records 94 + try client.exec( 95 + \\CREATE TABLE IF NOT EXISTS tombstones ( 96 + \\ uri TEXT PRIMARY KEY, 97 + \\ record_type TEXT NOT NULL, 98 + \\ deleted_at INTEGER NOT NULL 99 + \\) 100 + , &.{}); 92 101 } 93 102 94 103 fn runMigrations(client: *Client) !void { ··· 96 105 client.exec("ALTER TABLE documents ADD COLUMN publication_uri TEXT", &.{}) catch {}; 97 106 client.exec("ALTER TABLE publications ADD COLUMN base_path TEXT", &.{}) catch {}; 98 107 client.exec("ALTER TABLE stats ADD COLUMN service_started_at INTEGER", &.{}) catch {}; 108 + 109 + // vector embeddings column already added by backfill script 99 110 }
+20
backend/src/server.zig
// Diff excerpt for backend/src/server.zig. "···" separates hunks; the route dispatcher is only
// partly visible.
//
// Routing hunk: a request whose target starts with "/similar" is dispatched to handleSimilar.
// NOTE(review): unlike the mem.eql comparison used for "/dashboard" on the preceding branch,
// mem.startsWith also accepts longer paths such as "/similarity" - confirm this is intended
// (it is what lets the query string through).
//
// handleSimilar: allocates from a per-call arena backed by std.heap.page_allocator, released when
// the function returns. It reads the `uri` query parameter from `target` and sends the JSON
// produced by db.findSimilar(alloc, uri, 5).
//   - parseQueryParam fails  -> sends {"error":"missing uri parameter"}
//   - db.findSimilar fails   -> sends []
// NOTE(review): both failure paths go through sendJson; which HTTP status that produces is not
// visible in this excerpt.
··· 56 56 try handlePopular(request); 57 57 } else if (mem.eql(u8, target, "/dashboard")) { 58 58 try handleDashboard(request); 59 + } else if (mem.startsWith(u8, target, "/similar")) { 60 + try handleSimilar(request, target); 59 61 } else { 60 62 try sendNotFound(request); 61 63 } ··· 193 195 }, 194 196 }); 195 197 } 198 + 199 + fn handleSimilar(request: *http.Server.Request, target: []const u8) !void { 200 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 201 + defer arena.deinit(); 202 + const alloc = arena.allocator(); 203 + 204 + const uri = parseQueryParam(alloc, target, "uri") catch { 205 + try sendJson(request, "{\"error\":\"missing uri parameter\"}"); 206 + return; 207 + }; 208 + 209 + const results = db.findSimilar(alloc, uri, 5) catch { 210 + try sendJson(request, "[]"); 211 + return; 212 + }; 213 + 214 + try sendJson(request, results); 215 + }
+189
scripts/backfill-embeddings
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Backfill embeddings for leaflet-search documents.

Usage:
    ./scripts/backfill-embeddings                 # process all documents missing embeddings
    ./scripts/backfill-embeddings --limit 10      # process 10 documents
    ./scripts/backfill-embeddings --dry-run       # show what would be processed
"""

import argparse
import json
import os
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Connection settings, read from the environment or the file named by ENV_FILE (default .env)."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )

    turso_url: str
    turso_token: str
    voyage_api_key: str

    @property
    def turso_host(self) -> str:
        """Strip libsql:// prefix if present."""
        return self.turso_url.removeprefix("libsql://")


def _turso_execute(settings: Settings, sql: str, args: list | None = None) -> dict:
    """POST a single statement to the Turso pipeline endpoint and return its result entry.

    Raises httpx.HTTPStatusError on a non-2xx response and Exception when
    Turso reports an error for the statement.
    """
    stmt: dict = {"sql": sql}
    if args:
        # every argument is sent as text, so SQL must CAST where it needs a number
        stmt["args"] = [{"type": "text", "value": str(a)} for a in args]

    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
        timeout=30,
    )
    response.raise_for_status()

    result = response.json()["results"][0]
    if result["type"] == "error":
        raise Exception(f"Turso error: {result['error']}")
    return result


def turso_query(settings: Settings, sql: str, args: list | None = None) -> list[dict]:
    """Execute a query against Turso and return rows as dicts keyed by column name."""
    result = _turso_execute(settings, sql, args)["response"]["result"]
    cols = [c["name"] for c in result["cols"]]

    def extract_value(cell):
        # cells normally arrive as {"type": ..., "value": ...}; a null cell has no "value"
        if isinstance(cell, dict):
            return cell.get("value")
        # cell might be the value directly in some formats
        return cell

    return [dict(zip(cols, map(extract_value, row))) for row in result["rows"]]


def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None:
    """Execute a statement against Turso, discarding any rows."""
    _turso_execute(settings, sql, args)


def voyage_embed(settings: Settings, texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Voyage AI, one per input text, in input order.

    Raises httpx.HTTPStatusError on a non-2xx response and RuntimeError when
    the number of embeddings returned differs from the number of inputs.
    """
    response = httpx.post(
        "https://api.voyageai.com/v1/embeddings",
        headers={
            "Authorization": f"Bearer {settings.voyage_api_key}",
            "Content-Type": "application/json",
        },
        json={
            "input": texts,
            "model": "voyage-3-lite",
            "input_type": "document",
        },
        timeout=60,
    )
    response.raise_for_status()

    # Each item carries the index of the input it belongs to; sort on it so the
    # pairing with `texts` does not depend on response order. The sort is stable,
    # so items without an index keep their position.
    items = sorted(response.json()["data"], key=lambda item: item.get("index", 0))
    if len(items) != len(texts):
        raise RuntimeError(
            f"Voyage returned {len(items)} embeddings for {len(texts)} inputs"
        )
    return [item["embedding"] for item in items]


def _document_text(doc: dict) -> str:
    """Join title and content for embedding, ignoring NULL or empty columns."""
    return " ".join(part for part in (doc["title"], doc["content"]) if part)


def main():
    parser = argparse.ArgumentParser(description="Backfill embeddings for leaflet-search")
    parser.add_argument("--limit", type=int, default=0, help="max documents to process (0 = all)")
    parser.add_argument("--batch-size", type=int, default=20, help="documents per Voyage API call")
    parser.add_argument("--dry-run", action="store_true", help="show what would be processed")
    args = parser.parse_args()

    if args.batch_size < 1:
        # range() below would raise on a zero step and produce nothing on a negative one
        parser.error("--batch-size must be at least 1")

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error loading settings: {e}", file=sys.stderr)
        print("required env vars: TURSO_URL, TURSO_TOKEN, VOYAGE_API_KEY", file=sys.stderr)
        sys.exit(1)

    # check if embedding column exists, add if not (a dry run must not change the schema)
    has_column = True
    try:
        turso_query(settings, "SELECT embedding FROM documents LIMIT 1")
    except Exception as e:
        if "no such column" not in str(e).lower():
            raise
        if args.dry_run:
            has_column = False
            print("embedding column missing (would be added)")
        else:
            print("adding embedding column...")
            turso_exec(settings, "ALTER TABLE documents ADD COLUMN embedding F32_BLOB(512)")
            print("done")

    # get documents needing embeddings; without the column, every document needs one
    where_clause = "WHERE embedding IS NULL" if has_column else ""
    limit_clause = f"LIMIT {args.limit}" if args.limit > 0 else ""
    docs = turso_query(
        settings,
        f"SELECT uri, title, content FROM documents {where_clause} {limit_clause}",
    )

    if not docs:
        print("no documents need embeddings")
        return

    print(f"found {len(docs)} documents needing embeddings")

    if args.dry_run:
        for doc in docs[:10]:
            print(f" - {doc['uri']}: {(doc['title'] or '')[:50]}...")
        if len(docs) > 10:
            print(f" ... and {len(docs) - 10} more")
        return

    # Documents with neither title nor content have nothing to embed; leave them untouched
    # instead of sending an empty string (or the text "None") to the embedding API.
    pending = []
    for doc in docs:
        text = _document_text(doc)
        if text:
            pending.append((doc, text))
        else:
            print(f"skipping {doc['uri']}: no title or content", file=sys.stderr)

    # process in batches
    processed = 0
    for i in range(0, len(pending), args.batch_size):
        batch = pending[i : i + args.batch_size]

        print(f"embedding batch {i // args.batch_size + 1} ({len(batch)} docs)...")
        embeddings = voyage_embed(settings, [text for _, text in batch])

        for (doc, _), embedding in zip(batch, embeddings, strict=True):
            turso_exec(
                settings,
                "UPDATE documents SET embedding = vector32(?) WHERE uri = ?",
                [json.dumps(embedding), doc["uri"]],
            )
            processed += 1

        print(f" updated {processed}/{len(pending)}")

    print(f"done! processed {processed} documents")


if __name__ == "__main__":
    main()
+55
scripts/check-tables
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""Check what tables exist in the database."""

import os
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, read from the environment or the file named by ENV_FILE."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Host part of turso_url, without a leading libsql:// scheme."""
        return self.turso_url.removeprefix("libsql://")


def main() -> None:
    """List every table in sqlite_master; exit with status 1 if Turso reports an error."""
    settings = Settings()  # type: ignore

    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={
            "requests": [
                {"type": "execute", "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"}},
                {"type": "close"},
            ]
        },
        timeout=30,
    )
    response.raise_for_status()

    result = response.json()["results"][0]
    if result["type"] == "error":
        print(f"Error: {result['error']}")
        # a failed check must be visible to shells and CI, not only in the output
        sys.exit(1)

    print("Tables in database:")
    for row in result["response"]["result"]["rows"]:
        # cells are normally {"type": ..., "value": ...}; tolerate bare values too
        name = row[0]["value"] if isinstance(row[0], dict) else row[0]
        print(f" - {name}")


if __name__ == "__main__":
    main()
+78
scripts/check-vector
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""Check vector index and embeddings status."""

import os
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, read from the environment or the file named by ENV_FILE."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Host part of turso_url, without a leading libsql:// scheme."""
        return self.turso_url.removeprefix("libsql://")


def query(settings, sql):
    """Run one statement through the Turso pipeline endpoint and return the decoded JSON body."""
    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={
            "requests": [
                {"type": "execute", "stmt": {"sql": sql}},
                {"type": "close"},
            ]
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def _rows(result):
    """Return the rows of the first pipeline result, raising RuntimeError if Turso reported an error."""
    first = result["results"][0]
    if first["type"] == "error":
        # an error entry has no "response" key; surface Turso's message instead of a KeyError
        raise RuntimeError(f"Turso error: {first['error']}")
    return first["response"]["result"]["rows"]


def _value(cell):
    """Unwrap a result cell; a cell without a "value" key (e.g. NULL) yields None."""
    return cell.get("value") if isinstance(cell, dict) else cell


def main() -> int:
    """Run the three checks independently; return 1 if any of them failed, else 0."""
    settings = Settings()  # type: ignore
    failed = False

    # Check embeddings count
    print("Checking embeddings...")
    try:
        counts = _rows(query(settings, "SELECT COUNT(*) as total, SUM(CASE WHEN embedding IS NOT NULL THEN 1 ELSE 0 END) as with_embeddings FROM documents"))[0]
        print(f" Total documents: {_value(counts[0])}")
        print(f" With embeddings: {_value(counts[1])}")
    except Exception as e:
        print(f" Error: {e}")
        failed = True

    # Check if vector index exists
    print("\nChecking for vector index...")
    try:
        for row in _rows(query(settings, "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%embedding%'")):
            print(f" Found table: {_value(row[0])}")
    except Exception as e:
        print(f" Error: {e}")
        failed = True

    # Try to use the index directly
    print("\nTrying vector search...")
    try:
        result = query(settings, """
            SELECT d.uri, d.title
            FROM vector_top_k('documents_embedding_idx',
                (SELECT embedding FROM documents LIMIT 1), 3) AS v
            JOIN documents d ON d.rowid = v.id
            LIMIT 3
        """)
        _rows(result)  # raises when the search itself was rejected
        print(f" Result: {result['results'][0]}")
    except Exception as e:
        print(f" Error: {e}")
        failed = True

    return 1 if failed else 0


if __name__ == "__main__":
    sys.exit(main())
+83
scripts/test-similar
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""Test the exact similar query."""

import os
import sys
import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, read from the environment or the file named by ENV_FILE."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Host part of turso_url, without a leading libsql:// scheme."""
        prefix = "libsql://"
        if self.turso_url.startswith(prefix):
            return self.turso_url[len(prefix):]
        return self.turso_url


def query(settings, sql, args=None):
    """Send one statement (arguments bound as text) to the Turso pipeline and return the JSON body."""
    statement = {"sql": sql}
    if args:
        statement["args"] = [{"type": "text", "value": str(arg)} for arg in args]

    payload = {
        "requests": [
            {"type": "execute", "stmt": statement},
            {"type": "close"},
        ]
    }
    auth_headers = {
        "Authorization": f"Bearer {settings.turso_token}",
        "Content-Type": "application/json",
    }
    reply = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers=auth_headers,
        json=payload,
        timeout=30,
    )
    reply.raise_for_status()
    return reply.json()


settings = Settings()  # type: ignore
if len(sys.argv) > 1:
    uri = sys.argv[1]
else:
    uri = "at://did:plc:aub4nfuiysmvcfc4g5gptp47/pub.leaflet.document/3lusgsxusqk2w"
limit = "6"

print(f"Testing similar query for: {uri}")
print(f"Limit: {limit}")

# Same statement the backend issues: the row count is bound as text and CAST in SQL
sql = """
SELECT d.uri, d.did, d.title, '' as snippet,
       d.created_at, d.rkey, COALESCE(p.base_path, '') as base_path,
       CASE WHEN d.publication_uri != '' THEN 1 ELSE 0 END as has_publication
FROM vector_top_k('documents_embedding_idx',
     (SELECT embedding FROM documents WHERE uri = ?), CAST(? AS INTEGER)) AS v
JOIN documents d ON d.rowid = v.id
LEFT JOIN publications p ON d.publication_uri = p.uri
WHERE d.uri != ?
"""

try:
    result = query(settings, sql, [uri, limit, uri])
    outcome = result['results'][0]
    print(f"\nResult type: {outcome['type']}")
    if outcome['type'] != 'ok':
        print(f"Error: {outcome}")
    else:
        rows = outcome['response']['result']['rows']
        print(f"Found {len(rows)} similar documents:")
        for row in rows[:5]:
            title_cell = row[2]
            title = title_cell["value"] if isinstance(title_cell, dict) else title_cell
            print(f" - {title}")
except Exception as e:
    print(f"Exception: {e}")
+78
site/index.html
/*
 * Diff excerpt for site/index.html. "···" separates hunks; the surrounding markup, the rest of
 * the stylesheet and the body of search() are not shown.
 *
 * CSS hunk: adds the .related-section, .related-header, .related-items and .related-item rules.
 * Items sit in a wrapping flex row; each item is capped at 200px wide, kept on one line and
 * truncated with an ellipsis.
 *
 * search() hunk: after the results are rendered, loadRelated(results[0]) is called when there is
 * at least one result and the first one has a uri. The call is not awaited.
 *
 * loadRelated(topResult): fetches API_URL + "/similar?uri=" + the URI-encoded topResult.uri, drops
 * any entry whose uri equals the top result's, keeps at most 4, and appends a "related to ..."
 * section to resultsDiv. The header shows the first 30 characters of the top result's title.
 * Link target per item: basePath + rkey when both are set, else the leaflet.pub/p/did/rkey form
 * when did and rkey are set, else the item is rendered as a plain span.
 * Any exception is swallowed because the section is optional.
 *
 * NOTE(review): titles pass through escapeHtml but the href value is interpolated as-is - confirm
 * basePath, rkey and did can never contain quotes or markup.
 * NOTE(review): the section is appended whenever the fetch resolves; if a newer search has
 * replaced resultsDiv's contents by then, it would presumably land under the newer results - verify.
 * NOTE(review): res.ok is not checked before res.json() is called.
 */
··· 223 223 text-align: center; 224 224 } 225 225 226 + .related-section { 227 + margin-top: 1rem; 228 + padding-top: 0; 229 + } 230 + 231 + .related-header { 232 + font-size: 11px; 233 + color: #444; 234 + margin-bottom: 0.75rem; 235 + } 236 + 237 + .related-items { 238 + display: flex; 239 + flex-wrap: wrap; 240 + gap: 0.5rem; 241 + } 242 + 243 + .related-item { 244 + font-size: 12px; 245 + padding: 0.4rem 0.6rem; 246 + background: #111; 247 + border: 1px solid #222; 248 + border-radius: 3px; 249 + color: #888; 250 + text-decoration: none; 251 + max-width: 200px; 252 + overflow: hidden; 253 + text-overflow: ellipsis; 254 + white-space: nowrap; 255 + } 256 + 257 + .related-item:hover { 258 + background: #1a1a1a; 259 + border-color: #333; 260 + color: #aaa; 261 + } 262 + 226 263 .tags { 227 264 margin-bottom: 1rem; 228 265 } ··· 405 442 resultsDiv.innerHTML = html; 406 443 statsDiv.textContent = `${results.length} results`; 407 444 445 + // load related documents based on top result 446 + if (results.length > 0 && results[0].uri) { 447 + loadRelated(results[0]); 448 + } 449 + 408 450 } catch (err) { 409 451 resultsDiv.innerHTML = `<div class="status error">error: ${err.message}<br><code style="font-size:0.7rem">${searchUrl}</code></div>`; 410 452 } finally { ··· 569 611 570 612 if (initialQuery || initialTag) { 571 613 search(initialQuery || '', initialTag); 614 + } 615 + 616 + async function loadRelated(topResult) { 617 + try { 618 + const res = await fetch(`${API_URL}/similar?uri=${encodeURIComponent(topResult.uri)}`); 619 + const related = await res.json(); 620 + 621 + if (!related || related.length === 0) return; 622 + 623 + // filter out the top result itself and limit to 4 624 + const filtered = related 625 + .filter(r => r.uri !== topResult.uri) 626 + .slice(0, 4); 627 + 628 + if (filtered.length === 0) return; 629 + 630 + const items = filtered.map(doc => { 631 + const url = doc.basePath && doc.rkey 632 + ? 
`https://${doc.basePath}/${doc.rkey}` 633 + : (doc.did && doc.rkey ? `https://leaflet.pub/p/${doc.did}/${doc.rkey}` : null); 634 + return url 635 + ? `<a href="${url}" target="_blank" class="related-item">${escapeHtml(doc.title || 'Untitled')}</a>` 636 + : `<span class="related-item">${escapeHtml(doc.title || 'Untitled')}</span>`; 637 + }).join(''); 638 + 639 + const relatedHtml = ` 640 + <div class="related-section"> 641 + <div class="related-header">related to "${escapeHtml(topResult.title?.slice(0, 30) || 'top result')}${topResult.title?.length > 30 ? '...' : ''}"</div> 642 + <div class="related-items">${items}</div> 643 + </div> 644 + `; 645 + 646 + resultsDiv.insertAdjacentHTML('beforeend', relatedHtml); 647 + } catch (e) { 648 + // silently fail - related is optional 649 + } 572 650 } 573 651 574 652 loadTags();