//! DataLoader implementation for batching database queries
//!
//! This module provides a DataLoader that batches multiple requests for records
//! into single database queries, eliminating the N+1 query problem.
//!
//! Two loaders are defined:
//! - [`CollectionDidLoader`]: records of a collection authored by given DIDs.
//! - [`CollectionUriLoader`]: records of a collection whose reference field points at a
//!   given parent URI (reverse joins).
//!
//! Both are exposed to resolvers through [`GraphQLContext`].

use async_graphql::dataloader::{DataLoader as AsyncGraphQLDataLoader, Loader};
use std::collections::HashMap;
use std::sync::Arc;

use crate::database::Database;
use crate::models::{IndexedRecord, WhereClause, WhereCondition};

/// Key for batching record queries by collection and DID
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct CollectionDidKey {
    /// Slice whose records are searched; passed as the first argument to
    /// `get_slice_collections_records`.
    pub slice_uri: String,
    /// Collection name, matched with an equality condition on the `collection` field.
    pub collection: String,
    /// Author DID, matched with an IN condition on the `did` field.
    pub did: String,
}

/// Loader for batching record queries by collection and DID
///
/// Within one batch, all keys that share a `(slice_uri, collection)` pair are resolved
/// with a single `get_slice_collections_records` call, filtering `did` with an IN list.
pub struct CollectionDidLoader {
    db: Database,
}

impl CollectionDidLoader {
    /// Creates a loader that issues its batched queries against `db`.
    pub fn new(db: Database) -> Self {
        Self { db }
    }
}

impl Loader for CollectionDidLoader {
    type Value = Vec;
    type Error = Arc;

    /// Resolves a batch of keys to the records each DID has in each collection.
    ///
    /// One database query is executed per distinct `(slice_uri, collection)` pair in `keys`.
    /// Returned records are regrouped by their `did` into the matching key.
    ///
    /// # Errors
    ///
    /// Never returns `Err`. If a group's query fails, the failure is logged with
    /// `tracing::error!` and that group's keys resolve to empty vectors.
    /// NOTE(review): callers therefore cannot tell "no records" apart from "query failed".
    /// Consider surfacing `Self::Error` if that distinction matters.
    ///
    /// Every key in `keys` is guaranteed an entry in the returned map, possibly an empty `Vec`.
    async fn load(
        &self,
        keys: &[CollectionDidKey],
    ) -> Result, Self::Error> {
        // Group keys by slice_uri and collection for optimal batching
        let mut grouped: HashMap<(String, String), Vec> = HashMap::new();
        for key in keys {
            grouped
                .entry((key.slice_uri.clone(), key.collection.clone()))
                .or_insert_with(Vec::new)
                .push(key.did.clone());
        }

        let mut results: HashMap> = HashMap::new();

        // Execute one query per (slice, collection) combination
        for ((slice_uri, collection), dids) in grouped {
            let mut where_clause = WhereClause {
                conditions: HashMap::new(),
                or_conditions: None,
                and: None,
                or: None,
            };

            // Filter by collection
            where_clause.conditions.insert(
                "collection".to_string(),
                WhereCondition {
                    eq: Some(serde_json::Value::String(collection.clone())),
                    in_values: None,
                    contains: None,
                    fuzzy: None,
                    gt: None,
                    gte: None,
                    lt: None,
                    lte: None,
                },
            );

            // Filter by DIDs using IN clause for batching: every DID requested for this
            // (slice, collection) pair is matched in the same query.
            where_clause.conditions.insert(
                "did".to_string(),
                WhereCondition {
                    eq: None,
                    in_values: Some(
                        dids.iter()
                            .map(|did| serde_json::Value::String(did.clone()))
                            .collect(),
                    ),
                    contains: None,
                    fuzzy: None,
                    gt: None,
                    gte: None,
                    lt: None,
                    lte: None,
                },
            );

            // Query database with no limit - load all records for batched filtering.
            // NOTE(review): assumes `get_slice_collections_records` treats a `None` limit as
            // unbounded. TODO confirm. If it applies a default page size, large batches would
            // be silently truncated, because the returned cursor is discarded below.
            match self
                .db
                .get_slice_collections_records(
                    &slice_uri,
                    None, // No limit - load all records for every DID in this batch
                    None, // cursor
                    None, // sort
                    Some(&where_clause),
                )
                .await
            {
                Ok((records, _cursor)) => {
                    // Group results by DID
                    for record in records {
                        let key = CollectionDidKey {
                            slice_uri: slice_uri.clone(),
                            collection: collection.clone(),
                            did: record.did.clone(),
                        };

                        // Convert Record to IndexedRecord (the record's JSON becomes `value`,
                        // and `indexed_at` is rendered as an RFC 3339 string)
                        let indexed_record = IndexedRecord {
                            uri: record.uri,
                            cid: record.cid,
                            did: record.did,
                            collection: record.collection,
                            value: record.json,
                            indexed_at: record.indexed_at.to_rfc3339(),
                        };

                        results
                            .entry(key)
                            .or_insert_with(Vec::new)
                            .push(indexed_record);
                    }
                }
                Err(e) => {
                    tracing::error!(
                        "DataLoader batch query failed for {}/{}: {}",
                        slice_uri,
                        collection,
                        e
                    );
                    // Return empty results for failed queries rather than failing the entire batch
                }
            }
        }

        // Ensure all requested keys have an entry (even if empty), so the DataLoader
        // resolves each key to a value instead of treating it as missing
        for key in keys {
            results.entry(key.clone()).or_insert_with(Vec::new);
        }

        Ok(results)
    }
}

