semantic bufo search — find-bufo.com (ingestion script)
1#!/usr/bin/env python3
2# /// script
3# requires-python = ">=3.11"
4# dependencies = [
5# "httpx",
6# "beautifulsoup4",
7# "rich",
8# "python-dotenv",
9# "pillow",
10# ]
11# ///
12"""
13Scrape all bufos from bufo.zone, generate embeddings, and upload to turbopuffer.
14"""
15
16import asyncio
17import base64
18import hashlib
19import os
20import re
21from io import BytesIO
22from pathlib import Path
23from typing import List
24
25import httpx
26from bs4 import BeautifulSoup
27from PIL import Image
28from rich.console import Console
29from rich.progress import Progress, SpinnerColumn, TextColumn
30from dotenv import load_dotenv
31
# Shared rich console used for all status/progress output in this script.
console = Console()

# Load .env from project root
# (parent.parent assumes this script sits one directory below the root — TODO confirm)
load_dotenv(Path(__file__).parent.parent / ".env")
36
37
async def fetch_bufo_urls() -> set[str]:
    """Collect every unique bufo image URL referenced by bufo.zone.

    Two passes over the page: parsed <img> tags, then a raw regex sweep
    over the HTML to catch image URLs outside img tags.
    """
    console.print("[cyan]fetching bufo list from bufo.zone...[/cyan]")

    async with httpx.AsyncClient() as client:
        response = await client.get("https://bufo.zone", timeout=30.0)
        response.raise_for_status()

    page_html = response.text
    found: set[str] = set()

    # pass 1: <img> tags whose src points at the bufo CDN
    for tag in BeautifulSoup(page_html, "html.parser").find_all("img"):
        src = tag.get("src", "")
        if "all-the.bufo.zone" in src:
            found.add(src)

    # pass 2: regex sweep for any CDN image links the soup pass missed
    image_url_re = re.compile(
        r"https://all-the\.bufo\.zone/[^\"'>\s]+\.(png|gif|jpg|jpeg|webp)"
    )
    found.update(m.group(0) for m in image_url_re.finditer(page_html))

    console.print(f"[green]found {len(found)} unique bufo images[/green]")
    return found
62
63
async def download_bufo(client: httpx.AsyncClient, url: str, output_dir: Path) -> str | None:
    """Fetch one bufo image into *output_dir*.

    Returns the filename on success (or when a non-empty copy already
    exists locally), None on any download error.
    """
    name = url.rsplit("/", 1)[-1]
    target = output_dir / name

    # skip files already on disk, unless they are zero-byte failed writes
    if target.exists() and target.stat().st_size > 0:
        return name

    try:
        resp = await client.get(url, timeout=30.0)
        resp.raise_for_status()
        target.write_bytes(resp.content)
    except Exception as exc:
        console.print(f"[red]error downloading {url}: {exc}[/red]")
        return None
    return name
