semantic bufo search find-bufo.com
bufo
at feature/logfire-integration 322 lines 11 kB view raw
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "httpx",
#     "beautifulsoup4",
#     "rich",
#     "python-dotenv",
#     "pillow",
# ]
# ///
"""
Scrape all bufos from bufo.zone, generate embeddings, and upload to turbopuffer.

Pipeline stages:
  1. fetch_bufo_urls       -- scrape image URLs from bufo.zone
  2. download_all_bufos    -- download images to data/bufos/ (skips existing files)
  3. generate_embeddings   -- Voyage AI multimodal embeddings (GIF keyframes supported)
  4. upload_to_turbopuffer -- upsert vectors + attributes into a namespace
"""

import asyncio
import base64
import hashlib
import os
import re
from io import BytesIO
from pathlib import Path

import httpx
from bs4 import BeautifulSoup
from PIL import Image
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from dotenv import load_dotenv

console = Console()

# Load .env from project root
load_dotenv(Path(__file__).parent.parent / ".env")

# Matches direct links to bufo image assets anywhere in the page source.
# Non-capturing group so group(0) is the whole URL; compiled once at module level.
_BUFO_URL_RE = re.compile(
    r"https://all-the\.bufo\.zone/[^\"'>\s]+\.(?:png|gif|jpg|jpeg|webp)"
)


async def fetch_bufo_urls() -> set[str]:
    """Fetch all unique bufo image URLs from bufo.zone.

    Combines <img src> attributes with a regex sweep over the raw HTML, so
    images referenced outside <img> tags are also picked up.
    """
    console.print("[cyan]fetching bufo list from bufo.zone...[/cyan]")

    async with httpx.AsyncClient() as client:
        response = await client.get("https://bufo.zone", timeout=30.0)
        response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")

    urls: set[str] = set()
    for img in soup.find_all("img"):
        src = img.get("src", "")
        if "all-the.bufo.zone" in src:
            urls.add(src)

    for match in _BUFO_URL_RE.finditer(response.text):
        urls.add(match.group(0))

    console.print(f"[green]found {len(urls)} unique bufo images[/green]")
    return urls


async def download_bufo(client: httpx.AsyncClient, url: str, output_dir: Path) -> str | None:
    """Download a single bufo image.

    Returns the local filename on success (or when a non-empty copy already
    exists); returns None on failure (logged, not raised).
    """
    filename = url.split("/")[-1]
    output_path = output_dir / filename

    # skip files already downloaded; zero-byte files are treated as failed and retried
    if output_path.exists() and output_path.stat().st_size > 0:
        return filename

    try:
        response = await client.get(url, timeout=30.0)
        response.raise_for_status()
        output_path.write_bytes(response.content)
        return filename
    except Exception as e:
        # best-effort: log and let the rest of the batch continue
        console.print(f"[red]error downloading {url}: {e}[/red]")
        return None