/// Key for batching record queries by collection and parent URI (for reverse joins)
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct CollectionUriKey {
    /// Slice whose records are searched.
    pub slice_uri: String,
    /// Collection whose records may reference `parent_uri`.
    pub collection: String,
    /// URI that matching records must reference through `reference_field`.
    pub parent_uri: String,
    pub reference_field: String, // Field name that contains the reference (e.g., "subject")
}

/// Loader for batching record queries by collection and parent URI
/// Used for reverse joins where we need to find records that reference a parent URI
///
/// Within one batch, all keys that share a `(slice_uri, collection, reference_field)` triple
/// are resolved with a single query, using an IN condition on `reference_field`.
pub struct CollectionUriLoader {
    db: Database,
}

impl CollectionUriLoader {
    /// Creates a loader that issues its batched queries against `db`.
    pub fn new(db: Database) -> Self {
        Self { db }
    }
}

impl Loader for CollectionUriLoader {
    type Value = Vec;
    type Error = Arc;

    /// Resolves a batch of keys to the records that reference each parent URI.
    ///
    /// One database query is executed per distinct `(slice_uri, collection, reference_field)`
    /// triple in `keys`. Each returned record is assigned to a key by reading its
    /// `reference_field` value. That value may be a plain string or an object understood by
    /// `extract_uri_from_strong_ref`. Records whose field yields no URI are skipped.
    ///
    /// # Errors
    ///
    /// Never returns `Err`. A failed group query is logged with `tracing::error!`, and that
    /// group's keys resolve to empty vectors.
    ///
    /// Every key in `keys` is guaranteed an entry in the returned map, possibly an empty `Vec`.
    async fn load(
        &self,
        keys: &[CollectionUriKey],
    ) -> Result, Self::Error> {
        // Group keys by (slice_uri, collection, reference_field) for optimal batching
        let mut grouped: HashMap<(String, String, String), Vec> = HashMap::new();
        for key in keys {
            grouped
                .entry((
                    key.slice_uri.clone(),
                    key.collection.clone(),
                    key.reference_field.clone(),
                ))
                .or_insert_with(Vec::new)
                .push(key.parent_uri.clone());
        }

        let mut results: HashMap> = HashMap::new();

        // Execute one query per (slice, collection, reference_field) combination
        for ((slice_uri, collection, reference_field), parent_uris) in grouped {
            let mut where_clause = WhereClause {
                conditions: HashMap::new(),
                or_conditions: None,
                and: None,
                or: None,
            };

            // Filter by collection
            where_clause.conditions.insert(
                "collection".to_string(),
                WhereCondition {
                    eq: Some(serde_json::Value::String(collection.clone())),
                    in_values: None,
                    contains: None,
                    fuzzy: None,
                    gt: None,
                    gte: None,
                    lt: None,
                    lte: None,
                },
            );

            // Filter by parent URIs using IN clause on the reference field
            // This queries: WHERE json->>'reference_field' IN (parent_uri1, parent_uri2, ...)
            // (the exact SQL depends on how the database layer translates WhereClause, which
            // is not visible here).
            // NOTE(review): the IN values are plain JSON strings, but the grouping step below
            // also accepts strongRef-shaped references. Whether the database layer matches
            // strongRef values against this list is not shown here. TODO confirm. If it does
            // not, such records never reach the extraction step.
            where_clause.conditions.insert(
                reference_field.clone(),
                WhereCondition {
                    eq: None,
                    in_values: Some(
                        parent_uris
                            .iter()
                            .map(|uri| serde_json::Value::String(uri.clone()))
                            .collect(),
                    ),
                    contains: None,
                    fuzzy: None,
                    gt: None,
                    gte: None,
                    lt: None,
                    lte: None,
                },
            );

            // Query database with no limit - load all records for batched filtering.
            // NOTE(review): as in CollectionDidLoader, this assumes a `None` limit is unbounded.
            // The returned cursor is ignored. TODO confirm.
            match self
                .db
                .get_slice_collections_records(
                    &slice_uri,
                    None, // No limit - load all records matching parent URIs
                    None, // cursor
                    None, // sort
                    Some(&where_clause),
                )
                .await
            {
                Ok((records, _cursor)) => {
                    // Group results by parent URI (extract from the reference field)
                    for record in records {
                        // Try to extract URI - could be plain string or strongRef object
                        let parent_uri = record.json.get(&reference_field).and_then(|v| {
                            // First try as plain string
                            if let Some(uri_str) = v.as_str() {
                                return Some(uri_str.to_string());
                            }
                            // Then try as strongRef
                            crate::graphql::dataloaders::extract_uri_from_strong_ref(v)
                        });

                        // Records whose reference field is missing or yields no URI are dropped
                        if let Some(parent_uri) = parent_uri {
                            let key = CollectionUriKey {
                                slice_uri: slice_uri.clone(),
                                collection: collection.clone(),
                                parent_uri: parent_uri.clone(),
                                reference_field: reference_field.clone(),
                            };

                            // Convert Record to IndexedRecord
                            let indexed_record = IndexedRecord {
                                uri: record.uri,
                                cid: record.cid,
                                did: record.did,
                                collection: record.collection,
                                value: record.json,
                                indexed_at: record.indexed_at.to_rfc3339(),
                            };

                            results
                                .entry(key)
                                .or_insert_with(Vec::new)
                                .push(indexed_record);
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(
                        "CollectionUriLoader batch query failed for {}/{}: {}",
                        slice_uri,
                        collection,
                        e
                    );
                    // Return empty results for failed queries rather than failing the entire batch
                }
            }
        }

        // Ensure all requested keys have an entry (even if empty)
        for key in keys {
            results.entry(key.clone()).or_insert_with(Vec::new);
        }

        Ok(results)
    }
}

/// Context data that includes the DataLoader
///
/// The loader fields are `Arc`s, so clones of this context share the same DataLoader
/// instances.
#[derive(Clone)]
pub struct GraphQLContext {
    /// Batches lookups keyed by [`CollectionDidKey`].
    /// NOTE(review): `dead_code` is allowed here, presumably because no resolver reads this
    /// field yet. Confirm, and document or remove the allow.
    #[allow(dead_code)]
    pub collection_did_loader: Arc>,
    /// Batches reverse-join lookups keyed by [`CollectionUriKey`].
    pub collection_uri_loader: Arc>,
    /// Stored as passed to [`GraphQLContext::with_auth`]. Presumably the requesting client's
    /// auth token; confirm with callers.
    pub auth_token: Option,
    /// Stored as passed to [`GraphQLContext::with_auth`].
    pub auth_base_url: String,
    /// Stored as passed to [`GraphQLContext::with_auth`].
    pub auth_cache: Option>>,
}

impl GraphQLContext {
    /// Builds a context with both loaders backed by `db`, and stores the auth values unchanged.
    ///
    /// The loaders are given `tokio::spawn` as their spawner, so they must be driven from
    /// within a Tokio runtime.
    pub fn with_auth(
        db: Database,
        auth_token: Option,
        auth_base_url: String,
        auth_cache: Option>>,
    ) -> Self {
        Self {
            collection_did_loader: Arc::new(AsyncGraphQLDataLoader::new(
                CollectionDidLoader::new(db.clone()),
                tokio::spawn,
            )),
            collection_uri_loader: Arc::new(AsyncGraphQLDataLoader::new(
                CollectionUriLoader::new(db),
                tokio::spawn,
            )),
            auth_token,
            auth_base_url,
            auth_cache,
        }
    }
}