search for standard sites pub-search.waow.tech
search zig blog atproto
at main 395 lines 14 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Backfill records directly from a PDS.

Resolves an identifier to a DID, finds the account's PDS, lists records from
the supported blogging collections, and upserts them into the Turso search
index (documents, tags, FTS, publications).

Usage:
    ./scripts/backfill-pds did:plc:mkqt76xvfgxuemlwlx6ruc3w
    ./scripts/backfill-pds zat.dev
"""

import argparse
import os
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Turso connection settings, loaded from the environment / .env file."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )

    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Hostname for the Turso HTTP API (TURSO_URL minus the libsql:// scheme)."""
        return self.turso_url.removeprefix("libsql://")


def resolve_handle(handle: str) -> str:
    """Resolve a handle to a DID via the public bsky.social XRPC endpoint."""
    resp = httpx.get(
        "https://bsky.social/xrpc/com.atproto.identity.resolveHandle",
        params={"handle": handle},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["did"]


def get_pds_endpoint(did: str) -> str:
    """Get the PDS service endpoint for *did* from the PLC directory.

    Raises ValueError if the DID document lists no AtprotoPersonalDataServer.
    """
    resp = httpx.get(f"https://plc.directory/{did}", timeout=30)
    resp.raise_for_status()
    data = resp.json()
    for service in data.get("service", []):
        if service.get("type") == "AtprotoPersonalDataServer":
            return service["serviceEndpoint"]
    raise ValueError(f"No PDS endpoint found for {did}")


def list_records(pds: str, did: str, collection: str) -> list[dict]:
    """List all records from *collection* in *did*'s repo, following cursors."""
    records: list[dict] = []
    cursor = None
    while True:
        params: dict = {"repo": did, "collection": collection, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        resp = httpx.get(
            f"{pds}/xrpc/com.atproto.repo.listRecords", params=params, timeout=30
        )
        resp.raise_for_status()
        data = resp.json()
        records.extend(data.get("records", []))
        cursor = data.get("cursor")
        if not cursor:
            break
    return records


def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None:
    """Execute a single SQL statement against Turso via the v2 pipeline API.

    None args bind as SQL NULL; everything else is str()-coerced and bound as
    text. Raises httpx.HTTPStatusError on a non-200 response (after printing
    the response body to stderr for debugging).
    """
    stmt: dict = {"sql": sql}
    if args:
        stmt["args"] = [
            {"type": "null"} if a is None else {"type": "text", "value": str(a)}
            for a in args
        ]

    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
        timeout=30,
    )
    if response.status_code != 200:
        print(f"Turso error: {response.text}", file=sys.stderr)
    response.raise_for_status()


def extract_leaflet_blocks(pages: list) -> str:
    """Extract plain text from a leaflet pages/blocks structure.

    Collects `plaintext` from text/header/blockquote/code blocks and recurses
    into unordered lists; returns the pieces joined with single spaces.
    """
    texts: list[str] = []
    for page in pages:
        if not isinstance(page, dict):
            continue
        for wrapper in page.get("blocks", []):
            if not isinstance(wrapper, dict):
                continue
            block = wrapper.get("block", {})
            if not isinstance(block, dict):
                continue
            block_type = block.get("$type", "")
            if block_type in (
                "pub.leaflet.blocks.text",
                "pub.leaflet.blocks.header",
                "pub.leaflet.blocks.blockquote",
                "pub.leaflet.blocks.code",
            ):
                plaintext = block.get("plaintext", "")
                if plaintext:
                    texts.append(plaintext)
            elif block_type == "pub.leaflet.blocks.unorderedList":
                texts.extend(extract_list_items(block.get("children", [])))
    return " ".join(texts)


def extract_list_items(children: list) -> list[str]:
    """Recursively extract `plaintext` from leaflet list items (and nested lists)."""
    texts: list[str] = []
    for child in children:
        if not isinstance(child, dict):
            continue
        content = child.get("content", {})
        if isinstance(content, dict):
            plaintext = content.get("plaintext", "")
            if plaintext:
                texts.append(plaintext)
        nested = child.get("children", [])
        if nested:
            texts.extend(extract_list_items(nested))
    return texts


def extract_document(record: dict, collection: str) -> dict | None:
    """Extract indexable document fields from a record.

    Returns None for untitled records and author-only whitewind entries.
    Content is looked up in order: `textContent` (site.standard), `content`
    as a string (whitewind markdown), `content.pages` / top-level `pages`
    (leaflet blocks).
    """
    value = record.get("value", {})

    title = value.get("title")
    if not title:
        return None

    # Skip author-only whitewind entries (not publicly visible).
    if collection == "com.whtwnd.blog.entry" and value.get("visibility") == "author":
        return None

    content = value.get("textContent") or ""
    if not content:
        content_obj = value.get("content")
        if isinstance(content_obj, str):
            content = content_obj
        elif isinstance(content_obj, dict):
            pages = content_obj.get("pages", [])
            if pages:
                content = extract_leaflet_blocks(pages)
    if not content:
        # Leaflet-style pages/blocks at top level (pub.leaflet.document).
        pages = value.get("pages", [])
        if pages:
            content = extract_leaflet_blocks(pages)

    # Prefer publishedAt (leaflet/standard) over createdAt (whitewind).
    created_at = value.get("publishedAt") or value.get("createdAt", "")

    # Publication reference: "publication" (leaflet) then "site" (site.standard).
    publication = value.get("publication") or value.get("site")
    publication_uri = None
    if isinstance(publication, dict):
        publication_uri = publication.get("uri")
    elif isinstance(publication, str):
        publication_uri = publication

    # URL path (site.standard.document uses a "path" field like "/001").
    path = value.get("path")

    tags = value.get("tags", [])
    if not isinstance(tags, list):
        tags = []

    # Determine platform from the collection NSID prefix; site.standard.* and
    # anything unrecognized is resolved later from the publication basePath.
    if collection.startswith("pub.leaflet"):
        platform = "leaflet"
    elif collection.startswith("blog.pckt"):
        platform = "pckt"
    elif collection.startswith("com.whtwnd"):
        platform = "whitewind"
    else:
        platform = "unknown"

    return {
        "title": title,
        "content": content,
        "created_at": created_at,
        "publication_uri": publication_uri,
        "tags": tags,
        "platform": platform,
        "collection": collection,
        "path": path,
    }


def _index_document(
    settings: Settings, record: dict, collection: str, did: str, dry_run: bool
) -> bool:
    """Upsert one document record (plus tags and FTS row).

    Returns True when the record yields an indexable document (counted even
    in dry-run mode), False when it is skipped.
    """
    uri = record["uri"]
    # rkey is the last segment of at://did/collection/rkey
    rkey = uri.split("/")[-1]

    doc = extract_document(record, collection)
    if not doc:
        print(f"  skip {uri} (no title)")
        return False

    if dry_run:
        print(f"  would insert: {doc['title'][:50]}...")
        return True

    # Upsert the document row; (did, rkey) is the stable identity.
    turso_exec(
        settings,
        """
        INSERT INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, indexed_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%S', 'now'))
        ON CONFLICT(did, rkey) DO UPDATE SET
            uri = excluded.uri,
            title = excluded.title,
            content = excluded.content,
            created_at = excluded.created_at,
            publication_uri = excluded.publication_uri,
            platform = excluded.platform,
            source_collection = excluded.source_collection,
            path = excluded.path,
            indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now')
        """,
        [
            uri,
            did,
            rkey,
            doc["title"],
            doc["content"],
            doc["created_at"],
            doc["publication_uri"],
            doc["platform"],
            doc["collection"],
            doc["path"],
        ],
    )
    for tag in doc["tags"]:
        turso_exec(
            settings,
            "INSERT OR IGNORE INTO document_tags (document_uri, tag) VALUES (?, ?)",
            [uri, tag],
        )
    # FTS5 doesn't support ON CONFLICT: delete then re-insert.
    turso_exec(settings, "DELETE FROM documents_fts WHERE uri = ?", [uri])
    turso_exec(
        settings,
        "INSERT INTO documents_fts (uri, title, content) VALUES (?, ?, ?)",
        [uri, doc["title"], doc["content"]],
    )
    print(f"  indexed: {doc['title'][:50]}...")
    return True


def _index_publication(
    settings: Settings, record: dict, did: str, dry_run: bool
) -> bool:
    """Upsert one publication record; returns True (counted even in dry-run)."""
    uri = record["uri"]
    rkey = uri.split("/")[-1]
    value = record["value"]
    name = value.get("name", "")
    description = value.get("description")

    # base_path: leaflet's "base_path", else site.standard's "url" minus scheme.
    base_path = value.get("base_path")
    if not base_path:
        url = value.get("url")
        if url:
            base_path = url.removeprefix("https://").removeprefix("http://")

    if dry_run:
        print(f"  would insert pub: {name}")
        return True

    turso_exec(
        settings,
        """
        INSERT INTO publications (uri, did, rkey, name, description, base_path)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(uri) DO UPDATE SET
            name = excluded.name,
            description = excluded.description,
            base_path = excluded.base_path
        """,
        [uri, did, rkey, name, description, base_path],
    )
    print(f"  indexed pub: {name}")
    return True


def _detect_platforms(settings: Settings) -> None:
    """Resolve 'unknown'/'standardsite' document platforms from publication basePath."""
    print("detecting platforms from publication basePath...")
    turso_exec(
        settings,
        """
        UPDATE documents SET platform = 'pckt'
        WHERE platform IN ('standardsite', 'unknown')
        AND publication_uri IN (SELECT uri FROM publications WHERE base_path LIKE '%pckt.blog%')
        """,
    )
    turso_exec(
        settings,
        """
        UPDATE documents SET platform = 'leaflet'
        WHERE platform IN ('standardsite', 'unknown')
        AND publication_uri IN (SELECT uri FROM publications WHERE base_path LIKE '%leaflet.pub%')
        """,
    )
    print("  done")


def main() -> None:
    parser = argparse.ArgumentParser(description="Backfill records from a PDS")
    parser.add_argument("identifier", help="DID or handle to backfill")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done")
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error loading settings: {e}", file=sys.stderr)
        print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr)
        sys.exit(1)

    # Resolve identifier to DID.
    identifier = args.identifier
    if identifier.startswith("did:"):
        did = identifier
    else:
        print(f"resolving handle {identifier}...")
        did = resolve_handle(identifier)
        print(f"  -> {did}")

    # Get PDS endpoint.
    print(f"looking up PDS for {did}...")
    pds = get_pds_endpoint(did)
    print(f"  -> {pds}")

    # Collections to fetch.
    collections = [
        "pub.leaflet.document",
        "pub.leaflet.publication",
        "site.standard.document",
        "site.standard.publication",
        "com.whtwnd.blog.entry",
    ]

    total_docs = 0
    total_pubs = 0

    for collection in collections:
        print(f"fetching {collection}...")
        try:
            records = list_records(pds, did, collection)
        except httpx.HTTPStatusError as e:
            # Some PDS implementations 400 on an unknown/empty collection.
            if e.response.status_code == 400:
                print("  (no records)")
                continue
            raise

        if not records:
            print("  (no records)")
            continue

        print(f"  found {len(records)} records")

        for record in records:
            if (
                collection.endswith(".document")
                or collection == "com.whtwnd.blog.entry"
            ):
                if _index_document(settings, record, collection, did, args.dry_run):
                    total_docs += 1
            elif collection.endswith(".publication"):
                if _index_publication(settings, record, did, args.dry_run):
                    total_pubs += 1

    # Post-process: detect platform from publication basePath.
    if not args.dry_run and (total_docs > 0 or total_pubs > 0):
        _detect_platforms(settings)

    print(f"\ndone! {total_docs} documents, {total_pubs} publications")


if __name__ == "__main__":
    main()