semantic bufo search find-bufo.com
bufo
at main 332 lines 12 kB view raw
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "httpx",
#     "beautifulsoup4",
#     "rich",
#     "python-dotenv",
#     "pillow",
# ]
# ///
"""
Scrape all bufos from bufo.zone, generate embeddings, and upload to turbopuffer.
"""

import asyncio
import base64
import hashlib
import os
import re
from io import BytesIO
from pathlib import Path
from typing import List

import httpx
from bs4 import BeautifulSoup
from PIL import Image
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from dotenv import load_dotenv

console = Console()

# Load .env from project root
load_dotenv(Path(__file__).parent.parent / ".env")


async def fetch_bufo_urls() -> set[str]:
    """Fetch all unique bufo image URLs from bufo.zone.

    Combines two discovery strategies: <img src> tags parsed from the HTML,
    and a raw-text regex sweep, since some URLs may appear outside img tags.

    Returns:
        Set of absolute image URLs hosted on all-the.bufo.zone.
    """
    console.print("[cyan]fetching bufo list from bufo.zone...[/cyan]")

    async with httpx.AsyncClient() as client:
        response = await client.get("https://bufo.zone", timeout=30.0)
        response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")

    urls = set()
    for img in soup.find_all("img"):
        src = img.get("src", "")
        if "all-the.bufo.zone" in src:
            urls.add(src)

    # fallback sweep over the raw HTML for URLs not inside <img> tags
    pattern = re.compile(
        r"https://all-the\.bufo\.zone/[^\"'>\s]+\.(png|gif|jpg|jpeg|webp)"
    )
    for match in pattern.finditer(response.text):
        urls.add(match.group(0))

    console.print(f"[green]found {len(urls)} unique bufo images[/green]")
    return urls


async def download_bufo(client: httpx.AsyncClient, url: str, output_dir: Path) -> str | None:
    """Download a single bufo image and return its filename.

    Skips the download when a non-empty file already exists (resumable runs).

    Returns:
        The filename on success (or cache hit), None on any download error.
    """
    filename = url.split("/")[-1]
    output_path = output_dir / filename

    if output_path.exists() and output_path.stat().st_size > 0:
        return filename

    try:
        response = await client.get(url, timeout=30.0)
        response.raise_for_status()
        output_path.write_bytes(response.content)
        return filename
    except Exception as e:
        console.print(f"[red]error downloading {url}: {e}[/red]")
        return None


