search for standard sites
pub-search.waow.tech
search
zig
blog
atproto
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "pydantic-settings"]
5# ///
6"""
7Backfill records directly from a PDS.
8
9Usage:
10 ./scripts/backfill-pds did:plc:mkqt76xvfgxuemlwlx6ruc3w
11 ./scripts/backfill-pds zat.dev
12"""
13
14import argparse
15import json
16import os
17import sys
18
19import httpx
20from pydantic_settings import BaseSettings, SettingsConfigDict
21
22
class Settings(BaseSettings):
    """Runtime configuration pulled from the environment (or an env file)."""

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )

    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Host portion of the Turso URL, with any leading libsql:// stripped."""
        # removeprefix is a no-op when the scheme is absent, matching the
        # original startswith/slice logic exactly.
        return self.turso_url.removeprefix("libsql://")
38
def resolve_handle(handle: str) -> str:
    """Resolve an AT Protocol handle to its DID.

    Args:
        handle: A handle such as "zat.dev".

    Returns:
        The DID string (e.g. "did:plc:...").

    Raises:
        httpx.HTTPStatusError: If the resolution endpoint returns an error.
    """
    # Fixed: the URL had an f-string prefix with no placeholders (ruff F541);
    # a plain string literal is equivalent.
    resp = httpx.get(
        "https://bsky.social/xrpc/com.atproto.identity.resolveHandle",
        params={"handle": handle},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["did"]
48
49
def get_pds_endpoint(did: str) -> str:
    """Fetch the DID document from plc.directory and return its PDS endpoint.

    Raises ValueError when the document lists no PDS service.
    """
    response = httpx.get(f"https://plc.directory/{did}", timeout=30)
    response.raise_for_status()
    document = response.json()
    # Return the endpoint of the first service entry typed as a PDS.
    for svc in document.get("service", []):
        if svc.get("type") != "AtprotoPersonalDataServer":
            continue
        return svc["serviceEndpoint"]
    raise ValueError(f"No PDS endpoint found for {did}")
59
60
def list_records(pds: str, did: str, collection: str) -> list[dict]:
    """Page through com.atproto.repo.listRecords until the cursor runs out."""
    collected: list[dict] = []
    cursor = None
    while True:
        query: dict = {"repo": did, "collection": collection, "limit": 100}
        if cursor:
            query["cursor"] = cursor
        resp = httpx.get(
            f"{pds}/xrpc/com.atproto.repo.listRecords", params=query, timeout=30
        )
        resp.raise_for_status()
        payload = resp.json()
        collected.extend(payload.get("records", []))
        cursor = payload.get("cursor")
        # An absent/empty cursor marks the final page.
        if not cursor:
            return collected
79
80
def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None:
    """Execute one SQL statement via Turso's v2 HTTP pipeline endpoint.

    On a non-200 reply the response body is printed to stderr before the
    HTTP error is raised.
    """
    stmt: dict = {"sql": sql}
    if args:
        # Turso's pipeline API types every argument explicitly: None becomes
        # a "null" arg, everything else is serialized as text.
        stmt["args"] = [
            {"type": "null"} if value is None else {"type": "text", "value": str(value)}
            for value in args
        ]

    response = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
        timeout=30,
    )
    if response.status_code != 200:
        print(f"Turso error: {response.text}", file=sys.stderr)
    response.raise_for_status()
105
106
def extract_leaflet_blocks(pages: list) -> str:
    """Flatten a leaflet pages/blocks tree into one space-joined string."""
    # Block types that carry their text in a "plaintext" field.
    text_types = (
        "pub.leaflet.blocks.text",
        "pub.leaflet.blocks.header",
        "pub.leaflet.blocks.blockquote",
        "pub.leaflet.blocks.code",
    )
    collected: list[str] = []
    for page in pages:
        if not isinstance(page, dict):
            continue
        for wrapper in page.get("blocks", []):
            if not isinstance(wrapper, dict):
                continue
            block = wrapper.get("block", {})
            if not isinstance(block, dict):
                continue
            kind = block.get("$type", "")
            if kind in text_types:
                if text := block.get("plaintext", ""):
                    collected.append(text)
            elif kind == "pub.leaflet.blocks.unorderedList":
                # Lists nest; delegate to the recursive item extractor.
                collected.extend(extract_list_items(block.get("children", [])))
    return " ".join(collected)