80
81
async def download_all_bufos(urls: set[str], output_dir: Path) -> List[Path]:
    """Download every URL in *urls* into *output_dir*, 10 at a time.

    Failures are logged inside download_bufo and simply skipped here;
    the returned list contains only paths that exist on disk.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    saved: List[Path] = []
    pending = list(urls)
    chunk = 10

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]downloading {len(urls)} bufos...", total=len(urls)
            )

            for start in range(0, len(pending), chunk):
                group = pending[start : start + chunk]
                outcomes = await asyncio.gather(
                    *(download_bufo(client, u, output_dir) for u in group),
                    return_exceptions=True,
                )

                # keep only real filenames: drop None results and exceptions
                for outcome in outcomes:
                    if outcome and not isinstance(outcome, Exception):
                        saved.append(output_dir / outcome)

                progress.update(task, advance=len(group))

                # brief pause between batches to avoid hammering the CDN
                if start + chunk < len(pending):
                    await asyncio.sleep(0.5)

    console.print(f"[green]downloaded {len(saved)} bufos[/green]")
    return saved
117
118
def _webp_base64(image: Image.Image) -> str:
    """Encode the current frame of *image* as lossless WEBP, base64-encoded."""
    buffered = BytesIO()
    image.convert("RGB").save(buffered, format="WEBP", lossless=True)
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


async def embed_image(client: httpx.AsyncClient, image_path: Path, api_key: str, max_retries: int = 3) -> List[float] | None:
    """Generate a multimodal embedding for an image using Voyage AI.

    The request fuses the filename text with one frame (static images) or
    up to 5 evenly spaced keyframes (animated images). Retries with
    exponential backoff on HTTP 429; returns None on any other failure.
    """
    for attempt in range(max_retries):
        try:
            # bug fix: the original called Image.open() without closing it,
            # leaking one file handle per image. The context manager closes
            # the file as soon as the content payload is built.
            with Image.open(image_path) as image:
                # animated formats (e.g. GIF) expose n_frames > 1
                is_animated = getattr(image, "n_frames", 1) > 1

                # extract semantic meaning from filename for early fusion:
                # "bufo-jumping-on-bed.png" -> "bufo jumping on bed"
                filename_text = image_path.stem.replace("-", " ").replace("_", " ")

                # content array starts with the filename text (early fusion)
                content: list[dict] = [{
                    "type": "text",
                    "text": filename_text
                }]

                if is_animated:
                    # sample up to 5 evenly distributed keyframes for a
                    # temporal representation of the animation
                    num_frames = image.n_frames
                    max_frames = min(5, num_frames)
                    # is_animated guarantees num_frames >= 2, so
                    # max_frames >= 2 and the divisor below is never zero
                    frame_indices = [
                        int(i * (num_frames - 1) / (max_frames - 1))
                        for i in range(max_frames)
                    ]
                    for frame_idx in frame_indices:
                        image.seek(frame_idx)
                        content.append({
                            "type": "image_base64",
                            "image_base64": f"data:image/webp;base64,{_webp_base64(image)}",
                        })
                else:
                    # static image: single frame
                    content.append({
                        "type": "image_base64",
                        "image_base64": f"data:image/webp;base64,{_webp_base64(image)}",
                    })

            response = await client.post(
                "https://api.voyageai.com/v1/multimodalembeddings",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": [{"content": content}],
                    "model": "voyage-multimodal-3",
                    "input_type": "document",
                },
                timeout=60.0,
            )
            response.raise_for_status()
            result = response.json()
            return result["data"][0]["embedding"]
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                # rate limited - exponential backoff
                wait_time = (2 ** attempt) * 2  # 2s, 4s, 8s
                if attempt < max_retries - 1:
                    await asyncio.sleep(wait_time)
                    continue
            # show actual error response for 400s
            error_detail = e.response.text if e.response.status_code == 400 else str(e)
            console.print(f"[red]error embedding {image_path.name} ({e.response.status_code}): {error_detail}[/red]")
            return None
        except Exception as e:
            console.print(f"[red]error embedding {image_path.name}: {e}[/red]")
            return None
    return None
196
197
async def generate_embeddings(
    image_paths: List[Path], api_key: str
) -> dict[str, List[float]]:
    """Generate embeddings for all images with controlled concurrency.

    Returns a mapping of filename -> embedding vector; images whose
    embedding failed (embed_image returned None) are omitted.
    """
    embeddings: dict[str, List[float]] = {}

    # limit to 50 concurrent requests to stay well under 2000/min rate limit
    semaphore = asyncio.Semaphore(50)

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]generating embeddings for {len(image_paths)} images...",
                total=len(image_paths),
            )

            async def embed_one(image_path: Path) -> tuple[str, List[float] | None]:
                async with semaphore:
                    embedding = await embed_image(client, image_path, api_key)
                # bug fix: advance the bar as each request finishes. The
                # original advanced only after gather() returned everything,
                # so the progress display never moved until the very end.
                progress.update(task, advance=1)
                return (image_path.name, embedding)

            # process all images concurrently with semaphore
            results = await asyncio.gather(
                *(embed_one(img) for img in image_paths)
            )

    for name, embedding in results:
        if embedding:
            embeddings[name] = embedding

    console.print(f"[green]generated {len(embeddings)} embeddings[/green]")
    return embeddings
234
235
async def upload_to_turbopuffer(
    embeddings: dict[str, List[float]],
    bufo_urls: dict[str, str],
    api_key: str,
    namespace: str,
):
    """Upsert all embeddings (with url/name/filename attributes) to turbopuffer."""
    console.print("[cyan]uploading to turbopuffer...[/cyan]")

    doc_ids: List[str] = []
    doc_vectors: List[List[float]] = []
    doc_urls: List[str] = []
    doc_names: List[str] = []
    doc_filenames: List[str] = []

    for fname, vector in embeddings.items():
        # truncated sha256 keeps IDs well under the 64-byte limit
        doc_ids.append(hashlib.sha256(fname.encode()).hexdigest()[:16])
        doc_vectors.append(vector)
        doc_urls.append(bufo_urls.get(fname, ""))
        doc_names.append(fname.rsplit(".", 1)[0])
        doc_filenames.append(fname)

    payload = {
        "ids": doc_ids,
        "vectors": doc_vectors,
        "distance_metric": "cosine_distance",
        "attributes": {
            "url": doc_urls,
            "name": doc_names,
            "filename": doc_filenames,
        },
        # enable BM25 full-text search on the name/filename attributes
        "schema": {
            "name": {"type": "string", "full_text_search": True},
            "filename": {"type": "string", "full_text_search": True},
        },
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"https://api.turbopuffer.com/v1/vectors/{namespace}",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json=payload,
            timeout=120.0,
        )
        if response.status_code != 200:
            console.print(f"[red]turbopuffer error: {response.text}[/red]")
        response.raise_for_status()

    console.print(
        f"[green]uploaded {len(doc_ids)} bufos to turbopuffer namespace '{namespace}'[/green]"
    )
296
297
async def main():
    """Run the full pipeline: scrape URLs, download, embed, upload."""
    console.print("[bold cyan]bufo ingestion pipeline[/bold cyan]\n")

    voyage_api_key = os.getenv("VOYAGE_API_TOKEN")
    tpuf_api_key = os.getenv("TURBOPUFFER_API_KEY")

    # bail out early with a clear message on any missing credential
    for env_name, value in (
        ("VOYAGE_API_TOKEN", voyage_api_key),
        ("TURBOPUFFER_API_KEY", tpuf_api_key),
    ):
        if not value:
            console.print(f"[red]{env_name} not set[/red]")
            return

    tpuf_namespace = os.getenv("TURBOPUFFER_NAMESPACE", "bufos")

    project_root = Path(__file__).parent.parent
    output_dir = project_root / "data" / "bufos"

    raw_urls = await fetch_bufo_urls()
    # map filename -> full CDN URL so uploads can attach the source link
    url_by_filename = {u.split("/")[-1]: u for u in raw_urls}

    image_paths = await download_all_bufos(raw_urls, output_dir)
    embeddings = await generate_embeddings(image_paths, voyage_api_key)
    await upload_to_turbopuffer(embeddings, url_by_filename, tpuf_api_key, tpuf_namespace)

    console.print("\n[bold green]ingestion complete![/bold green]")
329
330
# Script entry point: run the async ingestion pipeline to completion.
if __name__ == "__main__":
    asyncio.run(main())