# semantic bufo search
# find-bufo.com
# bufo
1#!/usr/bin/env python3
2# /// script
3# requires-python = ">=3.11"
4# dependencies = [
5# "httpx",
6# "beautifulsoup4",
7# "rich",
8# "python-dotenv",
9# "pillow",
10# ]
11# ///
12"""
13Scrape all bufos from bufo.zone, generate embeddings, and upload to turbopuffer.
14"""
15
16import asyncio
17import base64
18import hashlib
19import os
20import re
21from io import BytesIO
22from pathlib import Path
23from typing import List
24
25import httpx
26from bs4 import BeautifulSoup
27from PIL import Image
28from rich.console import Console
29from rich.progress import Progress, SpinnerColumn, TextColumn
30from dotenv import load_dotenv
31
32console = Console()
33
34# Load .env from project root
35load_dotenv(Path(__file__).parent.parent / ".env")
36
37
async def fetch_bufo_urls() -> set[str]:
    """Collect every unique bufo image URL referenced by bufo.zone.

    Scans the homepage twice: once via <img> tags, and once with a regex
    over the raw HTML to catch URLs that appear outside src attributes.
    """
    console.print("[cyan]fetching bufo list from bufo.zone...[/cyan]")

    async with httpx.AsyncClient() as client:
        response = await client.get("https://bufo.zone", timeout=30.0)
        response.raise_for_status()

    page = BeautifulSoup(response.text, "html.parser")

    found: set[str] = set()
    for tag in page.find_all("img"):
        source = tag.get("src", "")
        if "all-the.bufo.zone" in source:
            found.add(source)

    url_re = re.compile(
        r"https://all-the\.bufo\.zone/[^\"'>\s]+\.(png|gif|jpg|jpeg|webp)"
    )
    found.update(m.group(0) for m in url_re.finditer(response.text))

    console.print(f"[green]found {len(found)} unique bufo images[/green]")
    return found
62
63
async def download_bufo(client: httpx.AsyncClient, url: str, output_dir: Path) -> str | None:
    """Fetch one bufo image to disk; return its filename, or None on failure.

    Skips the request entirely when a non-empty copy already exists, so
    reruns are cheap and idempotent.
    """
    filename = url.rsplit("/", 1)[-1]
    target = output_dir / filename

    # already downloaded (and not a zero-byte failed write)
    if target.exists() and target.stat().st_size > 0:
        return filename

    try:
        response = await client.get(url, timeout=30.0)
        response.raise_for_status()
        target.write_bytes(response.content)
    except Exception as e:
        console.print(f"[red]error downloading {url}: {e}[/red]")
        return None
    return filename
80
81
async def download_all_bufos(urls: set[str], output_dir: Path) -> List[Path]:
    """Download every bufo in batches of 10; return the paths that succeeded."""
    output_dir.mkdir(parents=True, exist_ok=True)

    saved: List[Path] = []
    batch_size = 10
    pending = list(urls)

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]downloading {len(urls)} bufos...", total=len(urls)
            )

            for start in range(0, len(pending), batch_size):
                batch = pending[start : start + batch_size]
                outcomes = await asyncio.gather(
                    *(download_bufo(client, u, output_dir) for u in batch),
                    return_exceptions=True,
                )
                saved.extend(
                    output_dir / name
                    for name in outcomes
                    if name and not isinstance(name, Exception)
                )
                progress.update(task, advance=len(batch))

                # brief pause between batches to avoid hammering the server
                if start + batch_size < len(pending):
                    await asyncio.sleep(0.5)

    console.print(f"[green]downloaded {len(saved)} bufos[/green]")
    return saved
117
118
def _encode_frames(image: Image.Image) -> list[dict]:
    """Encode an image as one or more base64 WEBP content parts for Voyage AI.

    Animated images (n_frames > 1) contribute up to 5 evenly spaced keyframes
    so the embedding reflects the animation; static images contribute one frame.
    """
    is_animated = hasattr(image, "n_frames") and image.n_frames > 1

    if is_animated:
        num_frames = image.n_frames
        max_frames = min(5, num_frames)
        # evenly distributed indices, always including first and last frame
        frame_indices = [
            int(i * (num_frames - 1) / (max_frames - 1)) for i in range(max_frames)
        ]
    else:
        frame_indices = [0]

    content = []
    for frame_idx in frame_indices:
        if is_animated:
            image.seek(frame_idx)
        buffered = BytesIO()
        image.convert("RGB").save(buffered, format="WEBP", lossless=True)
        img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
        content.append({
            "type": "image_base64",
            "image_base64": f"data:image/webp;base64,{img_base64}",
        })
    return content