135
136
def extract_list_items(children: list) -> list[str]:
    """Collect list-item plaintext depth-first, including nested sublists."""
    found: list[str] = []
    for item in children:
        if not isinstance(item, dict):
            continue
        content = item.get("content", {})
        # Only dict-shaped content with a non-empty plaintext contributes text.
        if isinstance(content, dict) and (text := content.get("plaintext", "")):
            found.append(text)
        # Recurse into nested children; an empty list contributes nothing.
        found.extend(extract_list_items(item.get("children", [])))
    return found
153
154
155def extract_document(record: dict, collection: str) -> dict | None:
156 """Extract document fields from a record."""
157 value = record.get("value", {})
158
159 # Get title
160 title = value.get("title")
161 if not title:
162 return None
163
164 # Skip author-only whitewind entries
165 if collection == "com.whtwnd.blog.entry":
166 if value.get("visibility") == "author":
167 return None
168
169 # Get content - try textContent (site.standard), then content as string
170 # (whitewind stores markdown), then leaflet blocks
171 content = value.get("textContent") or ""
172 if not content:
173 content_obj = value.get("content")
174 if isinstance(content_obj, str):
175 content = content_obj
176 elif isinstance(content_obj, dict):
177 pages = content_obj.get("pages", [])
178 if pages:
179 content = extract_leaflet_blocks(pages)
180 if not content:
181 # Try leaflet-style pages/blocks at top level (pub.leaflet.document)
182 pages = value.get("pages", [])
183 if pages:
184 content = extract_leaflet_blocks(pages)
185
186 # Get created_at (prefer publishedAt for leaflet/standard, createdAt for whitewind)
187 created_at = value.get("publishedAt") or value.get("createdAt", "")
188
189 # Get publication reference - try "publication" (leaflet) then "site" (site.standard)
190 publication = value.get("publication") or value.get("site")
191 publication_uri = None
192 if publication:
193 if isinstance(publication, dict):
194 publication_uri = publication.get("uri")
195 elif isinstance(publication, str):
196 publication_uri = publication
197
198 # Get URL path (site.standard.document uses "path" field like "/001")
199 path = value.get("path")
200
201 # Get tags
202 tags = value.get("tags", [])
203 if not isinstance(tags, list):
204 tags = []
205
206 # Determine platform from collection
207 if collection.startswith("pub.leaflet"):
208 platform = "leaflet"
209 elif collection.startswith("blog.pckt"):
210 platform = "pckt"
211 elif collection.startswith("com.whtwnd"):
212 platform = "whitewind"
213 else:
214 # site.standard.* and others - platform will be detected from publication basePath
215 platform = "unknown"
216
217 return {
218 "title": title,
219 "content": content,
220 "created_at": created_at,
221 "publication_uri": publication_uri,
222 "tags": tags,
223 "platform": platform,
224 "collection": collection,
225 "path": path,
226 }
227
228
def main():
    """Backfill a single repo: resolve the identifier, fetch each supported
    collection from its PDS, and upsert documents/publications into Turso.

    Exits with status 1 when required settings (TURSO_URL, TURSO_TOKEN)
    are missing.
    """
    parser = argparse.ArgumentParser(description="Backfill records from a PDS")
    parser.add_argument("identifier", help="DID or handle to backfill")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done")
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error loading settings: {e}", file=sys.stderr)
        print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr)
        sys.exit(1)

    # Resolve identifier to DID (accept either form on the command line).
    identifier = args.identifier
    if identifier.startswith("did:"):
        did = identifier
    else:
        print(f"resolving handle {identifier}...")
        did = resolve_handle(identifier)
        print(f" -> {did}")

    # Get PDS endpoint from the PLC directory.
    print(f"looking up PDS for {did}...")
    pds = get_pds_endpoint(did)
    print(f" -> {pds}")

    # Collections to fetch: leaflet, site.standard, and whitewind lexicons.
    collections = [
        "pub.leaflet.document",
        "pub.leaflet.publication",
        "site.standard.document",
        "site.standard.publication",
        "com.whtwnd.blog.entry",
    ]

    total_docs = 0
    total_pubs = 0

    for collection in collections:
        print(f"fetching {collection}...")
        try:
            records = list_records(pds, did, collection)
        except httpx.HTTPStatusError as e:
            # Many PDS implementations answer 400 when the repo simply has no
            # such collection; treat that as "nothing to do", not a failure.
            if e.response.status_code == 400:
                # Fixed: was an f-string with no placeholders (ruff F541).
                print(" (no records)")
                continue
            raise

        if not records:
            # Fixed: was an f-string with no placeholders (ruff F541).
            print(" (no records)")
            continue

        print(f" found {len(records)} records")

        for record in records:
            uri = record["uri"]
            # Parse rkey from URI: at://did/collection/rkey
            parts = uri.split("/")
            rkey = parts[-1]

            if collection.endswith(".document") or collection == "com.whtwnd.blog.entry":
                doc = extract_document(record, collection)
                if not doc:
                    print(f" skip {uri} (no title)")
                    continue

                if args.dry_run:
                    print(f" would insert: {doc['title'][:50]}...")
                else:
                    # Upsert the document row, keyed on (did, rkey).
                    turso_exec(
                        settings,
                        """
                        INSERT INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, indexed_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%S', 'now'))
                        ON CONFLICT(did, rkey) DO UPDATE SET
                            uri = excluded.uri,
                            title = excluded.title,
                            content = excluded.content,
                            created_at = excluded.created_at,
                            publication_uri = excluded.publication_uri,
                            platform = excluded.platform,
                            source_collection = excluded.source_collection,
                            path = excluded.path,
                            indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now')
                        """,
                        [uri, did, rkey, doc["title"], doc["content"], doc["created_at"], doc["publication_uri"], doc["platform"], doc["collection"], doc["path"]],
                    )
                    # Insert tags (ignore duplicates from re-runs).
                    for tag in doc["tags"]:
                        turso_exec(
                            settings,
                            "INSERT OR IGNORE INTO document_tags (document_uri, tag) VALUES (?, ?)",
                            [uri, tag],
                        )
                    # Update FTS index (delete then insert, FTS5 doesn't support ON CONFLICT)
                    turso_exec(settings, "DELETE FROM documents_fts WHERE uri = ?", [uri])
                    turso_exec(
                        settings,
                        "INSERT INTO documents_fts (uri, title, content) VALUES (?, ?, ?)",
                        [uri, doc["title"], doc["content"]],
                    )
                    print(f" indexed: {doc['title'][:50]}...")
                    total_docs += 1

            elif collection.endswith(".publication"):
                value = record["value"]
                name = value.get("name", "")
                description = value.get("description")
                # base_path: try leaflet's "base_path", then strip scheme from
                # site.standard's "url".
                base_path = value.get("base_path")
                if not base_path:
                    url = value.get("url")
                    if url:
                        # Strip https:// or http:// prefix
                        if url.startswith("https://"):
                            base_path = url[len("https://"):]
                        elif url.startswith("http://"):
                            base_path = url[len("http://"):]
                        else:
                            base_path = url

                if args.dry_run:
                    print(f" would insert pub: {name}")
                else:
                    turso_exec(
                        settings,
                        """
                        INSERT INTO publications (uri, did, rkey, name, description, base_path)
                        VALUES (?, ?, ?, ?, ?, ?)
                        ON CONFLICT(uri) DO UPDATE SET
                            name = excluded.name,
                            description = excluded.description,
                            base_path = excluded.base_path
                        """,
                        [uri, did, rkey, name, description, base_path],
                    )
                    print(f" indexed pub: {name}")
                    total_pubs += 1

    # post-process: detect platform from publication basePath. Documents from
    # site.standard.* were indexed as "unknown" (extract_document can't tell
    # the platform from the collection alone); map them here via the
    # publication's hostname.
    if not args.dry_run and (total_docs > 0 or total_pubs > 0):
        print("detecting platforms from publication basePath...")
        turso_exec(
            settings,
            """
            UPDATE documents SET platform = 'pckt'
            WHERE platform IN ('standardsite', 'unknown')
            AND publication_uri IN (SELECT uri FROM publications WHERE base_path LIKE '%pckt.blog%')
            """,
        )
        turso_exec(
            settings,
            """
            UPDATE documents SET platform = 'leaflet'
            WHERE platform IN ('standardsite', 'unknown')
            AND publication_uri IN (SELECT uri FROM publications WHERE base_path LIKE '%leaflet.pub%')
            """,
        )
        print(" done")

    print(f"\ndone! {total_docs} documents, {total_pubs} publications")
392
393
# Script entry point: run the backfill when invoked directly (not on import).
if __name__ == "__main__":
    main()