semantic bufo search find-bufo.com
bufo
at main 157 lines 4.5 kB view raw
1//! turbopuffer vector database implementation 2//! 3//! implements the `VectorStore` trait for turbopuffer's hybrid search API. 4 5use crate::providers::{SearchResult, VectorSearchError, VectorStore}; 6use reqwest::Client; 7use serde::{Deserialize, Serialize}; 8 9const TURBOPUFFER_API_BASE: &str = "https://api.turbopuffer.com/v1/vectors"; 10 11/// raw response row from turbopuffer API 12#[derive(Debug, Deserialize, Serialize, Clone)] 13pub struct QueryRow { 14 pub id: String, 15 pub dist: f32, 16 pub attributes: serde_json::Map<String, serde_json::Value>, 17} 18 19impl From<QueryRow> for SearchResult { 20 fn from(row: QueryRow) -> Self { 21 let attributes = row 22 .attributes 23 .iter() 24 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) 25 .collect(); 26 27 SearchResult { 28 id: row.id, 29 score: row.dist, 30 attributes, 31 } 32 } 33} 34 35#[derive(Debug, Deserialize)] 36struct ErrorResponse { 37 error: String, 38 #[allow(dead_code)] 39 status: String, 40} 41 42/// turbopuffer vector database client 43/// 44/// supports both ANN vector search and BM25 full-text search. 45#[derive(Clone)] 46pub struct TurbopufferStore { 47 client: Client, 48 api_key: String, 49 namespace: String, 50} 51 52impl TurbopufferStore { 53 pub fn new(api_key: String, namespace: String) -> Self { 54 Self { 55 client: Client::new(), 56 api_key, 57 namespace, 58 } 59 } 60 61 fn query_url(&self) -> String { 62 format!("{}/{}/query", TURBOPUFFER_API_BASE, self.namespace) 63 } 64 65 async fn execute_query( 66 &self, 67 request: serde_json::Value, 68 ) -> Result<Vec<QueryRow>, VectorSearchError> { 69 let response = self 70 .client 71 .post(self.query_url()) 72 .header("Authorization", format!("Bearer {}", self.api_key)) 73 .json(&request) 74 .send() 75 .await?; 76 77 if !response.status().is_success() { 78 let status = response.status().as_u16(); 79 let body = response.text().await.unwrap_or_default(); 80 81 // check for specific error types 82 if let Ok(error_resp) = serde_json::from_str::<ErrorResponse>(&body) { 83 if error_resp.error.contains("too long") && error_resp.error.contains("max 1024") { 84 return Err(VectorSearchError::QueryTooLong { 85 message: error_resp.error, 86 }); 87 } 88 } 89 90 return Err(VectorSearchError::Api { status, body }); 91 } 92 93 let body = response.text().await.map_err(|e| { 94 VectorSearchError::Other(anyhow::anyhow!("failed to read response: {}", e)) 95 })?; 96 97 serde_json::from_str(&body) 98 .map_err(|e| VectorSearchError::Parse(format!("failed to parse response: {}", e))) 99 } 100} 101 102impl VectorStore for TurbopufferStore { 103 async fn search_by_vector( 104 &self, 105 embedding: &[f32], 106 top_k: usize, 107 ) -> Result<Vec<SearchResult>, VectorSearchError> { 108 let request = serde_json::json!({ 109 "rank_by": ["vector", "ANN", embedding], 110 "top_k": top_k, 111 "include_attributes": ["url", "name", "filename"], 112 }); 113 114 log::debug!( 115 "turbopuffer vector query: {}", 116 serde_json::to_string_pretty(&request).unwrap_or_default() 117 ); 118 119 let rows = self.execute_query(request).await?; 120 Ok(rows.into_iter().map(SearchResult::from).collect()) 121 } 122 123 async fn search_by_keyword( 124 &self, 125 query: &str, 126 top_k: usize, 127 ) -> Result<Vec<SearchResult>, VectorSearchError> { 128 let request = serde_json::json!({ 129 "rank_by": ["name", "BM25", query], 130 "top_k": top_k, 131 "include_attributes": ["url", "name", "filename"], 132 }); 133 134 log::debug!( 135 "turbopuffer BM25 query: {}", 136 serde_json::to_string_pretty(&request).unwrap_or_default() 137 ); 138 139 let rows = self.execute_query(request).await?; 140 141 if let Some(first) = rows.first() { 142 log::info!( 143 "BM25 first result - id: {}, dist: {}, name: {:?}", 144 first.id, 145 first.dist, 146 first.attributes.get("name") 147 ); 148 } 149 150 Ok(rows.into_iter().map(SearchResult::from).collect()) 151 } 152 153 fn name(&self) -> &'static str { 154 "turbopuffer" 155 } 156} 157