semantic bufo search
find-bufo.com
bufo
1//! turbopuffer vector database implementation
2//!
3//! implements the `VectorStore` trait for turbopuffer's hybrid search API.
4
5use crate::providers::{SearchResult, VectorSearchError, VectorStore};
6use reqwest::Client;
7use serde::{Deserialize, Serialize};
8
/// base URL of the turbopuffer vectors API; the namespace and `/query` suffix
/// are appended to it when building the query endpoint.
const TURBOPUFFER_API_BASE: &str = "https://api.turbopuffer.com/v1/vectors";
10
11/// raw response row from turbopuffer API
12#[derive(Debug, Deserialize, Serialize, Clone)]
13pub struct QueryRow {
14 pub id: String,
15 pub dist: f32,
16 pub attributes: serde_json::Map<String, serde_json::Value>,
17}
18
19impl From<QueryRow> for SearchResult {
20 fn from(row: QueryRow) -> Self {
21 let attributes = row
22 .attributes
23 .iter()
24 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
25 .collect();
26
27 SearchResult {
28 id: row.id,
29 score: row.dist,
30 attributes,
31 }
32 }
33}
34
35#[derive(Debug, Deserialize)]
36struct ErrorResponse {
37 error: String,
38 #[allow(dead_code)]
39 status: String,
40}
41
42/// turbopuffer vector database client
43///
44/// supports both ANN vector search and BM25 full-text search.
45#[derive(Clone)]
46pub struct TurbopufferStore {
47 client: Client,
48 api_key: String,
49 namespace: String,
50}
51
52impl TurbopufferStore {
53 pub fn new(api_key: String, namespace: String) -> Self {
54 Self {
55 client: Client::new(),
56 api_key,
57 namespace,
58 }
59 }
60
61 fn query_url(&self) -> String {
62 format!("{}/{}/query", TURBOPUFFER_API_BASE, self.namespace)
63 }
64
65 async fn execute_query(
66 &self,
67 request: serde_json::Value,
68 ) -> Result<Vec<QueryRow>, VectorSearchError> {
69 let response = self
70 .client
71 .post(self.query_url())
72 .header("Authorization", format!("Bearer {}", self.api_key))
73 .json(&request)
74 .send()
75 .await?;
76
77 if !response.status().is_success() {
78 let status = response.status().as_u16();
79 let body = response.text().await.unwrap_or_default();
80
81 // check for specific error types
82 if let Ok(error_resp) = serde_json::from_str::<ErrorResponse>(&body) {
83 if error_resp.error.contains("too long") && error_resp.error.contains("max 1024") {
84 return Err(VectorSearchError::QueryTooLong {
85 message: error_resp.error,
86 });
87 }
88 }
89
90 return Err(VectorSearchError::Api { status, body });
91 }
92
93 let body = response.text().await.map_err(|e| {
94 VectorSearchError::Other(anyhow::anyhow!("failed to read response: {}", e))
95 })?;
96
97 serde_json::from_str(&body)
98 .map_err(|e| VectorSearchError::Parse(format!("failed to parse response: {}", e)))
99 }
100}
101
102impl VectorStore for TurbopufferStore {
103 async fn search_by_vector(
104 &self,
105 embedding: &[f32],
106 top_k: usize,
107 ) -> Result<Vec<SearchResult>, VectorSearchError> {
108 let request = serde_json::json!({
109 "rank_by": ["vector", "ANN", embedding],
110 "top_k": top_k,
111 "include_attributes": ["url", "name", "filename"],
112 });
113
114 log::debug!(
115 "turbopuffer vector query: {}",
116 serde_json::to_string_pretty(&request).unwrap_or_default()
117 );
118
119 let rows = self.execute_query(request).await?;
120 Ok(rows.into_iter().map(SearchResult::from).collect())
121 }
122
123 async fn search_by_keyword(
124 &self,
125 query: &str,
126 top_k: usize,
127 ) -> Result<Vec<SearchResult>, VectorSearchError> {
128 let request = serde_json::json!({
129 "rank_by": ["name", "BM25", query],
130 "top_k": top_k,
131 "include_attributes": ["url", "name", "filename"],
132 });
133
134 log::debug!(
135 "turbopuffer BM25 query: {}",
136 serde_json::to_string_pretty(&request).unwrap_or_default()
137 );
138
139 let rows = self.execute_query(request).await?;
140
141 if let Some(first) = rows.first() {
142 log::info!(
143 "BM25 first result - id: {}, dist: {}, name: {:?}",
144 first.id,
145 first.dist,
146 first.attributes.get("name")
147 );
148 }
149
150 Ok(rows.into_iter().map(SearchResult::from).collect())
151 }
152
153 fn name(&self) -> &'static str {
154 "turbopuffer"
155 }
156}
157