Highly ambitious ATProtocol AppView service and SDKs
1//! DataLoader implementation for batching database queries
2//!
3//! This module provides a DataLoader that batches multiple requests for records
4//! into single database queries, eliminating the N+1 query problem.
5
6use async_graphql::dataloader::{DataLoader as AsyncGraphQLDataLoader, Loader};
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use crate::database::Database;
11use crate::models::{IndexedRecord, WhereClause, WhereCondition};
12
/// Batch key identifying the records of one DID within one collection of a slice.
///
/// The key is hashable and comparable so it can be used directly as a
/// `HashMap` key when batching and returning loader results.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CollectionDidKey {
    /// URI of the slice the query is issued against.
    pub slice_uri: String,
    /// Collection name the records must belong to (matched with an equality filter).
    pub collection: String,
    /// DID whose records are requested (batched into an `IN` filter on `did`).
    pub did: String,
}
20
21/// Loader for batching record queries by collection and DID
22pub struct CollectionDidLoader {
23 db: Database,
24}
25
26impl CollectionDidLoader {
27 pub fn new(db: Database) -> Self {
28 Self { db }
29 }
30}
31
32impl Loader<CollectionDidKey> for CollectionDidLoader {
33 type Value = Vec<IndexedRecord>;
34 type Error = Arc<String>;
35
36 async fn load(
37 &self,
38 keys: &[CollectionDidKey],
39 ) -> Result<HashMap<CollectionDidKey, Self::Value>, Self::Error> {
40 // Group keys by slice_uri and collection for optimal batching
41 let mut grouped: HashMap<(String, String), Vec<String>> = HashMap::new();
42
43 for key in keys {
44 grouped
45 .entry((key.slice_uri.clone(), key.collection.clone()))
46 .or_insert_with(Vec::new)
47 .push(key.did.clone());
48 }
49
50 let mut results: HashMap<CollectionDidKey, Vec<IndexedRecord>> = HashMap::new();
51
52 // Execute one query per (slice, collection) combination
53 for ((slice_uri, collection), dids) in grouped {
54 let mut where_clause = WhereClause {
55 conditions: HashMap::new(),
56 or_conditions: None,
57 and: None,
58 or: None,
59 };
60
61 // Filter by collection
62 where_clause.conditions.insert(
63 "collection".to_string(),
64 WhereCondition {
65 eq: Some(serde_json::Value::String(collection.clone())),
66 in_values: None,
67 contains: None,
68 fuzzy: None,
69 gt: None,
70 gte: None,
71 lt: None,
72 lte: None,
73 },
74 );
75
76 // Filter by DIDs using IN clause for batching
77 where_clause.conditions.insert(
78 "did".to_string(),
79 WhereCondition {
80 eq: None,
81 in_values: Some(
82 dids.iter()
83 .map(|did| serde_json::Value::String(did.clone()))
84 .collect(),
85 ),
86 contains: None,
87 fuzzy: None,
88 gt: None,
89 gte: None,
90 lt: None,
91 lte: None,
92 },
93 );
94
95 // Query database with no limit - load all records for batched filtering
96 match self
97 .db
98 .get_slice_collections_records(
99 &slice_uri,
100 None, // No limit - load all records for this DID
101 None, // cursor
102 None, // sort
103 Some(&where_clause),
104 )
105 .await
106 {
107 Ok((records, _cursor)) => {
108 // Group results by DID
109 for record in records {
110 let key = CollectionDidKey {
111 slice_uri: slice_uri.clone(),
112 collection: collection.clone(),
113 did: record.did.clone(),
114 };
115
116 // Convert Record to IndexedRecord
117 let indexed_record = IndexedRecord {
118 uri: record.uri,
119 cid: record.cid,
120 did: record.did,
121 collection: record.collection,
122 value: record.json,
123 indexed_at: record.indexed_at.to_rfc3339(),
124 };
125
126 results
127 .entry(key)
128 .or_insert_with(Vec::new)
129 .push(indexed_record);
130 }
131 }
132 Err(e) => {
133 tracing::error!(
134 "DataLoader batch query failed for {}/{}: {}",
135 slice_uri,
136 collection,
137 e
138 );
139 // Return empty results for failed queries rather than failing the entire batch
140 }
141 }
142 }
143
144 // Ensure all requested keys have an entry (even if empty)
145 for key in keys {
146 results.entry(key.clone()).or_insert_with(Vec::new);
147 }
148
149 Ok(results)
150 }
151}
152
/// Batch key for reverse joins: the records of one collection whose
/// reference field points at a given parent URI.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CollectionUriKey {
    /// URI of the slice the query is issued against.
    pub slice_uri: String,
    /// Collection name the referencing records must belong to.
    pub collection: String,
    /// URI of the parent record being referenced.
    pub parent_uri: String,
    /// Name of the field that contains the reference (e.g., "subject").
    pub reference_field: String,
}
161
162/// Loader for batching record queries by collection and parent URI
163/// Used for reverse joins where we need to find records that reference a parent URI
164pub struct CollectionUriLoader {
165 db: Database,
166}
167
168impl CollectionUriLoader {
169 pub fn new(db: Database) -> Self {
170 Self { db }
171 }
172}
173
174impl Loader<CollectionUriKey> for CollectionUriLoader {
175 type Value = Vec<IndexedRecord>;
176 type Error = Arc<String>;
177
178 async fn load(
179 &self,
180 keys: &[CollectionUriKey],
181 ) -> Result<HashMap<CollectionUriKey, Self::Value>, Self::Error> {
182 // Group keys by (slice_uri, collection, reference_field) for optimal batching
183 let mut grouped: HashMap<(String, String, String), Vec<String>> = HashMap::new();
184
185 for key in keys {
186 grouped
187 .entry((
188 key.slice_uri.clone(),
189 key.collection.clone(),
190 key.reference_field.clone(),
191 ))
192 .or_insert_with(Vec::new)
193 .push(key.parent_uri.clone());
194 }
195
196 let mut results: HashMap<CollectionUriKey, Vec<IndexedRecord>> = HashMap::new();
197
198 // Execute one query per (slice, collection, reference_field) combination
199 for ((slice_uri, collection, reference_field), parent_uris) in grouped {
200 let mut where_clause = WhereClause {
201 conditions: HashMap::new(),
202 or_conditions: None,
203 and: None,
204 or: None,
205 };
206
207 // Filter by collection
208 where_clause.conditions.insert(
209 "collection".to_string(),
210 WhereCondition {
211 eq: Some(serde_json::Value::String(collection.clone())),
212 in_values: None,
213 contains: None,
214 fuzzy: None,
215 gt: None,
216 gte: None,
217 lt: None,
218 lte: None,
219 },
220 );
221
222 // Filter by parent URIs using IN clause on the reference field
223 // This queries: WHERE json->>'reference_field' IN (parent_uri1, parent_uri2, ...)
224 where_clause.conditions.insert(
225 reference_field.clone(),
226 WhereCondition {
227 eq: None,
228 in_values: Some(
229 parent_uris
230 .iter()
231 .map(|uri| serde_json::Value::String(uri.clone()))
232 .collect(),
233 ),
234 contains: None,
235 fuzzy: None,
236 gt: None,
237 gte: None,
238 lt: None,
239 lte: None,
240 },
241 );
242
243 // Query database with no limit - load all records for batched filtering
244 match self
245 .db
246 .get_slice_collections_records(
247 &slice_uri,
248 None, // No limit - load all records matching parent URIs
249 None, // cursor
250 None, // sort
251 Some(&where_clause),
252 )
253 .await
254 {
255 Ok((records, _cursor)) => {
256 // Group results by parent URI (extract from the reference field)
257 for record in records {
258 // Try to extract URI - could be plain string or strongRef object
259 let parent_uri = record.json.get(&reference_field).and_then(|v| {
260 // First try as plain string
261 if let Some(uri_str) = v.as_str() {
262 return Some(uri_str.to_string());
263 }
264 // Then try as strongRef
265 crate::graphql::dataloaders::extract_uri_from_strong_ref(v)
266 });
267
268 if let Some(parent_uri) = parent_uri {
269 let key = CollectionUriKey {
270 slice_uri: slice_uri.clone(),
271 collection: collection.clone(),
272 parent_uri: parent_uri.clone(),
273 reference_field: reference_field.clone(),
274 };
275
276 // Convert Record to IndexedRecord
277 let indexed_record = IndexedRecord {
278 uri: record.uri,
279 cid: record.cid,
280 did: record.did,
281 collection: record.collection,
282 value: record.json,
283 indexed_at: record.indexed_at.to_rfc3339(),
284 };
285
286 results
287 .entry(key)
288 .or_insert_with(Vec::new)
289 .push(indexed_record);
290 }
291 }
292 }
293 Err(e) => {
294 tracing::error!(
295 "CollectionUriLoader batch query failed for {}/{}: {}",
296 slice_uri,
297 collection,
298 e
299 );
300 // Return empty results for failed queries rather than failing the entire batch
301 }
302 }
303 }
304
305 // Ensure all requested keys have an entry (even if empty)
306 for key in keys {
307 results.entry(key.clone()).or_insert_with(Vec::new);
308 }
309
310 Ok(results)
311 }
312}
313
314/// Context data that includes the DataLoader
315#[derive(Clone)]
316pub struct GraphQLContext {
317 #[allow(dead_code)]
318 pub collection_did_loader: Arc<AsyncGraphQLDataLoader<CollectionDidLoader>>,
319 pub collection_uri_loader: Arc<AsyncGraphQLDataLoader<CollectionUriLoader>>,
320 pub auth_token: Option<String>,
321 pub auth_base_url: String,
322 pub auth_cache: Option<Arc<tokio::sync::Mutex<crate::cache::SliceCache>>>,
323}
324
325impl GraphQLContext {
326 pub fn with_auth(
327 db: Database,
328 auth_token: Option<String>,
329 auth_base_url: String,
330 auth_cache: Option<Arc<tokio::sync::Mutex<crate::cache::SliceCache>>>,
331 ) -> Self {
332 Self {
333 collection_did_loader: Arc::new(AsyncGraphQLDataLoader::new(
334 CollectionDidLoader::new(db.clone()),
335 tokio::spawn,
336 )),
337 collection_uri_loader: Arc::new(AsyncGraphQLDataLoader::new(
338 CollectionUriLoader::new(db),
339 tokio::spawn,
340 )),
341 auth_token,
342 auth_base_url,
343 auth_cache,
344 }
345 }
346}