···1+//! DataLoader implementation for batching database queries
2+//!
3+//! This module provides a DataLoader that batches multiple requests for records
4+//! into single database queries, eliminating the N+1 query problem.
5+6+use async_graphql::dataloader::{DataLoader as AsyncGraphQLDataLoader, Loader};
7+use std::collections::HashMap;
8+use std::sync::Arc;
9+10+use crate::database::Database;
11+use crate::models::{IndexedRecord, WhereClause, WhereCondition};
/// Key for batching record queries by collection and DID
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct CollectionDidKey {
    // AT-URI of the slice whose indexed records are queried.
    pub slice_uri: String,
    // Collection identifier (NSID) the records belong to.
    pub collection: String,
    // DID of the record author to fetch records for.
    pub did: String,
}
/// Loader for batching record queries by collection and DID
pub struct CollectionDidLoader {
    // Database handle used to execute the batched queries.
    db: Database,
}

impl CollectionDidLoader {
    /// Creates a loader backed by the given database handle.
    pub fn new(db: Database) -> Self {
        Self { db }
    }
}
31+32+impl Loader<CollectionDidKey> for CollectionDidLoader {
33+ type Value = Vec<IndexedRecord>;
34+ type Error = Arc<String>;
35+36+ async fn load(&self, keys: &[CollectionDidKey]) -> Result<HashMap<CollectionDidKey, Self::Value>, Self::Error> {
37+ // Group keys by slice_uri and collection for optimal batching
38+ let mut grouped: HashMap<(String, String), Vec<String>> = HashMap::new();
39+40+ for key in keys {
41+ grouped
42+ .entry((key.slice_uri.clone(), key.collection.clone()))
43+ .or_insert_with(Vec::new)
44+ .push(key.did.clone());
45+ }
46+47+ let mut results: HashMap<CollectionDidKey, Vec<IndexedRecord>> = HashMap::new();
48+49+ // Execute one query per (slice, collection) combination
50+ for ((slice_uri, collection), dids) in grouped {
51+ let mut where_clause = WhereClause {
52+ conditions: HashMap::new(),
53+ or_conditions: None,
54+ };
55+56+ // Filter by collection
57+ where_clause.conditions.insert(
58+ "collection".to_string(),
59+ WhereCondition {
60+ gt: None,
61+ gte: None,
62+ lt: None,
63+ lte: None,
64+ eq: Some(serde_json::Value::String(collection.clone())),
65+ in_values: None,
66+ contains: None,
67+ },
68+ );
69+70+ // Filter by DIDs using IN clause for batching
71+ where_clause.conditions.insert(
72+ "did".to_string(),
73+ WhereCondition {
74+ gt: None,
75+ gte: None,
76+ lt: None,
77+ lte: None,
78+ eq: None,
79+ in_values: Some(
80+ dids.iter()
81+ .map(|did| serde_json::Value::String(did.clone()))
82+ .collect()
83+ ),
84+ contains: None,
85+ },
86+ );
87+88+ // Query database with no limit - load all records for batched filtering
89+ match self.db.get_slice_collections_records(
90+ &slice_uri,
91+ None, // No limit - load all records for this DID
92+ None, // cursor
93+ None, // sort
94+ Some(&where_clause),
95+ ).await {
96+ Ok((records, _cursor)) => {
97+ // Group results by DID
98+ for record in records {
99+ let key = CollectionDidKey {
100+ slice_uri: slice_uri.clone(),
101+ collection: collection.clone(),
102+ did: record.did.clone(),
103+ };
104+105+ // Convert Record to IndexedRecord
106+ let indexed_record = IndexedRecord {
107+ uri: record.uri,
108+ cid: record.cid,
109+ did: record.did,
110+ collection: record.collection,
111+ value: record.json,
112+ indexed_at: record.indexed_at.to_rfc3339(),
113+ };
114+115+ results
116+ .entry(key)
117+ .or_insert_with(Vec::new)
118+ .push(indexed_record);
119+ }
120+ }
121+ Err(e) => {
122+ tracing::error!("DataLoader batch query failed for {}/{}: {}", slice_uri, collection, e);
123+ // Return empty results for failed queries rather than failing the entire batch
124+ }
125+ }
126+ }
127+128+ // Ensure all requested keys have an entry (even if empty)
129+ for key in keys {
130+ results.entry(key.clone()).or_insert_with(Vec::new);
131+ }
132+133+ Ok(results)
134+ }
135+}
/// Key for batching record queries by collection and parent URI (for reverse joins)
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct CollectionUriKey {
    // AT-URI of the slice whose indexed records are queried.
    pub slice_uri: String,
    // Collection identifier (NSID) of the child records being fetched.
    pub collection: String,
    // URI of the parent record that the child records reference.
    pub parent_uri: String,
    pub reference_field: String, // Field name that contains the reference (e.g., "subject")
}
/// Loader for batching record queries by collection and parent URI
/// Used for reverse joins where we need to find records that reference a parent URI
pub struct CollectionUriLoader {
    // Database handle used to execute the batched queries.
    db: Database,
}