async def embed_image(client: httpx.AsyncClient, image_path: Path, api_key: str, max_retries: int = 3) -> List[float] | None:
    """Generate an embedding for an image using Voyage AI with retry logic.

    Retries only on HTTP 429 with exponential backoff (2s, 4s, 8s). Returns
    the embedding vector, or None if decoding or the request ultimately fails.
    """
    try:
        # context manager closes the underlying file handle; the original
        # left it open (one leaked descriptor per image, re-opened per retry)
        with Image.open(image_path) as image:
            content = _encode_frames(image)
    except Exception as e:
        console.print(f"[red]error embedding {image_path.name}: {e}[/red]")
        return None

    # the payload is deterministic, so encode once and reuse across retries
    for attempt in range(max_retries):
        try:
            response = await client.post(
                "https://api.voyageai.com/v1/multimodalembeddings",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": [{"content": content}],
                    "model": "voyage-multimodal-3",
                },
                timeout=60.0,
            )
            response.raise_for_status()
            return response.json()["data"][0]["embedding"]
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429 and attempt < max_retries - 1:
                # rate limited - exponential backoff: 2s, 4s, 8s
                await asyncio.sleep((2 ** attempt) * 2)
                continue
            # show actual error response for 400s
            error_detail = e.response.text if e.response.status_code == 400 else str(e)
            console.print(f"[red]error embedding {image_path.name} ({e.response.status_code}): {error_detail}[/red]")
            return None
        except Exception as e:
            console.print(f"[red]error embedding {image_path.name}: {e}[/red]")
            return None
    return None
186
187
async def generate_embeddings(
    image_paths: List[Path], api_key: str
) -> dict[str, List[float]]:
    """Generate embeddings for all images with controlled concurrency.

    Returns a mapping of image filename -> embedding vector; images that
    failed to embed are omitted.
    """
    embeddings: dict[str, List[float]] = {}

    # limit to 50 concurrent requests to stay well under 2000/min rate limit
    semaphore = asyncio.Semaphore(50)

    async def embed_with_semaphore(client, image_path):
        async with semaphore:
            embedding = await embed_image(client, image_path, api_key)
            return (image_path.name, embedding)

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]generating embeddings for {len(image_paths)} images...",
                total=len(image_paths),
            )

            # as_completed advances the bar as each request finishes; the
            # original used gather, so the bar only moved after ALL requests
            # were already done
            tasks = [embed_with_semaphore(client, img) for img in image_paths]
            for future in asyncio.as_completed(tasks):
                name, embedding = await future
                if embedding:
                    embeddings[name] = embedding
                progress.update(task, advance=1)

    console.print(f"[green]generated {len(embeddings)} embeddings[/green]")
    return embeddings
224
225
async def upload_to_turbopuffer(
    embeddings: dict[str, List[float]],
    bufo_urls: dict[str, str],
    api_key: str,
    namespace: str,
):
    """Upsert embeddings into a turbopuffer namespace.

    Args:
        embeddings: filename -> embedding vector.
        bufo_urls: filename -> original source URL (missing entries become "").
        api_key: turbopuffer API key.
        namespace: target namespace name.

    Raises:
        httpx.HTTPStatusError: if turbopuffer rejects the upsert.
    """
    # nothing to upload - avoid posting an upsert with empty arrays
    if not embeddings:
        console.print("[yellow]no embeddings to upload, skipping turbopuffer[/yellow]")
        return

    console.print("[cyan]uploading to turbopuffer...[/cyan]")

    ids = []
    vectors = []
    urls = []
    names = []
    filenames = []

    for filename, embedding in embeddings.items():
        # use a sha256 prefix as ID to stay under the 64 byte limit
        file_hash = hashlib.sha256(filename.encode()).hexdigest()[:16]
        ids.append(file_hash)
        vectors.append(embedding)
        urls.append(bufo_urls.get(filename, ""))
        names.append(filename.rsplit(".", 1)[0])
        filenames.append(filename)

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"https://api.turbopuffer.com/v1/vectors/{namespace}",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "ids": ids,
                "vectors": vectors,
                "distance_metric": "cosine_distance",
                "attributes": {
                    "url": urls,
                    "name": names,
                    "filename": filenames,
                },
                # index name/filename for full-text search alongside vectors
                "schema": {
                    "name": {
                        "type": "string",
                        "full_text_search": True,
                    },
                    "filename": {
                        "type": "string",
                        "full_text_search": True,
                    },
                },
            },
            timeout=120.0,
        )
        if response.status_code != 200:
            console.print(f"[red]turbopuffer error: {response.text}[/red]")
        response.raise_for_status()

    console.print(
        f"[green]uploaded {len(ids)} bufos to turbopuffer namespace '{namespace}'[/green]"
    )
286
287
async def main():
    """Run the full ingestion pipeline: scrape, download, embed, upload."""
    console.print("[bold cyan]bufo ingestion pipeline[/bold cyan]\n")

    voyage_api_key = os.getenv("VOYAGE_API_TOKEN")
    if not voyage_api_key:
        console.print("[red]VOYAGE_API_TOKEN not set[/red]")
        return

    tpuf_api_key = os.getenv("TURBOPUFFER_API_KEY")
    if not tpuf_api_key:
        console.print("[red]TURBOPUFFER_API_KEY not set[/red]")
        return

    tpuf_namespace = os.getenv("TURBOPUFFER_NAMESPACE", "bufos")

    # images land in <project root>/data/bufos
    output_dir = Path(__file__).parent.parent / "data" / "bufos"

    raw_urls = await fetch_bufo_urls()
    # filename -> full URL, so uploads can link back to the source image
    url_by_filename = {u.split("/")[-1]: u for u in raw_urls}

    image_paths = await download_all_bufos(raw_urls, output_dir)
    embeddings = await generate_embeddings(image_paths, voyage_api_key)
    await upload_to_turbopuffer(
        embeddings, url_by_filename, tpuf_api_key, tpuf_namespace
    )

    console.print("\n[bold green]ingestion complete![/bold green]")
319
320
# entry point: run the async ingestion pipeline
if __name__ == "__main__":
    asyncio.run(main())