async def download_all_bufos(urls: set[str], output_dir: Path) -> List[Path]:
    """Download all bufos concurrently in small batches.

    Batches of 10 with a short pause between batches keep request pressure
    on the image host low.

    Returns:
        Paths of all successfully downloaded (or already cached) images.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    downloaded_files = []

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]downloading {len(urls)} bufos...", total=len(urls)
            )

            batch_size = 10
            urls_list = list(urls)

            for i in range(0, len(urls_list), batch_size):
                batch = urls_list[i : i + batch_size]
                tasks = [download_bufo(client, url, output_dir) for url in batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)

                for result in results:
                    # check isinstance first: exception objects are truthy,
                    # so testing truthiness alone would not filter them
                    if isinstance(result, Exception) or result is None:
                        continue
                    downloaded_files.append(output_dir / result)

                progress.update(task, advance=len(batch))

                # brief pause between batches to be polite to the host
                if i + batch_size < len(urls_list):
                    await asyncio.sleep(0.5)

    console.print(f"[green]downloaded {len(downloaded_files)} bufos[/green]")
    return downloaded_files


async def embed_image(client: httpx.AsyncClient, image_path: Path, api_key: str, max_retries: int = 3) -> List[float] | None:
    """Generate a multimodal embedding for an image via Voyage AI.

    Performs "early fusion": the filename (dashes/underscores turned into
    spaces) is sent as a text segment alongside the image content. Animated
    images contribute up to 5 evenly spaced keyframes; static images one
    frame. Retries with exponential backoff on HTTP 429.

    Args:
        client: Shared HTTP client.
        image_path: Image file to embed.
        api_key: Voyage AI API key.
        max_retries: Attempts before giving up on rate limiting.

    Returns:
        The embedding vector, or None on failure.
    """
    for attempt in range(max_retries):
        try:
            # use a context manager so the file handle is released promptly;
            # the original leaked one handle per image across the whole run
            with Image.open(image_path) as image:
                # check if this is an animated image
                is_animated = hasattr(image, 'n_frames') and image.n_frames > 1

                # extract semantic meaning from filename for early fusion
                # convert "bufo-jumping-on-bed.png" -> "bufo jumping on bed"
                filename_text = image_path.stem.replace("-", " ").replace("_", " ")

                # start content array with filename text for early fusion
                content = [{
                    "type": "text",
                    "text": filename_text
                }]

                if is_animated:
                    # for animated GIFs, extract multiple keyframes for
                    # temporal representation: up to 5 evenly spaced frames
                    num_frames = image.n_frames
                    max_frames = min(5, num_frames)
                    frame_indices = [
                        int(i * (num_frames - 1) / (max_frames - 1))
                        for i in range(max_frames)
                    ]
                else:
                    # static image: single frame at index 0
                    frame_indices = [0]

                for frame_idx in frame_indices:
                    image.seek(frame_idx)
                    buffered = BytesIO()
                    image.convert("RGB").save(buffered, format="WEBP", lossless=True)
                    img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
                    content.append({
                        "type": "image_base64",
                        "image_base64": f"data:image/webp;base64,{img_base64}",
                    })

            # image handle closed; only base64 payloads are needed from here
            response = await client.post(
                "https://api.voyageai.com/v1/multimodalembeddings",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": [{"content": content}],
                    "model": "voyage-multimodal-3",
                    "input_type": "document",
                },
                timeout=60.0,
            )
            response.raise_for_status()
            result = response.json()
            return result["data"][0]["embedding"]
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                # rate limited - exponential backoff
                wait_time = (2 ** attempt) * 2  # 2s, 4s, 8s
                if attempt < max_retries - 1:
                    await asyncio.sleep(wait_time)
                    continue
            # show actual error response for 400s
            error_detail = e.response.text if e.response.status_code == 400 else str(e)
            console.print(f"[red]error embedding {image_path.name} ({e.response.status_code}): {error_detail}[/red]")
            return None
        except Exception as e:
            console.print(f"[red]error embedding {image_path.name}: {e}[/red]")
            return None
    return None


async def generate_embeddings(
    image_paths: List[Path], api_key: str
) -> dict[str, List[float]]:
    """Generate embeddings for all images with controlled concurrency.

    A semaphore caps in-flight requests at 50 to stay well under the
    2000/min provider rate limit.

    Returns:
        Mapping of image filename -> embedding vector (failures omitted).
    """
    embeddings = {}

    # limit to 50 concurrent requests to stay well under 2000/min rate limit
    semaphore = asyncio.Semaphore(50)

    async def embed_with_semaphore(client, image_path):
        async with semaphore:
            embedding = await embed_image(client, image_path, api_key)
        # advance as each task finishes so the bar moves during the run
        # (advancing only after gather() returns left it frozen at 0)
        progress.update(task, advance=1)
        return (image_path.name, embedding)

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]generating embeddings for {len(image_paths)} images...",
                total=len(image_paths),
            )

            # process all images concurrently with semaphore
            tasks = [embed_with_semaphore(client, img) for img in image_paths]
            results = await asyncio.gather(*tasks)

            for name, embedding in results:
                # explicit None check: a falsy-but-valid vector must not be dropped
                if embedding is not None:
                    embeddings[name] = embedding

    console.print(f"[green]generated {len(embeddings)} embeddings[/green]")
    return embeddings


async def upload_to_turbopuffer(
    embeddings: dict[str, List[float]],
    bufo_urls: dict[str, str],
    api_key: str,
    namespace: str,
):
    """Upload embeddings to turbopuffer.

    Builds columnar arrays (ids/vectors/attributes) and upserts them in a
    single request. Names and filenames are indexed for full-text search.

    Args:
        embeddings: filename -> embedding vector.
        bufo_urls: filename -> original image URL.
        api_key: turbopuffer API key.
        namespace: target turbopuffer namespace.
    """
    console.print("[cyan]uploading to turbopuffer...[/cyan]")

    ids = []
    vectors = []
    urls = []
    names = []
    filenames = []

    for filename, embedding in embeddings.items():
        # use hash as ID to stay under 64 byte limit
        file_hash = hashlib.sha256(filename.encode()).hexdigest()[:16]
        ids.append(file_hash)
        vectors.append(embedding)
        urls.append(bufo_urls.get(filename, ""))
        names.append(filename.rsplit(".", 1)[0])
        filenames.append(filename)

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"https://api.turbopuffer.com/v1/vectors/{namespace}",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "ids": ids,
                "vectors": vectors,
                "distance_metric": "cosine_distance",
                "attributes": {
                    "url": urls,
                    "name": names,
                    "filename": filenames,
                },
                "schema": {
                    "name": {
                        "type": "string",
                        "full_text_search": True,
                    },
                    "filename": {
                        "type": "string",
                        "full_text_search": True,
                    },
                },
            },
            timeout=120.0,
        )
        if response.status_code != 200:
            console.print(f"[red]turbopuffer error: {response.text}[/red]")
            response.raise_for_status()

    console.print(
        f"[green]uploaded {len(ids)} bufos to turbopuffer namespace '{namespace}'[/green]"
    )


async def main():
    """Run the full pipeline: scrape URLs, download, embed, upload."""
    console.print("[bold cyan]bufo ingestion pipeline[/bold cyan]\n")

    voyage_api_key = os.getenv("VOYAGE_API_TOKEN")
    if not voyage_api_key:
        console.print("[red]VOYAGE_API_TOKEN not set[/red]")
        return

    tpuf_api_key = os.getenv("TURBOPUFFER_API_KEY")
    if not tpuf_api_key:
        console.print("[red]TURBOPUFFER_API_KEY not set[/red]")
        return

    tpuf_namespace = os.getenv("TURBOPUFFER_NAMESPACE", "bufos")

    script_dir = Path(__file__).parent
    project_root = script_dir.parent
    output_dir = project_root / "data" / "bufos"

    bufo_urls_raw = await fetch_bufo_urls()

    # map basename -> full URL so upload can attach source URLs by filename
    bufo_urls_map = {url.split("/")[-1]: url for url in bufo_urls_raw}

    image_paths = await download_all_bufos(bufo_urls_raw, output_dir)

    embeddings = await generate_embeddings(image_paths, voyage_api_key)

    await upload_to_turbopuffer(embeddings, bufo_urls_map, tpuf_api_key, tpuf_namespace)

    console.print("\n[bold green]ingestion complete![/bold green]")


if __name__ == "__main__":
    asyncio.run(main())