Highly ambitious ATProtocol AppView service and SDKs
at main 346 lines 12 kB view raw
1//! DataLoader implementation for batching database queries 2//! 3//! This module provides a DataLoader that batches multiple requests for records 4//! into single database queries, eliminating the N+1 query problem. 5 6use async_graphql::dataloader::{DataLoader as AsyncGraphQLDataLoader, Loader}; 7use std::collections::HashMap; 8use std::sync::Arc; 9 10use crate::database::Database; 11use crate::models::{IndexedRecord, WhereClause, WhereCondition}; 12 13/// Key for batching record queries by collection and DID 14#[derive(Debug, Clone, Hash, Eq, PartialEq)] 15pub struct CollectionDidKey { 16 pub slice_uri: String, 17 pub collection: String, 18 pub did: String, 19} 20 21/// Loader for batching record queries by collection and DID 22pub struct CollectionDidLoader { 23 db: Database, 24} 25 26impl CollectionDidLoader { 27 pub fn new(db: Database) -> Self { 28 Self { db } 29 } 30} 31 32impl Loader<CollectionDidKey> for CollectionDidLoader { 33 type Value = Vec<IndexedRecord>; 34 type Error = Arc<String>; 35 36 async fn load( 37 &self, 38 keys: &[CollectionDidKey], 39 ) -> Result<HashMap<CollectionDidKey, Self::Value>, Self::Error> { 40 // Group keys by slice_uri and collection for optimal batching 41 let mut grouped: HashMap<(String, String), Vec<String>> = HashMap::new(); 42 43 for key in keys { 44 grouped 45 .entry((key.slice_uri.clone(), key.collection.clone())) 46 .or_insert_with(Vec::new) 47 .push(key.did.clone()); 48 } 49 50 let mut results: HashMap<CollectionDidKey, Vec<IndexedRecord>> = HashMap::new(); 51 52 // Execute one query per (slice, collection) combination 53 for ((slice_uri, collection), dids) in grouped { 54 let mut where_clause = WhereClause { 55 conditions: HashMap::new(), 56 or_conditions: None, 57 and: None, 58 or: None, 59 }; 60 61 // Filter by collection 62 where_clause.conditions.insert( 63 "collection".to_string(), 64 WhereCondition { 65 eq: Some(serde_json::Value::String(collection.clone())), 66 in_values: None, 67 contains: None, 68 fuzzy: 
None, 69 gt: None, 70 gte: None, 71 lt: None, 72 lte: None, 73 }, 74 ); 75 76 // Filter by DIDs using IN clause for batching 77 where_clause.conditions.insert( 78 "did".to_string(), 79 WhereCondition { 80 eq: None, 81 in_values: Some( 82 dids.iter() 83 .map(|did| serde_json::Value::String(did.clone())) 84 .collect(), 85 ), 86 contains: None, 87 fuzzy: None, 88 gt: None, 89 gte: None, 90 lt: None, 91 lte: None, 92 }, 93 ); 94 95 // Query database with no limit - load all records for batched filtering 96 match self 97 .db 98 .get_slice_collections_records( 99 &slice_uri, 100 None, // No limit - load all records for this DID 101 None, // cursor 102 None, // sort 103 Some(&where_clause), 104 ) 105 .await 106 { 107 Ok((records, _cursor)) => { 108 // Group results by DID 109 for record in records { 110 let key = CollectionDidKey { 111 slice_uri: slice_uri.clone(), 112 collection: collection.clone(), 113 did: record.did.clone(), 114 }; 115 116 // Convert Record to IndexedRecord 117 let indexed_record = IndexedRecord { 118 uri: record.uri, 119 cid: record.cid, 120 did: record.did, 121 collection: record.collection, 122 value: record.json, 123 indexed_at: record.indexed_at.to_rfc3339(), 124 }; 125 126 results 127 .entry(key) 128 .or_insert_with(Vec::new) 129 .push(indexed_record); 130 } 131 } 132 Err(e) => { 133 tracing::error!( 134 "DataLoader batch query failed for {}/{}: {}", 135 slice_uri, 136 collection, 137 e 138 ); 139 // Return empty results for failed queries rather than failing the entire batch 140 } 141 } 142 } 143 144 // Ensure all requested keys have an entry (even if empty) 145 for key in keys { 146 results.entry(key.clone()).or_insert_with(Vec::new); 147 } 148 149 Ok(results) 150 } 151} 152 153/// Key for batching record queries by collection and parent URI (for reverse joins) 154#[derive(Debug, Clone, Hash, Eq, PartialEq)] 155pub struct CollectionUriKey { 156 pub slice_uri: String, 157 pub collection: String, 158 pub parent_uri: String, 159 pub 
reference_field: String, // Field name that contains the reference (e.g., "subject") 160} 161 162/// Loader for batching record queries by collection and parent URI 163/// Used for reverse joins where we need to find records that reference a parent URI 164pub struct CollectionUriLoader { 165 db: Database, 166} 167 168impl CollectionUriLoader { 169 pub fn new(db: Database) -> Self { 170 Self { db } 171 } 172} 173 174impl Loader<CollectionUriKey> for CollectionUriLoader { 175 type Value = Vec<IndexedRecord>; 176 type Error = Arc<String>; 177 178 async fn load( 179 &self, 180 keys: &[CollectionUriKey], 181 ) -> Result<HashMap<CollectionUriKey, Self::Value>, Self::Error> { 182 // Group keys by (slice_uri, collection, reference_field) for optimal batching 183 let mut grouped: HashMap<(String, String, String), Vec<String>> = HashMap::new(); 184 185 for key in keys { 186 grouped 187 .entry(( 188 key.slice_uri.clone(), 189 key.collection.clone(), 190 key.reference_field.clone(), 191 )) 192 .or_insert_with(Vec::new) 193 .push(key.parent_uri.clone()); 194 } 195 196 let mut results: HashMap<CollectionUriKey, Vec<IndexedRecord>> = HashMap::new(); 197 198 // Execute one query per (slice, collection, reference_field) combination 199 for ((slice_uri, collection, reference_field), parent_uris) in grouped { 200 let mut where_clause = WhereClause { 201 conditions: HashMap::new(), 202 or_conditions: None, 203 and: None, 204 or: None, 205 }; 206 207 // Filter by collection 208 where_clause.conditions.insert( 209 "collection".to_string(), 210 WhereCondition { 211 eq: Some(serde_json::Value::String(collection.clone())), 212 in_values: None, 213 contains: None, 214 fuzzy: None, 215 gt: None, 216 gte: None, 217 lt: None, 218 lte: None, 219 }, 220 ); 221 222 // Filter by parent URIs using IN clause on the reference field 223 // This queries: WHERE json->>'reference_field' IN (parent_uri1, parent_uri2, ...) 
224 where_clause.conditions.insert( 225 reference_field.clone(), 226 WhereCondition { 227 eq: None, 228 in_values: Some( 229 parent_uris 230 .iter() 231 .map(|uri| serde_json::Value::String(uri.clone())) 232 .collect(), 233 ), 234 contains: None, 235 fuzzy: None, 236 gt: None, 237 gte: None, 238 lt: None, 239 lte: None, 240 }, 241 ); 242 243 // Query database with no limit - load all records for batched filtering 244 match self 245 .db 246 .get_slice_collections_records( 247 &slice_uri, 248 None, // No limit - load all records matching parent URIs 249 None, // cursor 250 None, // sort 251 Some(&where_clause), 252 ) 253 .await 254 { 255 Ok((records, _cursor)) => { 256 // Group results by parent URI (extract from the reference field) 257 for record in records { 258 // Try to extract URI - could be plain string or strongRef object 259 let parent_uri = record.json.get(&reference_field).and_then(|v| { 260 // First try as plain string 261 if let Some(uri_str) = v.as_str() { 262 return Some(uri_str.to_string()); 263 } 264 // Then try as strongRef 265 crate::graphql::dataloaders::extract_uri_from_strong_ref(v) 266 }); 267 268 if let Some(parent_uri) = parent_uri { 269 let key = CollectionUriKey { 270 slice_uri: slice_uri.clone(), 271 collection: collection.clone(), 272 parent_uri: parent_uri.clone(), 273 reference_field: reference_field.clone(), 274 }; 275 276 // Convert Record to IndexedRecord 277 let indexed_record = IndexedRecord { 278 uri: record.uri, 279 cid: record.cid, 280 did: record.did, 281 collection: record.collection, 282 value: record.json, 283 indexed_at: record.indexed_at.to_rfc3339(), 284 }; 285 286 results 287 .entry(key) 288 .or_insert_with(Vec::new) 289 .push(indexed_record); 290 } 291 } 292 } 293 Err(e) => { 294 tracing::error!( 295 "CollectionUriLoader batch query failed for {}/{}: {}", 296 slice_uri, 297 collection, 298 e 299 ); 300 // Return empty results for failed queries rather than failing the entire batch 301 } 302 } 303 } 304 305 // Ensure all 
requested keys have an entry (even if empty) 306 for key in keys { 307 results.entry(key.clone()).or_insert_with(Vec::new); 308 } 309 310 Ok(results) 311 } 312} 313 314/// Context data that includes the DataLoader 315#[derive(Clone)] 316pub struct GraphQLContext { 317 #[allow(dead_code)] 318 pub collection_did_loader: Arc<AsyncGraphQLDataLoader<CollectionDidLoader>>, 319 pub collection_uri_loader: Arc<AsyncGraphQLDataLoader<CollectionUriLoader>>, 320 pub auth_token: Option<String>, 321 pub auth_base_url: String, 322 pub auth_cache: Option<Arc<tokio::sync::Mutex<crate::cache::SliceCache>>>, 323} 324 325impl GraphQLContext { 326 pub fn with_auth( 327 db: Database, 328 auth_token: Option<String>, 329 auth_base_url: String, 330 auth_cache: Option<Arc<tokio::sync::Mutex<crate::cache::SliceCache>>>, 331 ) -> Self { 332 Self { 333 collection_did_loader: Arc::new(AsyncGraphQLDataLoader::new( 334 CollectionDidLoader::new(db.clone()), 335 tokio::spawn, 336 )), 337 collection_uri_loader: Arc::new(AsyncGraphQLDataLoader::new( 338 CollectionUriLoader::new(db), 339 tokio::spawn, 340 )), 341 auth_token, 342 auth_base_url, 343 auth_cache, 344 } 345 } 346}