impl CollectionUriLoader {
    /// Creates a loader backed by the given database handle.
    pub fn new(db: Database) -> Self {
        Self { db }
    }
}
157+158+impl Loader<CollectionUriKey> for CollectionUriLoader {
159+ type Value = Vec<IndexedRecord>;
160+ type Error = Arc<String>;
161+162+ async fn load(&self, keys: &[CollectionUriKey]) -> Result<HashMap<CollectionUriKey, Self::Value>, Self::Error> {
163+ // Group keys by (slice_uri, collection, reference_field) for optimal batching
164+ let mut grouped: HashMap<(String, String, String), Vec<String>> = HashMap::new();
165+166+ for key in keys {
167+ grouped
168+ .entry((key.slice_uri.clone(), key.collection.clone(), key.reference_field.clone()))
169+ .or_insert_with(Vec::new)
170+ .push(key.parent_uri.clone());
171+ }
172+173+ let mut results: HashMap<CollectionUriKey, Vec<IndexedRecord>> = HashMap::new();
174+175+ // Execute one query per (slice, collection, reference_field) combination
176+ for ((slice_uri, collection, reference_field), parent_uris) in grouped {
177+ let mut where_clause = WhereClause {
178+ conditions: HashMap::new(),
179+ or_conditions: None,
180+ };
181+182+ // Filter by collection
183+ where_clause.conditions.insert(
184+ "collection".to_string(),
185+ WhereCondition {
186+ gt: None,
187+ gte: None,
188+ lt: None,
189+ lte: None,
190+ eq: Some(serde_json::Value::String(collection.clone())),
191+ in_values: None,
192+ contains: None,
193+ },
194+ );
195+196+ // Filter by parent URIs using IN clause on the reference field
197+ // This queries: WHERE json->>'reference_field' IN (parent_uri1, parent_uri2, ...)
198+ where_clause.conditions.insert(
199+ reference_field.clone(),
200+ WhereCondition {
201+ gt: None,
202+ gte: None,
203+ lt: None,
204+ lte: None,
205+ eq: None,
206+ in_values: Some(
207+ parent_uris.iter()
208+ .map(|uri| serde_json::Value::String(uri.clone()))
209+ .collect()
210+ ),
211+ contains: None,
212+ },
213+ );
214+215+ // Query database with no limit - load all records for batched filtering
216+ match self.db.get_slice_collections_records(
217+ &slice_uri,
218+ None, // No limit - load all records matching parent URIs
219+ None, // cursor
220+ None, // sort
221+ Some(&where_clause),
222+ ).await {
223+ Ok((records, _cursor)) => {
224+ // Group results by parent URI (extract from the reference field)
225+ for record in records {
226+ // Try to extract URI - could be plain string or strongRef object
227+ let parent_uri = record.json.get(&reference_field).and_then(|v| {
228+ // First try as plain string
229+ if let Some(uri_str) = v.as_str() {
230+ return Some(uri_str.to_string());
231+ }
232+ // Then try as strongRef
233+ crate::graphql::dataloaders::extract_uri_from_strong_ref(v)
234+ });
235+236+ if let Some(parent_uri) = parent_uri {
237+ let key = CollectionUriKey {
238+ slice_uri: slice_uri.clone(),
239+ collection: collection.clone(),
240+ parent_uri: parent_uri.clone(),
241+ reference_field: reference_field.clone(),
242+ };
243+244+ // Convert Record to IndexedRecord
245+ let indexed_record = IndexedRecord {
246+ uri: record.uri,
247+ cid: record.cid,
248+ did: record.did,
249+ collection: record.collection,
250+ value: record.json,
251+ indexed_at: record.indexed_at.to_rfc3339(),
252+ };
253+254+ results
255+ .entry(key)
256+ .or_insert_with(Vec::new)
257+ .push(indexed_record);
258+ }
259+ }
260+ }
261+ Err(e) => {
262+ tracing::error!("CollectionUriLoader batch query failed for {}/{}: {}", slice_uri, collection, e);
263+ // Return empty results for failed queries rather than failing the entire batch
264+ }
265+ }
266+ }
267+268+ // Ensure all requested keys have an entry (even if empty)
269+ for key in keys {
270+ results.entry(key.clone()).or_insert_with(Vec::new);
271+ }
272+273+ Ok(results)
274+ }
275+}
/// Context data that includes the DataLoader
///
/// Inserted into the GraphQL request data so resolvers can batch lookups
/// through the loaders instead of issuing one query per resolved field.
#[derive(Clone)]
pub struct GraphQLContext {
    // Batches (slice, collection, DID) record lookups.
    #[allow(dead_code)]
    pub collection_did_loader: Arc<AsyncGraphQLDataLoader<CollectionDidLoader>>,
    // Batches reverse-join lookups keyed by referenced parent URI.
    pub collection_uri_loader: Arc<AsyncGraphQLDataLoader<CollectionUriLoader>>,
}
284+285+impl GraphQLContext {
286+ pub fn new(db: Database) -> Self {
287+ Self {
288+ collection_did_loader: Arc::new(AsyncGraphQLDataLoader::new(
289+ CollectionDidLoader::new(db.clone()),
290+ tokio::spawn
291+ )),
292+ collection_uri_loader: Arc::new(AsyncGraphQLDataLoader::new(
293+ CollectionUriLoader::new(db),
294+ tokio::spawn
295+ )),
296+ }
297+ }
298+}
+24-5
api/src/graphql/handler.rs
···1516use crate::errors::AppError;
17use crate::AppState;
01819/// Global schema cache (one schema per slice)
20/// This prevents rebuilding the schema on every request
···65 }
66 };
6768- Ok(schema.execute(req.into_inner()).await.into())
000000069}
7071/// GraphiQL UI handler
···198 }
199 };
200000201 // Upgrade to WebSocket and handle GraphQL subscriptions manually
202 Ok(ws
203 .protocols(["graphql-transport-ws", "graphql-ws"])
204- .on_upgrade(move |socket| handle_graphql_ws(socket, schema)))
205}
206207/// Handle GraphQL WebSocket connection
208-async fn handle_graphql_ws(socket: WebSocket, schema: Schema) {
209 let (ws_sender, ws_receiver) = socket.split();
210211 // Convert axum WebSocket messages to strings for async-graphql
···216 })
217 });
218219- // Create GraphQL WebSocket handler
220- let mut stream = GraphQLWebSocket::new(schema, input, WebSocketProtocols::GraphQLWS);
00000000221222 // Send GraphQL messages back through WebSocket
223 let mut ws_sender = ws_sender;
···1516use crate::errors::AppError;
17use crate::AppState;
18+use crate::graphql::GraphQLContext;
1920/// Global schema cache (one schema per slice)
21/// This prevents rebuilding the schema on every request
···66 }
67 };
6869+ // Create GraphQL context with DataLoader
70+ let gql_context = GraphQLContext::new(state.database.clone());
71+72+ // Execute query with context
73+ Ok(schema
74+ .execute(req.into_inner().data(gql_context))
75+ .await
76+ .into())
77}
7879/// GraphiQL UI handler
···206 }
207 };
208209+ // Create GraphQL context with DataLoader
210+ let gql_context = GraphQLContext::new(state.database.clone());
211+212 // Upgrade to WebSocket and handle GraphQL subscriptions manually
213 Ok(ws
214 .protocols(["graphql-transport-ws", "graphql-ws"])
215+ .on_upgrade(move |socket| handle_graphql_ws(socket, schema, gql_context)))
216}
217218/// Handle GraphQL WebSocket connection
219+async fn handle_graphql_ws(socket: WebSocket, schema: Schema, gql_context: GraphQLContext) {
220 let (ws_sender, ws_receiver) = socket.split();
221222 // Convert axum WebSocket messages to strings for async-graphql
···227 })
228 });
229230+ // Create GraphQL WebSocket handler with context
231+ let mut stream = GraphQLWebSocket::new(schema.clone(), input, WebSocketProtocols::GraphQLWS)
232+ .on_connection_init(move |_| {
233+ let gql_ctx = gql_context.clone();
234+ async move {
235+ let mut data = async_graphql::Data::default();
236+ data.insert(gql_ctx);
237+ Ok(data)
238+ }
239+ });
240241 // Send GraphQL messages back through WebSocket
242 let mut ws_sender = ws_sender;
+2
api/src/graphql/mod.rs
···56mod schema_builder;
7mod dataloaders;
08mod types;
9pub mod handler;
10pub mod pubsub;
···12pub use schema_builder::build_graphql_schema;
13pub use handler::{graphql_handler, graphql_playground, graphql_subscription_handler};
14pub use pubsub::{RecordUpdateEvent, RecordOperation, PUBSUB};
0
···56mod schema_builder;
7mod dataloaders;
8+mod dataloader;
9mod types;
10pub mod handler;
11pub mod pubsub;
···13pub use schema_builder::build_graphql_schema;
14pub use handler::{graphql_handler, graphql_playground, graphql_subscription_handler};
15pub use pubsub::{RecordUpdateEvent, RecordOperation, PUBSUB};
16+pub use dataloader::GraphQLContext;
+658-143
api/src/graphql/schema_builder.rs
···15use crate::database::Database;
16use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
17use crate::graphql::PUBSUB;
01819/// Metadata about a collection for cross-referencing
20#[derive(Clone)]
···22 nsid: String,
23 key_type: String, // "tid", "literal:self", or "any"
24 type_name: String, // GraphQL type name for this collection
025}
2627/// Builds a dynamic GraphQL schema from lexicons for a given slice
···52 // Build Query root type and collect all object types
53 let mut query = Object::new("Query");
54 let mut objects_to_register = Vec::new();
005556 // First pass: collect metadata about all collections for cross-referencing
57 let mut all_collections: Vec<CollectionMeta> = Vec::new();
···68 let fields = extract_collection_fields(defs);
69 if !fields.is_empty() {
70 if let Some(key_type) = extract_record_key(defs) {
0000000000000071 all_collections.push(CollectionMeta {
72 nsid: nsid.to_string(),
73 key_type,
74 type_name: nsid_to_type_name(nsid),
075 });
76 }
77 }
···102 let edge_type = create_edge_type(&type_name);
103 let connection_type = create_connection_type(&type_name);
10400000000000000000000000000000000000000000000000000105 // Collect the types to register with schema later
106 objects_to_register.push(record_type);
107 objects_to_register.push(edge_type);
108 objects_to_register.push(connection_type);
0000109110 // Add query field for this collection
111 let collection_query_name = nsid_to_query_name(nsid);
···143 for item in list.iter() {
144 if let Ok(obj) = item.object() {
145 let field = obj.get("field")
146- .and_then(|v| v.string().ok())
147- .unwrap_or("indexedAt")
148- .to_string();
149 let direction = obj.get("direction")
150- .and_then(|v| v.string().ok())
151- .unwrap_or("desc")
152- .to_string();
153 sort_fields.push(crate::models::SortField { field, direction });
154 }
155 }
···171 where_clause.conditions.insert(
172 "collection".to_string(),
173 crate::models::WhereCondition {
0000174 eq: Some(serde_json::Value::String(collection.clone())),
175 in_values: None,
176 contains: None,
···184 for (field_name, condition_val) in where_obj.iter() {
185 if let Ok(condition_obj) = condition_val.object() {
186 let mut where_condition = crate::models::WhereCondition {
0000187 eq: None,
188 in_values: None,
189 contains: None,
···220 }
221 }
222223- where_clause.conditions.insert(field_name.to_string(), where_condition);
0000000000000000000000000000000000000000000224 }
225 }
226 }
···251 // Replace actorHandle condition with did condition
252 let did_condition = if dids.len() == 1 {
253 crate::models::WhereCondition {
0000254 eq: Some(serde_json::Value::String(dids[0].clone())),
255 in_values: None,
256 contains: None,
257 }
258 } else {
259 crate::models::WhereCondition {
0000260 eq: None,
261 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
262 contains: None,
···344 ))
345 .argument(async_graphql::dynamic::InputValue::new(
346 "sortBy",
347- TypeRef::named_list("SortField"),
348 ))
349 .argument(async_graphql::dynamic::InputValue::new(
350 "where",
351- TypeRef::named("JSON"),
352 ))
353 .description(format!("Query {} records", nsid)),
354 );
···376377 FieldFuture::new(async move {
378 // Parse groupBy argument
379- let group_by_fields: Vec<String> = match ctx.args.get("groupBy") {
380 Some(val) => {
381 if let Ok(list) = val.list() {
382- list.iter()
383- .filter_map(|v| v.string().ok().map(|s| s.to_string()))
384- .collect()
000000000000000000000000000000000000385 } else {
386 Vec::new()
387 }
···423 where_clause.conditions.insert(
424 "collection".to_string(),
425 crate::models::WhereCondition {
0000426 eq: Some(serde_json::Value::String(collection.clone())),
427 in_values: None,
428 contains: None,
···435 for (field_name, condition_val) in where_obj.iter() {
436 if let Ok(condition_obj) = condition_val.object() {
437 let mut where_condition = crate::models::WhereCondition {
0000438 eq: None,
439 in_values: None,
440 contains: None,
···471 }
472 }
473474- where_clause.conditions.insert(field_name.to_string(), where_condition);
0000000000000000000000000000000000000000000475 }
476 }
477 }
···499 if !dids.is_empty() {
500 let did_condition = if dids.len() == 1 {
501 crate::models::WhereCondition {
0000502 eq: Some(serde_json::Value::String(dids[0].clone())),
503 in_values: None,
504 contains: None,
505 }
506 } else {
507 crate::models::WhereCondition {
0000508 eq: None,
509 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
510 contains: None,
···546 )
547 .argument(async_graphql::dynamic::InputValue::new(
548 "groupBy",
549- TypeRef::named_nn_list_nn(TypeRef::STRING),
550 ))
551 .argument(async_graphql::dynamic::InputValue::new(
552 "where",
553- TypeRef::named("JSON"),
554 ))
555 .argument(async_graphql::dynamic::InputValue::new(
556 "orderBy",
···571 // Build Subscription type with collection-specific subscriptions
572 let subscription = create_subscription_type(slice_uri.clone(), &lexicons);
573574- // Build and return the schema
575 let mut schema_builder = Schema::build(query.type_name(), Some(mutation.type_name()), Some(subscription.type_name()))
576 .register(query)
577 .register(mutation)
578- .register(subscription);
00579580 // Register JSON scalar type for complex fields
581 let json_scalar = Scalar::new("JSON");
582 schema_builder = schema_builder.register(json_scalar);
5830000000000000000000000000000584 // Register Blob type
585 let blob_type = create_blob_type();
586 schema_builder = schema_builder.register(blob_type);
···608 let aggregation_order_by_input = create_aggregation_order_by_input();
609 schema_builder = schema_builder.register(aggregation_order_by_input);
6100000611 // Register PageInfo type
612 let page_info_type = create_page_info_type();
613 schema_builder = schema_builder.register(page_info_type);
···621 schema_builder = schema_builder.register(obj);
622 }
6230000000000624 schema_builder
625 .finish()
626 .map_err(|e| format!("Schema build error: {:?}", e))
···720 where_clause.conditions.insert(
721 "did".to_string(),
722 crate::models::WhereCondition {
0000723 eq: Some(serde_json::Value::String(did.clone())),
724 in_values: None,
725 contains: None,
···768 if let Some(val) = value {
769 // Check for explicit null value
770 if val.is_null() {
771- return Ok(Some(FieldValue::NULL));
0000000000000000000000000000000000000000000000772 }
773774 // Check if this is a blob field
···804 return Ok(Some(FieldValue::owned_any(blob_container)));
805 }
806807- // If not a proper blob object, return NULL
808- return Ok(Some(FieldValue::NULL));
809 }
810811 // Check if this is a reference field that needs joining
···827 return Ok(Some(FieldValue::value(graphql_val)));
828 }
829 Ok(None) => {
830- return Ok(Some(FieldValue::NULL));
831 }
832 Err(e) => {
833 tracing::error!("Error fetching linked record: {}", e);
834- return Ok(Some(FieldValue::NULL));
835 }
836 }
837 }
···841 let graphql_val = json_to_graphql_value(val);
842 Ok(Some(FieldValue::value(graphql_val)))
843 } else {
844- Ok(Some(FieldValue::NULL))
845 }
846 })
847 }));
848 }
849850- // Add join fields for cross-referencing other collections by DID
851 for collection in all_collections {
852 let field_name = nsid_to_join_field_name(&collection.nsid);
853···856 continue;
857 }
858000000859 let collection_nsid = collection.nsid.clone();
860 let key_type = collection.key_type.clone();
861 let db_for_join = database.clone();
862- let slice_for_join = slice_uri.clone();
000000000000000000000000000000000000000000863864 // Determine type and resolver based on key_type
865 match key_type.as_str() {
···893 ));
894 }
895 "tid" | "any" => {
896- // Multiple records per DID - return array of the collection's type
897- object = object.field(
000898 Field::new(
899 &field_name,
900 TypeRef::named_nn_list_nn(&collection.type_name),
901 move |ctx| {
902- let db = db_for_join.clone();
903 let nsid = collection_nsid.clone();
904 let slice = slice_for_join.clone();
0905 FieldFuture::new(async move {
906 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
907 let did = &container.record.did;
···909 // Get limit from argument, default to 50
910 let limit = ctx.args.get("limit")
911 .and_then(|v| v.i64().ok())
912- .map(|i| i as i32)
913 .unwrap_or(50)
914 .min(100); // Cap at 100 to prevent abuse
915916- // Build where clause to find all records of this collection for this DID
917- let mut where_clause = crate::models::WhereClause {
918- conditions: HashMap::new(),
919- or_conditions: None,
920- };
921- where_clause.conditions.insert(
922- "collection".to_string(),
923- crate::models::WhereCondition {
924- eq: Some(serde_json::Value::String(nsid.clone())),
925- in_values: None,
926- contains: None,
927- },
928- );
929- where_clause.conditions.insert(
930- "did".to_string(),
931- crate::models::WhereCondition {
932- eq: Some(serde_json::Value::String(did.clone())),
933- in_values: None,
934- contains: None,
935- },
936- );
937938- match db.get_slice_collections_records(
939- &slice,
940- Some(limit),
941- None, // cursor
942- None, // sort
943- Some(&where_clause),
944- ).await {
945- Ok((records, _cursor)) => {
946- let values: Vec<FieldValue> = records
947- .into_iter()
948- .map(|record| {
949- // Convert Record to IndexedRecord
950- let indexed_record = crate::models::IndexedRecord {
951- uri: record.uri,
952- cid: record.cid,
953- did: record.did,
954- collection: record.collection,
955- value: record.json,
956- indexed_at: record.indexed_at.to_rfc3339(),
957- };
958- let container = RecordContainer {
959- record: indexed_record,
960- };
961- FieldValue::owned_any(container)
962- })
963- .collect();
964- Ok(Some(FieldValue::list(values)))
965 }
966- Err(e) => {
967- tracing::debug!("Error querying {}: {}", nsid, e);
968- Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
000000000000000000000000000000000000000000000000000000000000969 }
970 }
971 })
···975 "limit",
976 TypeRef::named(TypeRef::INT),
977 ))
978- );
979 }
980 _ => {
981 // Unknown key type, skip
···988 // This enables bidirectional traversal (e.g., profile.plays and play.profile)
989 for collection in all_collections {
990 let reverse_field_name = format!("{}s", nsid_to_join_field_name(&collection.nsid));
991- let db_for_reverse = database.clone();
992 let slice_for_reverse = slice_uri.clone();
993 let collection_nsid = collection.nsid.clone();
994 let collection_type = collection.type_name.clone();
0995996 object = object.field(
997 Field::new(
998 &reverse_field_name,
999 TypeRef::named_nn_list_nn(&collection_type),
1000 move |ctx| {
1001- let db = db_for_reverse.clone();
1002 let slice = slice_for_reverse.clone();
1003 let nsid = collection_nsid.clone();
01004 FieldFuture::new(async move {
1005 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1006- let did = &container.record.did;
10071008 // Get limit from argument, default to 50
1009 let limit = ctx.args.get("limit")
1010 .and_then(|v| v.i64().ok())
1011- .map(|i| i as i32)
1012 .unwrap_or(50)
1013 .min(100); // Cap at 100 to prevent abuse
10141015- // Build where clause to find all records of this collection for this DID
1016- let mut where_clause = crate::models::WhereClause {
1017- conditions: HashMap::new(),
1018- or_conditions: None,
1019- };
1020- where_clause.conditions.insert(
1021- "collection".to_string(),
1022- crate::models::WhereCondition {
1023- eq: Some(serde_json::Value::String(nsid.clone())),
1024- in_values: None,
1025- contains: None,
1026- },
1027- );
1028- where_clause.conditions.insert(
1029- "did".to_string(),
1030- crate::models::WhereCondition {
1031- eq: Some(serde_json::Value::String(did.clone())),
1032- in_values: None,
1033- contains: None,
1034- },
1035- );
00000000000000000010361037- match db.get_slice_collections_records(
1038- &slice,
1039- Some(limit),
1040- None, // cursor
1041- None, // sort
1042- Some(&where_clause),
1043- ).await {
1044- Ok((records, _cursor)) => {
1045- let values: Vec<FieldValue> = records
1046- .into_iter()
1047- .map(|record| {
1048- // Convert Record to IndexedRecord
1049- let indexed_record = crate::models::IndexedRecord {
1050- uri: record.uri,
1051- cid: record.cid,
1052- did: record.did,
1053- collection: record.collection,
1054- value: record.json,
1055- indexed_at: record.indexed_at.to_rfc3339(),
1056- };
1057- let container = RecordContainer {
1058- record: indexed_record,
1059- };
1060- FieldValue::owned_any(container)
1061- })
1062- .collect();
1063- Ok(Some(FieldValue::list(values)))
1064 }
1065- Err(e) => {
1066- tracing::debug!("Error querying {}: {}", nsid, e);
1067- Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1068- }
1069 }
00001070 })
1071 },
1072 )
···1075 TypeRef::named(TypeRef::INT),
1076 ))
1077 );
0000000000000000000000000000000000000000000000000000000000000000000000001078 }
10791080 object
···1144 }
1145 GraphQLType::Blob => {
1146 // Blob object type with url resolver
1147- if is_required {
1148- TypeRef::named_nn("Blob")
1149- } else {
1150- TypeRef::named("Blob")
1151- }
1152 }
1153 GraphQLType::Json | GraphQLType::Ref | GraphQLType::Object(_) | GraphQLType::Union => {
1154 // JSON scalar type - linked records and complex objects return as JSON
···1175 TypeRef::named_nn_list_nn(inner_ref)
1176 } else {
1177 TypeRef::named_list(inner_ref)
000000001178 }
1179 }
1180 _ => {
···1715fn create_aggregation_order_by_input() -> InputObject {
1716 InputObject::new("AggregationOrderBy")
1717 .field(InputValue::new("count", TypeRef::named("SortDirection")))
00000000000001718}
17191720/// Converts a serde_json::Value to an async_graphql::Value
···15use crate::database::Database;
16use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
17use crate::graphql::PUBSUB;
18+use crate::graphql::dataloader::GraphQLContext;
1920/// Metadata about a collection for cross-referencing
21#[derive(Clone)]
···23 nsid: String,
24 key_type: String, // "tid", "literal:self", or "any"
25 type_name: String, // GraphQL type name for this collection
26+ at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins
27}
2829/// Builds a dynamic GraphQL schema from lexicons for a given slice
···54 // Build Query root type and collect all object types
55 let mut query = Object::new("Query");
56 let mut objects_to_register = Vec::new();
57+ let mut where_inputs_to_register = Vec::new();
58+ let mut group_by_enums_to_register = Vec::new();
5960 // First pass: collect metadata about all collections for cross-referencing
61 let mut all_collections: Vec<CollectionMeta> = Vec::new();
···72 let fields = extract_collection_fields(defs);
73 if !fields.is_empty() {
74 if let Some(key_type) = extract_record_key(defs) {
75+ // Extract at-uri field names for reverse joins
76+ let at_uri_fields: Vec<String> = fields.iter()
77+ .filter(|f| f.format.as_deref() == Some("at-uri"))
78+ .map(|f| f.name.clone())
79+ .collect();
80+81+ if !at_uri_fields.is_empty() {
82+ tracing::debug!(
83+ "Collection {} has at-uri fields: {:?}",
84+ nsid,
85+ at_uri_fields
86+ );
87+ }
88+89 all_collections.push(CollectionMeta {
90 nsid: nsid.to_string(),
91 key_type,
92 type_name: nsid_to_type_name(nsid),
93+ at_uri_fields,
94 });
95 }
96 }
···121 let edge_type = create_edge_type(&type_name);
122 let connection_type = create_connection_type(&type_name);
123124+ // Create WhereInput type for this collection
125+ let mut where_input = InputObject::new(format!("{}WhereInput", type_name));
126+127+ // Collect lexicon field names to avoid duplicates
128+ let lexicon_field_names: std::collections::HashSet<&str> =
129+ fields.iter().map(|f| f.name.as_str()).collect();
130+131+ // Add system fields available on all records (skip if already in lexicon)
132+ let system_fields = [
133+ ("indexedAt", "DateTimeFilter"),
134+ ("uri", "StringFilter"),
135+ ("cid", "StringFilter"),
136+ ("did", "StringFilter"),
137+ ("collection", "StringFilter"),
138+ ("actorHandle", "StringFilter"),
139+ ];
140+141+ for (field_name, filter_type) in system_fields {
142+ if !lexicon_field_names.contains(field_name) {
143+ where_input = where_input.field(InputValue::new(field_name, TypeRef::named(filter_type)));
144+ }
145+ }
146+147+ // Add fields from the lexicon
148+ for field in &fields {
149+ let filter_type = match field.field_type {
150+ GraphQLType::Int => "IntFilter",
151+ _ => "StringFilter", // Default to StringFilter for strings and other types
152+ };
153+ where_input = where_input.field(InputValue::new(&field.name, TypeRef::named(filter_type)));
154+ }
155+156+ // Create GroupByField enum for this collection
157+ let mut group_by_enum = Enum::new(format!("{}GroupByField", type_name));
158+ group_by_enum = group_by_enum.item(EnumItem::new("indexedAt"));
159+160+ for field in &fields {
161+ group_by_enum = group_by_enum.item(EnumItem::new(&field.name));
162+ }
163+164+ // Create collection-specific GroupByFieldInput
165+ let group_by_input = InputObject::new(format!("{}GroupByFieldInput", type_name))
166+ .field(InputValue::new("field", TypeRef::named_nn(format!("{}GroupByField", type_name))))
167+ .field(InputValue::new("interval", TypeRef::named("DateInterval")));
168+169+ // Create collection-specific SortFieldInput
170+ let sort_field_input = InputObject::new(format!("{}SortFieldInput", type_name))
171+ .field(InputValue::new("field", TypeRef::named_nn(format!("{}GroupByField", type_name))))
172+ .field(InputValue::new("direction", TypeRef::named("SortDirection")));
173+174 // Collect the types to register with schema later
175 objects_to_register.push(record_type);
176 objects_to_register.push(edge_type);
177 objects_to_register.push(connection_type);
178+ where_inputs_to_register.push(where_input);
179+ where_inputs_to_register.push(group_by_input);
180+ where_inputs_to_register.push(sort_field_input);
181+ group_by_enums_to_register.push(group_by_enum);
182183 // Add query field for this collection
184 let collection_query_name = nsid_to_query_name(nsid);
···216 for item in list.iter() {
217 if let Ok(obj) = item.object() {
218 let field = obj.get("field")
219+ .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
220+ .unwrap_or_else(|| "indexedAt".to_string());
0221 let direction = obj.get("direction")
222+ .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
223+ .unwrap_or_else(|| "desc".to_string());
0224 sort_fields.push(crate::models::SortField { field, direction });
225 }
226 }
···242 where_clause.conditions.insert(
243 "collection".to_string(),
244 crate::models::WhereCondition {
245+ gt: None,
246+ gte: None,
247+ lt: None,
248+ lte: None,
249 eq: Some(serde_json::Value::String(collection.clone())),
250 in_values: None,
251 contains: None,
···259 for (field_name, condition_val) in where_obj.iter() {
260 if let Ok(condition_obj) = condition_val.object() {
261 let mut where_condition = crate::models::WhereCondition {
262+ gt: None,
263+ gte: None,
264+ lt: None,
265+ lte: None,
266 eq: None,
267 in_values: None,
268 contains: None,
···299 }
300 }
301302+ // Parse gt condition
303+ if let Some(gt_val) = condition_obj.get("gt") {
304+ if let Ok(gt_str) = gt_val.string() {
305+ where_condition.gt = Some(serde_json::Value::String(gt_str.to_string()));
306+ } else if let Ok(gt_i64) = gt_val.i64() {
307+ where_condition.gt = Some(serde_json::Value::Number(gt_i64.into()));
308+ }
309+ }
310+311+ // Parse gte condition
312+ if let Some(gte_val) = condition_obj.get("gte") {
313+ if let Ok(gte_str) = gte_val.string() {
314+ where_condition.gte = Some(serde_json::Value::String(gte_str.to_string()));
315+ } else if let Ok(gte_i64) = gte_val.i64() {
316+ where_condition.gte = Some(serde_json::Value::Number(gte_i64.into()));
317+ }
318+ }
319+320+ // Parse lt condition
321+ if let Some(lt_val) = condition_obj.get("lt") {
322+ if let Ok(lt_str) = lt_val.string() {
323+ where_condition.lt = Some(serde_json::Value::String(lt_str.to_string()));
324+ } else if let Ok(lt_i64) = lt_val.i64() {
325+ where_condition.lt = Some(serde_json::Value::Number(lt_i64.into()));
326+ }
327+ }
328+329+ // Parse lte condition
330+ if let Some(lte_val) = condition_obj.get("lte") {
331+ if let Ok(lte_str) = lte_val.string() {
332+ where_condition.lte = Some(serde_json::Value::String(lte_str.to_string()));
333+ } else if let Ok(lte_i64) = lte_val.i64() {
334+ where_condition.lte = Some(serde_json::Value::Number(lte_i64.into()));
335+ }
336+ }
337+338+ // Convert indexedAt to indexed_at for database column
339+ let db_field_name = if field_name == "indexedAt" {
340+ "indexed_at".to_string()
341+ } else {
342+ field_name.to_string()
343+ };
344+345+ where_clause.conditions.insert(db_field_name, where_condition);
346 }
347 }
348 }
···373 // Replace actorHandle condition with did condition
374 let did_condition = if dids.len() == 1 {
375 crate::models::WhereCondition {
376+ gt: None,
377+ gte: None,
378+ lt: None,
379+ lte: None,
380 eq: Some(serde_json::Value::String(dids[0].clone())),
381 in_values: None,
382 contains: None,
383 }
384 } else {
385 crate::models::WhereCondition {
386+ gt: None,
387+ gte: None,
388+ lt: None,
389+ lte: None,
390 eq: None,
391 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
392 contains: None,
···474 ))
475 .argument(async_graphql::dynamic::InputValue::new(
476 "sortBy",
477+ TypeRef::named_list(format!("{}SortFieldInput", type_name)),
478 ))
479 .argument(async_graphql::dynamic::InputValue::new(
480 "where",
481+ TypeRef::named(format!("{}WhereInput", type_name)),
482 ))
483 .description(format!("Query {} records", nsid)),
484 );
···506507 FieldFuture::new(async move {
508 // Parse groupBy argument
509+ let group_by_fields: Vec<crate::models::GroupByField> = match ctx.args.get("groupBy") {
510 Some(val) => {
511 if let Ok(list) = val.list() {
512+ let mut fields = Vec::new();
513+ for item in list.iter() {
514+ if let Ok(obj) = item.object() {
515+ // Get field name from enum
516+ let field_name = obj.get("field")
517+ .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
518+ .ok_or_else(|| Error::new("Missing field name in groupBy"))?;
519+520+ // Get optional interval
521+ if let Some(interval_val) = obj.get("interval") {
522+ if let Ok(interval_str) = interval_val.enum_name() {
523+ // Parse interval string to DateInterval
524+ let interval = match interval_str {
525+ "second" => crate::models::DateInterval::Second,
526+ "minute" => crate::models::DateInterval::Minute,
527+ "hour" => crate::models::DateInterval::Hour,
528+ "day" => crate::models::DateInterval::Day,
529+ "week" => crate::models::DateInterval::Week,
530+ "month" => crate::models::DateInterval::Month,
531+ "quarter" => crate::models::DateInterval::Quarter,
532+ "year" => crate::models::DateInterval::Year,
533+ _ => return Err(Error::new(format!("Invalid interval: {}", interval_str))),
534+ };
535+ fields.push(crate::models::GroupByField::Truncated {
536+ field: field_name,
537+ interval,
538+ });
539+ } else {
540+ return Err(Error::new("Invalid interval value"));
541+ }
542+ } else {
543+ // No interval, simple field
544+ fields.push(crate::models::GroupByField::Simple(field_name));
545+ }
546+ } else {
547+ return Err(Error::new("Invalid groupBy item"));
548+ }
549+ }
550+ fields
551 } else {
552 Vec::new()
553 }
···589 where_clause.conditions.insert(
590 "collection".to_string(),
591 crate::models::WhereCondition {
592+ gt: None,
593+ gte: None,
594+ lt: None,
595+ lte: None,
596 eq: Some(serde_json::Value::String(collection.clone())),
597 in_values: None,
598 contains: None,
···605 for (field_name, condition_val) in where_obj.iter() {
606 if let Ok(condition_obj) = condition_val.object() {
607 let mut where_condition = crate::models::WhereCondition {
608+ gt: None,
609+ gte: None,
610+ lt: None,
611+ lte: None,
612 eq: None,
613 in_values: None,
614 contains: None,
···645 }
646 }
647648+ // Parse gt condition
649+ if let Some(gt_val) = condition_obj.get("gt") {
650+ if let Ok(gt_str) = gt_val.string() {
651+ where_condition.gt = Some(serde_json::Value::String(gt_str.to_string()));
652+ } else if let Ok(gt_i64) = gt_val.i64() {
653+ where_condition.gt = Some(serde_json::Value::Number(gt_i64.into()));
654+ }
655+ }
656+657+ // Parse gte condition
658+ if let Some(gte_val) = condition_obj.get("gte") {
659+ if let Ok(gte_str) = gte_val.string() {
660+ where_condition.gte = Some(serde_json::Value::String(gte_str.to_string()));
661+ } else if let Ok(gte_i64) = gte_val.i64() {
662+ where_condition.gte = Some(serde_json::Value::Number(gte_i64.into()));
663+ }
664+ }
665+666+ // Parse lt condition
667+ if let Some(lt_val) = condition_obj.get("lt") {
668+ if let Ok(lt_str) = lt_val.string() {
669+ where_condition.lt = Some(serde_json::Value::String(lt_str.to_string()));
670+ } else if let Ok(lt_i64) = lt_val.i64() {
671+ where_condition.lt = Some(serde_json::Value::Number(lt_i64.into()));
672+ }
673+ }
674+675+ // Parse lte condition
676+ if let Some(lte_val) = condition_obj.get("lte") {
677+ if let Ok(lte_str) = lte_val.string() {
678+ where_condition.lte = Some(serde_json::Value::String(lte_str.to_string()));
679+ } else if let Ok(lte_i64) = lte_val.i64() {
680+ where_condition.lte = Some(serde_json::Value::Number(lte_i64.into()));
681+ }
682+ }
683+684+ // Convert indexedAt to indexed_at for database column
685+ let db_field_name = if field_name == "indexedAt" {
686+ "indexed_at".to_string()
687+ } else {
688+ field_name.to_string()
689+ };
690+691+ where_clause.conditions.insert(db_field_name, where_condition);
692 }
693 }
694 }
···716 if !dids.is_empty() {
717 let did_condition = if dids.len() == 1 {
718 crate::models::WhereCondition {
719+ gt: None,
720+ gte: None,
721+ lt: None,
722+ lte: None,
723 eq: Some(serde_json::Value::String(dids[0].clone())),
724 in_values: None,
725 contains: None,
726 }
727 } else {
728 crate::models::WhereCondition {
729+ gt: None,
730+ gte: None,
731+ lt: None,
732+ lte: None,
733 eq: None,
734 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
735 contains: None,
···771 )
772 .argument(async_graphql::dynamic::InputValue::new(
773 "groupBy",
774+ TypeRef::named_nn_list(format!("{}GroupByFieldInput", type_name)),
775 ))
776 .argument(async_graphql::dynamic::InputValue::new(
777 "where",
778+ TypeRef::named(format!("{}WhereInput", type_name)),
779 ))
780 .argument(async_graphql::dynamic::InputValue::new(
781 "orderBy",
···796 // Build Subscription type with collection-specific subscriptions
797 let subscription = create_subscription_type(slice_uri.clone(), &lexicons);
798799+ // Build and return the schema with complexity limits
800 let mut schema_builder = Schema::build(query.type_name(), Some(mutation.type_name()), Some(subscription.type_name()))
801 .register(query)
802 .register(mutation)
803+ .register(subscription)
804+ .limit_depth(50) // Higher limit to support GraphiQL introspection with reverse joins
805+ .limit_complexity(5000); // Prevent expensive deeply nested queries
806807 // Register JSON scalar type for complex fields
808 let json_scalar = Scalar::new("JSON");
809 schema_builder = schema_builder.register(json_scalar);
810811+ // Register filter input types for WHERE clauses
812+ let string_filter = InputObject::new("StringFilter")
813+ .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
814+ .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING)))
815+ .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING)))
816+ .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
817+ .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING)))
818+ .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
819+ .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING)));
820+ schema_builder = schema_builder.register(string_filter);
821+822+ let int_filter = InputObject::new("IntFilter")
823+ .field(InputValue::new("eq", TypeRef::named(TypeRef::INT)))
824+ .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT)))
825+ .field(InputValue::new("gt", TypeRef::named(TypeRef::INT)))
826+ .field(InputValue::new("gte", TypeRef::named(TypeRef::INT)))
827+ .field(InputValue::new("lt", TypeRef::named(TypeRef::INT)))
828+ .field(InputValue::new("lte", TypeRef::named(TypeRef::INT)));
829+ schema_builder = schema_builder.register(int_filter);
830+831+ let datetime_filter = InputObject::new("DateTimeFilter")
832+ .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
833+ .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
834+ .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING)))
835+ .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
836+ .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING)));
837+ schema_builder = schema_builder.register(datetime_filter);
838+839 // Register Blob type
840 let blob_type = create_blob_type();
841 schema_builder = schema_builder.register(blob_type);
···863 let aggregation_order_by_input = create_aggregation_order_by_input();
864 schema_builder = schema_builder.register(aggregation_order_by_input);
865866+ // Register DateInterval enum for date truncation
867+ let date_interval_enum = create_date_interval_enum();
868+ schema_builder = schema_builder.register(date_interval_enum);
869+870 // Register PageInfo type
871 let page_info_type = create_page_info_type();
872 schema_builder = schema_builder.register(page_info_type);
···880 schema_builder = schema_builder.register(obj);
881 }
882883+ // Register all WhereInput types
884+ for where_input in where_inputs_to_register {
885+ schema_builder = schema_builder.register(where_input);
886+ }
887+888+ // Register all GroupByField enums
889+ for group_by_enum in group_by_enums_to_register {
890+ schema_builder = schema_builder.register(group_by_enum);
891+ }
892+893 schema_builder
894 .finish()
895 .map_err(|e| format!("Schema build error: {:?}", e))
···989 where_clause.conditions.insert(
990 "did".to_string(),
991 crate::models::WhereCondition {
992+ gt: None,
993+ gte: None,
994+ lt: None,
995+ lte: None,
996 eq: Some(serde_json::Value::String(did.clone())),
997 in_values: None,
998 contains: None,
···1041 if let Some(val) = value {
1042 // Check for explicit null value
1043 if val.is_null() {
1044+ return Ok(None);
1045+ }
1046+1047+ // Check if this is an array of blobs
1048+ if let GraphQLType::Array(inner) = &field_type {
1049+ if matches!(inner.as_ref(), GraphQLType::Blob) {
1050+ if let Some(arr) = val.as_array() {
1051+ let blob_containers: Vec<FieldValue> = arr
1052+ .iter()
1053+ .filter_map(|blob_val| {
1054+ let obj = blob_val.as_object()?;
1055+ let blob_ref = obj
1056+ .get("ref")
1057+ .and_then(|r| r.as_object())
1058+ .and_then(|r| r.get("$link"))
1059+ .and_then(|l| l.as_str())
1060+ .unwrap_or("")
1061+ .to_string();
1062+1063+ let mime_type = obj
1064+ .get("mimeType")
1065+ .and_then(|m| m.as_str())
1066+ .unwrap_or("image/jpeg")
1067+ .to_string();
1068+1069+ let size = obj
1070+ .get("size")
1071+ .and_then(|s| s.as_i64())
1072+ .unwrap_or(0);
1073+1074+ let blob_container = BlobContainer {
1075+ blob_ref,
1076+ mime_type,
1077+ size,
1078+ did: container.record.did.clone(),
1079+ };
1080+1081+ Some(FieldValue::owned_any(blob_container))
1082+ })
1083+ .collect();
1084+1085+ return Ok(Some(FieldValue::list(blob_containers)));
1086+ }
1087+1088+ // If not a proper array, return empty list
1089+ return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
1090+ }
1091 }
10921093 // Check if this is a blob field
···1123 return Ok(Some(FieldValue::owned_any(blob_container)));
1124 }
11251126+ // If not a proper blob object, return None (field is null)
1127+ return Ok(None);
1128 }
11291130 // Check if this is a reference field that needs joining
···1146 return Ok(Some(FieldValue::value(graphql_val)));
1147 }
1148 Ok(None) => {
1149+ return Ok(None);
1150 }
1151 Err(e) => {
1152 tracing::error!("Error fetching linked record: {}", e);
1153+ return Ok(None);
1154 }
1155 }
1156 }
···1160 let graphql_val = json_to_graphql_value(val);
1161 Ok(Some(FieldValue::value(graphql_val)))
1162 } else {
1163+ Ok(None)
1164 }
1165 })
1166 }));
1167 }
11681169+ // Add join fields for cross-referencing other collections
1170 for collection in all_collections {
1171 let field_name = nsid_to_join_field_name(&collection.nsid);
1172···1175 continue;
1176 }
11771178+ // Collect all string fields with format "at-uri" that might reference this collection
1179+ // We'll check each one at runtime to see if it contains a URI to this collection
1180+ let uri_ref_fields: Vec<_> = fields.iter()
1181+ .filter(|f| matches!(f.format.as_deref(), Some("at-uri")))
1182+ .collect();
1183+1184 let collection_nsid = collection.nsid.clone();
1185 let key_type = collection.key_type.clone();
1186 let db_for_join = database.clone();
1187+1188+ // If we found at-uri fields, create a resolver that checks each one at runtime
1189+ if !uri_ref_fields.is_empty() {
1190+ let ref_field_names: Vec<String> = uri_ref_fields.iter().map(|f| f.name.clone()).collect();
1191+ let db_for_uri_join = database.clone();
1192+ let target_collection = collection_nsid.clone();
1193+1194+ object = object.field(Field::new(
1195+ &field_name,
1196+ TypeRef::named(&collection.type_name),
1197+ move |ctx| {
1198+ let db = db_for_uri_join.clone();
1199+ let field_names = ref_field_names.clone();
1200+ let expected_collection = target_collection.clone();
1201+ FieldFuture::new(async move {
1202+ let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1203+1204+ // Try each at-uri field to find one that references this collection
1205+ for field_name in &field_names {
1206+ if let Some(uri_value) = container.record.value.get(field_name) {
1207+ if let Some(uri) = uri_value.as_str() {
1208+ // Check if the URI is for the expected collection
1209+ if uri.contains(&format!("/{}/", expected_collection)) {
1210+ // Fetch the record at this URI
1211+ match db.get_record(uri).await {
1212+ Ok(Some(record)) => {
1213+ let new_container = RecordContainer { record };
1214+ return Ok(Some(FieldValue::owned_any(new_container)));
1215+ }
1216+ Ok(None) => continue, // Try next field
1217+ Err(_) => continue, // Try next field
1218+ }
1219+ }
1220+ }
1221+ }
1222+ }
1223+ // No matching URI found in any field
1224+ Ok(None)
1225+ })
1226+ },
1227+ ));
1228+ continue; // Skip the normal DID-based join logic
1229+ }
12301231 // Determine type and resolver based on key_type
1232 match key_type.as_str() {
···1260 ));
1261 }
1262 "tid" | "any" => {
1263+ // Skip - these are handled as plural reverse joins below with URI filtering
1264+ continue;
1265+1266+ // Multiple records per DID - return array of the collection's type (DISABLED)
1267+ /*object = object.field(
1268 Field::new(
1269 &field_name,
1270 TypeRef::named_nn_list_nn(&collection.type_name),
1271 move |ctx| {
01272 let nsid = collection_nsid.clone();
1273 let slice = slice_for_join.clone();
1274+ let db_fallback = db_for_join.clone();
1275 FieldFuture::new(async move {
1276 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1277 let did = &container.record.did;
···1279 // Get limit from argument, default to 50
1280 let limit = ctx.args.get("limit")
1281 .and_then(|v| v.i64().ok())
1282+ .map(|i| i as usize)
1283 .unwrap_or(50)
1284 .min(100); // Cap at 100 to prevent abuse
12851286+ // Try to get DataLoader from context
1287+ if let Some(gql_ctx) = ctx.data_opt::<GraphQLContext>() {
1288+ // Use DataLoader for batched loading
1289+ let key = CollectionDidKey {
1290+ slice_uri: slice.clone(),
1291+ collection: nsid.clone(),
1292+ did: did.clone(),
1293+ };
1294+1295+ match gql_ctx.collection_did_loader.load_one(key).await {
1296+ Ok(Some(mut records)) => {
1297+ // Apply limit after loading
1298+ records.truncate(limit);
0000000012991300+ let values: Vec<FieldValue> = records
1301+ .into_iter()
1302+ .map(|indexed_record| {
1303+ let container = RecordContainer {
1304+ record: indexed_record,
1305+ };
1306+ FieldValue::owned_any(container)
1307+ })
1308+ .collect();
1309+ Ok(Some(FieldValue::list(values)))
1310+ }
1311+ Ok(None) => {
1312+ Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1313+ }
1314+ Err(e) => {
1315+ tracing::debug!("DataLoader error for {}: {:?}", nsid, e);
1316+ Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1317+ }
0000000001318 }
1319+ } else {
1320+ // Fallback to direct database query if DataLoader not available
1321+ let db = db_fallback.clone();
1322+ let mut where_clause = crate::models::WhereClause {
1323+ conditions: HashMap::new(),
1324+ or_conditions: None,
1325+ };
1326+ where_clause.conditions.insert(
1327+ "collection".to_string(),
1328+ crate::models::WhereCondition {
1329+ gt: None,
1330+ gte: None,
1331+ lt: None,
1332+ lte: None,
1333+ eq: Some(serde_json::Value::String(nsid.clone())),
1334+ in_values: None,
1335+ contains: None,
1336+ },
1337+ );
1338+ where_clause.conditions.insert(
1339+ "did".to_string(),
1340+ crate::models::WhereCondition {
1341+ gt: None,
1342+ gte: None,
1343+ lt: None,
1344+ lte: None,
1345+ eq: Some(serde_json::Value::String(did.clone())),
1346+ in_values: None,
1347+ contains: None,
1348+ },
1349+ );
1350+1351+ match db.get_slice_collections_records(
1352+ &slice,
1353+ Some(limit as i32),
1354+ None, // cursor
1355+ None, // sort
1356+ Some(&where_clause),
1357+ ).await {
1358+ Ok((records, _cursor)) => {
1359+ let values: Vec<FieldValue> = records
1360+ .into_iter()
1361+ .map(|record| {
1362+ let indexed_record = crate::models::IndexedRecord {
1363+ uri: record.uri,
1364+ cid: record.cid,
1365+ did: record.did,
1366+ collection: record.collection,
1367+ value: record.json,
1368+ indexed_at: record.indexed_at.to_rfc3339(),
1369+ };
1370+ let container = RecordContainer {
1371+ record: indexed_record,
1372+ };
1373+ FieldValue::owned_any(container)
1374+ })
1375+ .collect();
1376+ Ok(Some(FieldValue::list(values)))
1377+ }
1378+ Err(e) => {
1379+ tracing::debug!("Error querying {}: {}", nsid, e);
1380+ Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1381+ }
1382 }
1383 }
1384 })
···1388 "limit",
1389 TypeRef::named(TypeRef::INT),
1390 ))
1391+ );*/
1392 }
1393 _ => {
1394 // Unknown key type, skip
···1401 // This enables bidirectional traversal (e.g., profile.plays and play.profile)
1402 for collection in all_collections {
1403 let reverse_field_name = format!("{}s", nsid_to_join_field_name(&collection.nsid));
01404 let slice_for_reverse = slice_uri.clone();
1405 let collection_nsid = collection.nsid.clone();
1406 let collection_type = collection.type_name.clone();
1407+ let at_uri_fields = collection.at_uri_fields.clone();
14081409 object = object.field(
1410 Field::new(
1411 &reverse_field_name,
1412 TypeRef::named_nn_list_nn(&collection_type),
1413 move |ctx| {
01414 let slice = slice_for_reverse.clone();
1415 let nsid = collection_nsid.clone();
1416+ let ref_fields = at_uri_fields.clone();
1417 FieldFuture::new(async move {
1418 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
014191420 // Get limit from argument, default to 50
1421 let limit = ctx.args.get("limit")
1422 .and_then(|v| v.i64().ok())
1423+ .map(|i| i as usize)
1424 .unwrap_or(50)
1425 .min(100); // Cap at 100 to prevent abuse
14261427+ // Try to get DataLoader from context
1428+ if let Some(gql_ctx) = ctx.data_opt::<GraphQLContext>() {
1429+ let parent_uri = &container.record.uri;
1430+1431+ // Try each at-uri field from the lexicon
1432+ tracing::debug!(
1433+ "Trying reverse join for {} with at-uri fields: {:?}",
1434+ nsid,
1435+ ref_fields
1436+ );
1437+1438+ for ref_field in &ref_fields {
1439+ let key = crate::graphql::dataloader::CollectionUriKey {
1440+ slice_uri: slice.clone(),
1441+ collection: nsid.clone(),
1442+ parent_uri: parent_uri.clone(),
1443+ reference_field: ref_field.clone(),
1444+ };
1445+1446+ tracing::debug!(
1447+ "Querying {} via field '{}' for URI: {}",
1448+ nsid,
1449+ ref_field,
1450+ parent_uri
1451+ );
1452+1453+ match gql_ctx.collection_uri_loader.load_one(key).await {
1454+ Ok(Some(mut records)) => {
1455+ if !records.is_empty() {
1456+ tracing::debug!(
1457+ "Found {} {} records via '{}' field for parent URI: {}",
1458+ records.len(),
1459+ nsid,
1460+ ref_field,
1461+ parent_uri
1462+ );
1463+1464+ // Apply limit
1465+ records.truncate(limit);
14661467+ let values: Vec<FieldValue> = records
1468+ .into_iter()
1469+ .map(|indexed_record| {
1470+ let container = RecordContainer {
1471+ record: indexed_record,
1472+ };
1473+ FieldValue::owned_any(container)
1474+ })
1475+ .collect();
1476+ return Ok(Some(FieldValue::list(values)));
1477+ }
1478+ }
1479+ Ok(None) => continue,
1480+ Err(e) => {
1481+ tracing::debug!("DataLoader error for {} field '{}': {:?}", nsid, ref_field, e);
1482+ continue;
1483+ }
1484+ }
0000000001485 }
1486+1487+ // No records found via any at-uri field
1488+ tracing::debug!("No {} records found for parent URI: {}", nsid, parent_uri);
1489+ return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
1490 }
1491+1492+ // Fallback: DataLoader not available
1493+ tracing::debug!("DataLoader not available for reverse join");
1494+ Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1495 })
1496 },
1497 )
···1500 TypeRef::named(TypeRef::INT),
1501 ))
1502 );
1503+1504+ // Add count field for the reverse join
1505+ let count_field_name = format!("{}Count", reverse_field_name);
1506+ let db_for_count = database.clone();
1507+ let slice_for_count = slice_uri.clone();
1508+ let collection_for_count = collection.nsid.clone();
1509+ let at_uri_fields_for_count = collection.at_uri_fields.clone();
1510+1511+ object = object.field(
1512+ Field::new(
1513+ &count_field_name,
1514+ TypeRef::named_nn(TypeRef::INT),
1515+ move |ctx| {
1516+ let slice = slice_for_count.clone();
1517+ let nsid = collection_for_count.clone();
1518+ let db = db_for_count.clone();
1519+ let ref_fields = at_uri_fields_for_count.clone();
1520+ FieldFuture::new(async move {
1521+ let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1522+ let parent_uri = &container.record.uri;
1523+1524+ // Build where clause to count records referencing this URI
1525+ for ref_field in &ref_fields {
1526+ let mut where_clause = crate::models::WhereClause {
1527+ conditions: HashMap::new(),
1528+ or_conditions: None,
1529+ };
1530+1531+ where_clause.conditions.insert(
1532+ "collection".to_string(),
1533+ crate::models::WhereCondition {
1534+ gt: None,
1535+ gte: None,
1536+ lt: None,
1537+ lte: None,
1538+ eq: Some(serde_json::Value::String(nsid.clone())),
1539+ in_values: None,
1540+ contains: None,
1541+ },
1542+ );
1543+1544+ where_clause.conditions.insert(
1545+ ref_field.clone(),
1546+ crate::models::WhereCondition {
1547+ gt: None,
1548+ gte: None,
1549+ lt: None,
1550+ lte: None,
1551+ eq: Some(serde_json::Value::String(parent_uri.clone())),
1552+ in_values: None,
1553+ contains: None,
1554+ },
1555+ );
1556+1557+ match db.count_slice_collections_records(&slice, Some(&where_clause)).await {
1558+ Ok(count) if count > 0 => {
1559+ return Ok(Some(FieldValue::value(count as i32)));
1560+ }
1561+ Ok(_) => continue,
1562+ Err(e) => {
1563+ tracing::debug!("Count error for {}: {}", nsid, e);
1564+ continue;
1565+ }
1566+ }
1567+ }
1568+1569+ // No matching field found, return 0
1570+ Ok(Some(FieldValue::value(0)))
1571+ })
1572+ },
1573+ )
1574+ );
1575 }
15761577 object
···1641 }
1642 GraphQLType::Blob => {
1643 // Blob object type with url resolver
1644+ // Always nullable since blob data might be missing or malformed
1645+ TypeRef::named("Blob")
0001646 }
1647 GraphQLType::Json | GraphQLType::Ref | GraphQLType::Object(_) | GraphQLType::Union => {
1648 // JSON scalar type - linked records and complex objects return as JSON
···1669 TypeRef::named_nn_list_nn(inner_ref)
1670 } else {
1671 TypeRef::named_list(inner_ref)
1672+ }
1673+ }
1674+ GraphQLType::Blob => {
1675+ // Arrays of blobs - return list of Blob objects
1676+ if is_required {
1677+ TypeRef::named_nn_list("Blob")
1678+ } else {
1679+ TypeRef::named_list("Blob")
1680 }
1681 }
1682 _ => {
···2217fn create_aggregation_order_by_input() -> InputObject {
2218 InputObject::new("AggregationOrderBy")
2219 .field(InputValue::new("count", TypeRef::named("SortDirection")))
2220+}
2221+2222+/// Creates the DateInterval enum for date truncation
2223+fn create_date_interval_enum() -> Enum {
2224+ Enum::new("DateInterval")
2225+ .item(EnumItem::new("second"))
2226+ .item(EnumItem::new("minute"))
2227+ .item(EnumItem::new("hour"))
2228+ .item(EnumItem::new("day"))
2229+ .item(EnumItem::new("week"))
2230+ .item(EnumItem::new("month"))
2231+ .item(EnumItem::new("quarter"))
2232+ .item(EnumItem::new("year"))
2233}
22342235/// Converts a serde_json::Value to an async_graphql::Value