async def download_all_bufos(urls: set[str], output_dir: Path) -> list[Path]:
    """Download all bufos concurrently in batches of 10, pausing briefly
    between batches to be polite to the host.

    Returns paths of successfully downloaded (or already-present) files.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    downloaded_files: list[Path] = []
    batch_size = 10
    urls_list = list(urls)

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]downloading {len(urls)} bufos...", total=len(urls)
            )

            for i in range(0, len(urls_list), batch_size):
                batch = urls_list[i : i + batch_size]
                tasks = [download_bufo(client, url, output_dir) for url in batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)

                for filename in results:
                    # download_bufo returns str on success; gather may also
                    # surface raw exceptions (return_exceptions=True)
                    if isinstance(filename, str):
                        downloaded_files.append(output_dir / filename)

                progress.update(task, advance=len(batch))

                if i + batch_size < len(urls_list):
                    await asyncio.sleep(0.5)  # throttle between batches

    console.print(f"[green]downloaded {len(downloaded_files)} bufos[/green]")
    return downloaded_files


def _encode_image_content(image_path: Path) -> list[dict[str, str]]:
    """Encode an image into Voyage's multimodal content format.

    Animated images yield up to 5 evenly spaced keyframes so the embedding
    reflects temporal content; static images yield a single entry. Each frame
    is re-encoded as lossless WEBP and wrapped in a base64 data URI.
    """
    content: list[dict[str, str]] = []
    # context manager ensures the file handle is closed (the original leaked it)
    with Image.open(image_path) as image:
        is_animated = getattr(image, "n_frames", 1) > 1

        if is_animated:
            num_frames = image.n_frames
            max_frames = min(5, num_frames)  # n_frames > 1, so max_frames >= 2
            frame_indices = [
                int(i * (num_frames - 1) / (max_frames - 1))
                for i in range(max_frames)
            ]
        else:
            frame_indices = [0]

        for frame_idx in frame_indices:
            if is_animated:
                image.seek(frame_idx)
            buffered = BytesIO()
            image.convert("RGB").save(buffered, format="WEBP", lossless=True)
            img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
            content.append(
                {
                    "type": "image_base64",
                    "image_base64": f"data:image/webp;base64,{img_base64}",
                }
            )
    return content


async def embed_image(client: httpx.AsyncClient, image_path: Path, api_key: str, max_retries: int = 3) -> list[float] | None:
    """Generate an embedding for an image via Voyage AI, with retry logic.

    Retries HTTP 429 with exponential backoff (2s, 4s, 8s). Returns the
    embedding vector, or None on unrecoverable error (logged, not raised).
    """
    try:
        # encode once, outside the retry loop -- the original re-opened and
        # re-encoded the image on every retry attempt
        content = _encode_image_content(image_path)
    except Exception as e:
        console.print(f"[red]error embedding {image_path.name}: {e}[/red]")
        return None

    for attempt in range(max_retries):
        try:
            response = await client.post(
                "https://api.voyageai.com/v1/multimodalembeddings",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": [{"content": content}],
                    "model": "voyage-multimodal-3",
                },
                timeout=60.0,
            )
            response.raise_for_status()
            return response.json()["data"][0]["embedding"]
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429 and attempt < max_retries - 1:
                # rate limited -> exponential backoff: 2s, 4s, 8s
                await asyncio.sleep((2 ** attempt) * 2)
                continue
            # show the actual response body for 400s (usually a payload problem)
            error_detail = e.response.text if e.response.status_code == 400 else str(e)
            console.print(f"[red]error embedding {image_path.name} ({e.response.status_code}): {error_detail}[/red]")
            return None
        except Exception as e:
            console.print(f"[red]error embedding {image_path.name}: {e}[/red]")
            return None
    return None


async def generate_embeddings(
    image_paths: list[Path], api_key: str
) -> dict[str, list[float]]:
    """Generate embeddings for all images with controlled concurrency.

    Returns filename -> embedding; images that failed to embed are simply
    absent from the result.
    """
    embeddings: dict[str, list[float]] = {}

    # limit to 50 concurrent requests to stay well under 2000/min rate limit
    semaphore = asyncio.Semaphore(50)

    async def embed_with_semaphore(client, image_path):
        async with semaphore:
            embedding = await embed_image(client, image_path, api_key)
            return (image_path.name, embedding)

    async with httpx.AsyncClient() as client:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task(
                f"[cyan]generating embeddings for {len(image_paths)} images...",
                total=len(image_paths),
            )

            # advance the bar as each task finishes -- the original only
            # updated after the entire gather had completed
            coros = [embed_with_semaphore(client, img) for img in image_paths]
            for fut in asyncio.as_completed(coros):
                name, embedding = await fut
                if embedding:
                    embeddings[name] = embedding
                progress.update(task, advance=1)

    console.print(f"[green]generated {len(embeddings)} embeddings[/green]")
    return embeddings


async def upload_to_turbopuffer(
    embeddings: dict[str, list[float]],
    bufo_urls: dict[str, str],
    api_key: str,
    namespace: str,
):
    """Upsert embeddings plus url/name/filename attributes into turbopuffer.

    `name` and `filename` get full-text-search indexes so bufos can be found
    lexically as well as by vector similarity.
    """
    if not embeddings:
        # nothing to upload; avoid posting an empty upsert request
        console.print("[yellow]no embeddings to upload, skipping[/yellow]")
        return

    console.print("[cyan]uploading to turbopuffer...[/cyan]")

    ids = []
    vectors = []
    urls = []
    names = []
    filenames = []

    for filename, embedding in embeddings.items():
        # use hash as ID to stay under 64 byte limit
        file_hash = hashlib.sha256(filename.encode()).hexdigest()[:16]
        ids.append(file_hash)
        vectors.append(embedding)
        urls.append(bufo_urls.get(filename, ""))
        names.append(filename.rsplit(".", 1)[0])  # display name = filename sans extension
        filenames.append(filename)

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"https://api.turbopuffer.com/v1/vectors/{namespace}",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "ids": ids,
                "vectors": vectors,
                "distance_metric": "cosine_distance",
                "attributes": {
                    "url": urls,
                    "name": names,
                    "filename": filenames,
                },
                "schema": {
                    "name": {
                        "type": "string",
                        "full_text_search": True,
                    },
                    "filename": {
                        "type": "string",
                        "full_text_search": True,
                    },
                },
            },
            timeout=120.0,
        )
        if response.status_code != 200:
            # surface the error body before raising -- tpuf errors are verbose
            console.print(f"[red]turbopuffer error: {response.text}[/red]")
        response.raise_for_status()

    console.print(
        f"[green]uploaded {len(ids)} bufos to turbopuffer namespace '{namespace}'[/green]"
    )


async def main():
    """Run the full ingestion pipeline: scrape -> download -> embed -> upload."""
    console.print("[bold cyan]bufo ingestion pipeline[/bold cyan]\n")

    voyage_api_key = os.getenv("VOYAGE_API_TOKEN")
    if not voyage_api_key:
        console.print("[red]VOYAGE_API_TOKEN not set[/red]")
        return

    tpuf_api_key = os.getenv("TURBOPUFFER_API_KEY")
    if not tpuf_api_key:
        console.print("[red]TURBOPUFFER_API_KEY not set[/red]")
        return

    tpuf_namespace = os.getenv("TURBOPUFFER_NAMESPACE", "bufos")

    script_dir = Path(__file__).parent
    project_root = script_dir.parent
    output_dir = project_root / "data" / "bufos"

    bufo_urls_raw = await fetch_bufo_urls()

    # map local filename back to its source URL for the upload attributes
    bufo_urls_map = {url.split("/")[-1]: url for url in bufo_urls_raw}

    image_paths = await download_all_bufos(bufo_urls_raw, output_dir)

    embeddings = await generate_embeddings(image_paths, voyage_api_key)

    await upload_to_turbopuffer(embeddings, bufo_urls_map, tpuf_api_key, tpuf_namespace)

    console.print("\n[bold green]ingestion complete![/bold green]")


if __name__ == "__main__":
    asyncio.run(main())