forked from
slices.network/slices
Highly ambitious ATProtocol AppView service and SDKs
1//! Dynamic GraphQL schema builder from AT Protocol lexicons
2//!
3//! This module generates GraphQL schemas at runtime based on lexicon definitions
4//! stored in the database, enabling flexible querying of slice records.
5
6use async_graphql::dynamic::{
7 Enum, EnumItem, Field, FieldFuture, FieldValue, InputObject, InputValue, Object, Scalar,
8 Schema, Subscription, SubscriptionField, SubscriptionFieldFuture, TypeRef,
9};
10use async_graphql::{Error, Value as GraphQLValue};
11use base64::Engine;
12use base64::engine::general_purpose;
13use serde_json;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::Mutex;
17
18use crate::database::Database;
19use crate::graphql::dataloader::GraphQLContext;
20use crate::graphql::schema_ext::{
21 add_cancel_job_mutation, add_create_oauth_client_mutation, add_delete_job_mutation,
22 add_delete_oauth_client_mutation, add_delete_slice_records_mutation, add_get_sync_summary_query,
23 add_jetstream_logs_query, add_jetstream_logs_subscription, add_oauth_clients_query,
24 add_oauth_clients_field_to_slice, add_slice_records_query, add_sparklines_query,
25 add_sparklines_field_to_slice, add_start_sync_mutation, add_stats_field_to_slice,
26 add_sync_job_logs_query, add_sync_job_query, add_sync_job_subscription, add_sync_jobs_query,
27 add_update_oauth_client_mutation, add_upload_blob_mutation, create_blob_upload_response_type,
28 create_collection_stats_type, create_collection_summary_type,
29 create_delete_slice_records_output_type, create_jetstream_log_entry_type,
30 create_oauth_client_type, create_slice_record_type, create_slice_record_edge_type,
31 create_slice_records_connection_type, create_slice_records_where_input,
32 create_slice_sparkline_type, create_slice_stats_type, create_sparkline_point_type,
33 create_start_sync_output_type, create_sync_job_result_type, create_sync_job_type,
34 create_sync_summary_type,
35};
36use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
37use crate::graphql::PUBSUB;
38
/// Metadata about a collection for cross-referencing
///
/// Gathered in a first pass over all lexicons of a slice so that later
/// schema generation can wire up cross-collection lookups (e.g. reverse
/// joins via `at-uri` fields).
#[derive(Clone)]
struct CollectionMeta {
    nsid: String, // Collection NSID taken from the lexicon's "id" field
    key_type: String, // "tid", "literal:self", or "any"
    type_name: String, // GraphQL type name for this collection
    at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins
}
47
/// Type registry for tracking generated nested object types
///
/// Keyed by GraphQL type name. Checked before generating a type so each
/// nested object type is created exactly once, then drained into the
/// schema during registration.
type TypeRegistry = HashMap<String, Object>;
50
/// Container for nested object field values
///
/// Wraps the raw JSON of a nested object so that field resolvers can
/// downcast the parent value (stored via `FieldValue::owned_any`) and
/// read child fields out of it.
#[derive(Clone)]
struct NestedObjectContainer {
    data: serde_json::Value, // raw JSON of the nested object
}
56
/// Generates a unique type name for a nested object field
///
/// Concatenates the parent type name with the field name, upper-casing the
/// field name's first character: `("Post", "embed")` -> `"PostEmbed"`.
/// An empty field name yields the parent type name unchanged.
fn generate_nested_type_name(parent_type: &str, field_name: &str) -> String {
    let mut name = String::with_capacity(parent_type.len() + field_name.len());
    name.push_str(parent_type);
    if let Some(first) = field_name.chars().next() {
        // `to_uppercase` may expand to multiple chars for some Unicode
        // scalars, so extend rather than push a single char.
        name.extend(first.to_uppercase());
        name.push_str(&field_name[first.len_utf8()..]);
    }
    name
}
66
67/// Resolves a lexicon ref and generates a GraphQL type for it
68/// Returns the generated type name
69fn resolve_lexicon_ref_type(
70 ref_nsid: &str,
71 current_lexicon_nsid: &str,
72 all_lexicons: &[serde_json::Value],
73 type_registry: &mut TypeRegistry,
74 database: &Database,
75) -> String {
76 // Handle different ref formats:
77 // 1. Local ref: #image
78 // 2. External ref with specific def: app.bsky.embed.defs#aspectRatio
79 // 3. External ref to main: community.lexicon.location.hthree
80 let (target_nsid, def_name) = if ref_nsid.starts_with('#') {
81 // Local ref - use current lexicon NSID and the def name without #
82 (current_lexicon_nsid, &ref_nsid[1..])
83 } else if let Some(hash_pos) = ref_nsid.find('#') {
84 // External ref with specific def - split on #
85 (&ref_nsid[..hash_pos], &ref_nsid[hash_pos + 1..])
86 } else {
87 // External ref to main def
88 (ref_nsid, "main")
89 };
90
91 // Generate type name from NSID and def name
92 let type_name = if def_name == "main" {
93 // For refs to main: CommunityLexiconLocationHthree
94 nsid_to_type_name(target_nsid)
95 } else {
96 // For refs to specific def: AppBskyEmbedDefsAspectRatio
97 format!("{}{}", nsid_to_type_name(target_nsid), capitalize_first(def_name))
98 };
99
100 // Check if already generated
101 if type_registry.contains_key(&type_name) {
102 return type_name;
103 }
104
105 // Find the lexicon definition
106 let lexicon = all_lexicons.iter().find(|lex| {
107 lex.get("id").and_then(|id| id.as_str()) == Some(target_nsid)
108 });
109
110 if let Some(lex) = lexicon {
111 // Extract the definition (either "main" or specific def like "image")
112 if let Some(defs) = lex.get("defs") {
113 if let Some(def) = defs.get(def_name) {
114 // Extract fields from this specific definition
115 if let Some(properties) = def.get("properties") {
116 let fields = extract_fields_from_properties(properties);
117
118 if !fields.is_empty() {
119 // Generate the type using existing nested object generator
120 generate_nested_object_type(&type_name, &fields, type_registry, database);
121 return type_name;
122 }
123 }
124 }
125 }
126 }
127
128 // Fallback: couldn't resolve the ref, will use JSON
129 tracing::warn!("Could not resolve lexicon ref: {} (target: {}, def: {})", ref_nsid, target_nsid, def_name);
130 type_name
131}
132
/// Capitalizes the first character of a string
///
/// Returns an empty string for empty input. Only the leading character is
/// changed; the remainder of the string is copied verbatim.
fn capitalize_first(s: &str) -> String {
    match s.chars().next() {
        None => String::new(),
        Some(first) => {
            let mut out: String = first.to_uppercase().collect();
            out.push_str(&s[first.len_utf8()..]);
            out
        }
    }
}
141
142/// Extracts fields from a lexicon properties object
143fn extract_fields_from_properties(properties: &serde_json::Value) -> Vec<GraphQLField> {
144 let mut fields = Vec::new();
145
146 if let Some(props) = properties.as_object() {
147 for (field_name, field_def) in props {
148 let field_type_str = field_def.get("type").and_then(|t| t.as_str()).unwrap_or("unknown");
149 let field_type = crate::graphql::types::map_lexicon_type_to_graphql(field_type_str, field_def);
150
151 // Check if field is required
152 let is_required = false; // We'd need the parent's "required" array to know this
153
154 // Extract format if present
155 let format = field_def.get("format").and_then(|f| f.as_str()).map(|s| s.to_string());
156
157 fields.push(GraphQLField {
158 name: field_name.clone(),
159 field_type,
160 is_required,
161 format,
162 });
163 }
164 }
165
166 fields
167}
168
/// Recursively generates GraphQL object types for nested objects
/// Returns the type name of the generated object type
///
/// Generated types (including any nested/array-item types created by the
/// recursion) are inserted into `type_registry`; if `type_name` is already
/// present the existing entry is kept untouched. `database` is only
/// threaded through the recursive calls here — presumably needed by other
/// generators sharing this signature; not used directly in this body.
fn generate_nested_object_type(
    type_name: &str,
    fields: &[GraphQLField],
    type_registry: &mut TypeRegistry,
    database: &Database,
) -> String {
    // Check if type already exists in registry
    if type_registry.contains_key(type_name) {
        return type_name.to_string();
    }

    let mut object = Object::new(type_name);

    // Add fields to the object
    for field in fields {
        // Two clones of the name are needed: one is consumed by Field::new,
        // the other is moved into the 'static resolver closure below.
        let field_name = field.name.clone();
        let field_name_for_field = field_name.clone(); // Clone for Field::new
        let field_type = field.field_type.clone();

        // Determine the TypeRef for this field
        let type_ref = match &field.field_type {
            GraphQLType::Object(nested_fields) => {
                // Generate nested object type recursively
                // (name is derived as ParentTypeFieldName)
                let nested_type_name = generate_nested_type_name(type_name, &field_name);
                let actual_type_name = generate_nested_object_type(
                    &nested_type_name,
                    nested_fields,
                    type_registry,
                    database,
                );

                if field.is_required {
                    TypeRef::named_nn(actual_type_name)
                } else {
                    TypeRef::named(actual_type_name)
                }
            }
            GraphQLType::Array(inner) => {
                if let GraphQLType::Object(nested_fields) = inner.as_ref() {
                    // Generate nested object type for array items
                    let nested_type_name = generate_nested_type_name(type_name, &field_name);
                    let actual_type_name = generate_nested_object_type(
                        &nested_type_name,
                        nested_fields,
                        type_registry,
                        database,
                    );

                    if field.is_required {
                        TypeRef::named_nn_list(actual_type_name)
                    } else {
                        TypeRef::named_list(actual_type_name)
                    }
                } else {
                    // Use standard type ref for arrays of primitives
                    graphql_type_to_typeref(&field.field_type, field.is_required)
                }
            }
            _ => {
                // Use standard type ref for other types
                graphql_type_to_typeref(&field.field_type, field.is_required)
            }
        };

        // Add field with resolver. The closure runs once per query, so the
        // captured name/type are re-cloned into each async future.
        object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| {
            let field_name = field_name.clone();
            let field_type = field_type.clone();

            FieldFuture::new(async move {
                // Get parent container (set by the parent resolver via
                // FieldValue::owned_any); `?` propagates a downcast error.
                let container = ctx.parent_value.try_downcast_ref::<NestedObjectContainer>()?;
                let value = container.data.get(&field_name);

                if let Some(val) = value {
                    // Explicit JSON null resolves to GraphQL null.
                    if val.is_null() {
                        return Ok(None);
                    }

                    // For nested objects, wrap in container
                    if matches!(field_type, GraphQLType::Object(_)) {
                        let nested_container = NestedObjectContainer {
                            data: val.clone(),
                        };
                        return Ok(Some(FieldValue::owned_any(nested_container)));
                    }

                    // For arrays of objects, wrap each item
                    if let GraphQLType::Array(inner) = &field_type {
                        if matches!(inner.as_ref(), GraphQLType::Object(_)) {
                            if let Some(arr) = val.as_array() {
                                let containers: Vec<FieldValue> = arr
                                    .iter()
                                    .map(|item| {
                                        let nested_container = NestedObjectContainer {
                                            data: item.clone(),
                                        };
                                        FieldValue::owned_any(nested_container)
                                    })
                                    .collect();
                                return Ok(Some(FieldValue::list(containers)));
                            }
                            // Declared as an array but JSON isn't one:
                            // resolve to an empty list rather than error.
                            return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
                        }
                    }

                    // For other types, return the GraphQL value
                    let graphql_val = json_to_graphql_value(val);
                    Ok(Some(FieldValue::value(graphql_val)))
                } else {
                    // Field absent from the JSON -> GraphQL null.
                    Ok(None)
                }
            })
        }));
    }

    // Store the generated type in registry
    type_registry.insert(type_name.to_string(), object);
    type_name.to_string()
}
291
292/// Builds a dynamic GraphQL schema from lexicons for a given slice
293pub async fn build_graphql_schema(database: Database, slice_uri: String, auth_base_url: String) -> Result<Schema, String> {
294 // Fetch all lexicons for this slice
295 let all_lexicons = database
296 .get_lexicons_by_slice(&slice_uri)
297 .await
298 .map_err(|e| format!("Failed to load lexicons: {}", e))?;
299
300 // Deduplicate by NSID for schema building (keep most recent due to ORDER BY indexed_at DESC)
301 // This prevents duplicate type registration errors without hiding duplicates from users
302 let mut seen_nsids = std::collections::HashSet::new();
303 let lexicons: Vec<serde_json::Value> = all_lexicons
304 .into_iter()
305 .filter(|lexicon| {
306 if let Some(nsid) = lexicon.get("id").and_then(|n| n.as_str()) {
307 seen_nsids.insert(nsid.to_string())
308 } else {
309 true // Keep lexicons without an ID (will fail validation later)
310 }
311 })
312 .collect();
313
314 // Build Query root type and collect all object types
315 let mut query = Object::new("Query");
316 let mut objects_to_register = Vec::new();
317 let mut where_inputs_to_register = Vec::new();
318 let mut group_by_enums_to_register = Vec::new();
319 let mut mutation_inputs_to_register = Vec::new();
320
321 // First pass: collect metadata about all collections for cross-referencing
322 let mut all_collections: Vec<CollectionMeta> = Vec::new();
323 for lexicon in &lexicons {
324 let nsid = lexicon
325 .get("id")
326 .and_then(|n| n.as_str())
327 .ok_or_else(|| "Lexicon missing id".to_string())?;
328
329 let defs = lexicon
330 .get("defs")
331 .ok_or_else(|| format!("Lexicon {} missing defs", nsid))?;
332
333 let fields = extract_collection_fields(defs);
334 if !fields.is_empty() {
335 if let Some(key_type) = extract_record_key(defs) {
336 // Extract at-uri field names for reverse joins
337 let at_uri_fields: Vec<String> = fields
338 .iter()
339 .filter(|f| f.format.as_deref() == Some("at-uri"))
340 .map(|f| f.name.clone())
341 .collect();
342
343 if !at_uri_fields.is_empty() {
344 tracing::debug!("Collection {} has at-uri fields: {:?}", nsid, at_uri_fields);
345 }
346
347 all_collections.push(CollectionMeta {
348 nsid: nsid.to_string(),
349 key_type,
350 type_name: nsid_to_type_name(nsid),
351 at_uri_fields,
352 });
353 }
354 }
355 }
356
357 // Initialize type registry for nested object types
358 let mut type_registry: TypeRegistry = HashMap::new();
359
360 // Second pass: create types and queries
361 for lexicon in &lexicons {
362 // get_lexicons_by_slice returns {lexicon: 1, id: "nsid", defs: {...}}
363 let nsid = lexicon
364 .get("id")
365 .and_then(|n| n.as_str())
366 .ok_or_else(|| "Lexicon missing id".to_string())?;
367
368 let defs = lexicon
369 .get("defs")
370 .ok_or_else(|| format!("Lexicon {} missing defs", nsid))?
371 .clone();
372
373 // Extract fields from lexicon
374 let fields = extract_collection_fields(&defs);
375
376 if !fields.is_empty() {
377 // Create a GraphQL type for this collection
378 let type_name = nsid_to_type_name(nsid);
379 let record_type = create_record_type(
380 &type_name,
381 &fields,
382 database.clone(),
383 slice_uri.clone(),
384 &all_collections,
385 auth_base_url.clone(),
386 &mut type_registry,
387 &lexicons,
388 nsid,
389 );
390
391 // Create edge and connection types for this collection (Relay standard)
392 let edge_type = create_edge_type(&type_name);
393 let connection_type = create_connection_type(&type_name);
394
395 // Create WhereInput type for this collection
396 let mut where_input = InputObject::new(format!("{}WhereInput", type_name));
397
398 // Collect lexicon field names to avoid duplicates
399 let lexicon_field_names: std::collections::HashSet<&str> =
400 fields.iter().map(|f| f.name.as_str()).collect();
401
402 // Add system fields available on all records (skip if already in lexicon)
403 let system_fields = [
404 ("indexedAt", "DateTimeFilter"),
405 ("uri", "StringFilter"),
406 ("cid", "StringFilter"),
407 ("did", "StringFilter"),
408 ("collection", "StringFilter"),
409 ("actorHandle", "StringFilter"),
410 ];
411
412 for (field_name, filter_type) in system_fields {
413 if !lexicon_field_names.contains(field_name) {
414 where_input =
415 where_input.field(InputValue::new(field_name, TypeRef::named(filter_type)));
416 }
417 }
418
419 // Add fields from the lexicon
420 for field in &fields {
421 let filter_type = match field.field_type {
422 GraphQLType::Int => "IntFilter",
423 _ => "StringFilter", // Default to StringFilter for strings and other types
424 };
425 where_input =
426 where_input.field(InputValue::new(&field.name, TypeRef::named(filter_type)));
427 }
428
429 // Add generic json field for querying record content
430 where_input = where_input.field(InputValue::new("json", TypeRef::named("StringFilter")));
431
432 // Add nested and/or support
433 where_input = where_input
434 .field(InputValue::new(
435 "and",
436 TypeRef::named_list(format!("{}WhereInput", type_name)),
437 ))
438 .field(InputValue::new(
439 "or",
440 TypeRef::named_list(format!("{}WhereInput", type_name)),
441 ));
442
443 // Create GroupByField enum for this collection
444 let mut group_by_enum = Enum::new(format!("{}GroupByField", type_name));
445 group_by_enum = group_by_enum.item(EnumItem::new("indexedAt"));
446
447 for field in &fields {
448 group_by_enum = group_by_enum.item(EnumItem::new(&field.name));
449 }
450
451 // Create collection-specific GroupByFieldInput
452 let group_by_input = InputObject::new(format!("{}GroupByFieldInput", type_name))
453 .field(InputValue::new(
454 "field",
455 TypeRef::named_nn(format!("{}GroupByField", type_name)),
456 ))
457 .field(InputValue::new("interval", TypeRef::named("DateInterval")));
458
459 // Create collection-specific SortFieldInput
460 let sort_field_input = InputObject::new(format!("{}SortFieldInput", type_name))
461 .field(InputValue::new(
462 "field",
463 TypeRef::named_nn(format!("{}GroupByField", type_name)),
464 ))
465 .field(InputValue::new(
466 "direction",
467 TypeRef::named("SortDirection"),
468 ));
469
470 // Create mutation input type for this collection
471 let mutation_input = create_mutation_input_type(&type_name, &fields);
472
473 // Collect the types to register with schema later
474 objects_to_register.push(record_type);
475 objects_to_register.push(edge_type);
476 objects_to_register.push(connection_type);
477 where_inputs_to_register.push(where_input);
478 where_inputs_to_register.push(group_by_input);
479 where_inputs_to_register.push(sort_field_input);
480 group_by_enums_to_register.push(group_by_enum);
481 mutation_inputs_to_register.push(mutation_input);
482
483 // Add query field for this collection
484 let collection_query_name = nsid_to_query_name(nsid);
485 let db_clone = database.clone();
486 let slice_clone = slice_uri.clone();
487 let nsid_clone = nsid.to_string();
488
489 let connection_type_name = format!("{}Connection", &type_name);
490 query = query.field(
491 Field::new(
492 &collection_query_name,
493 TypeRef::named_nn(&connection_type_name),
494 move |ctx| {
495 let db = db_clone.clone();
496 let slice = slice_clone.clone();
497 let collection = nsid_clone.clone();
498
499 FieldFuture::new(async move {
500 // Get Relay-standard pagination arguments
501 let first: i32 = match ctx.args.get("first") {
502 Some(val) => val.i64().unwrap_or(50) as i32,
503 None => 50,
504 };
505
506 let after: Option<&str> = match ctx.args.get("after") {
507 Some(val) => val.string().ok(),
508 None => None,
509 };
510
511 // Parse sortBy argument
512 let sort_by: Option<Vec<crate::models::SortField>> = match ctx
513 .args
514 .get("sortBy")
515 {
516 Some(val) => {
517 if let Ok(list) = val.list() {
518 let mut sort_fields = Vec::new();
519 for item in list.iter() {
520 if let Ok(obj) = item.object() {
521 let field = obj
522 .get("field")
523 .and_then(|v| {
524 v.enum_name().ok().map(|s| s.to_string())
525 })
526 .unwrap_or_else(|| "indexedAt".to_string());
527 let direction = obj
528 .get("direction")
529 .and_then(|v| {
530 v.enum_name().ok().map(|s| s.to_string())
531 })
532 .unwrap_or_else(|| "desc".to_string());
533 sort_fields.push(crate::models::SortField {
534 field,
535 direction,
536 });
537 }
538 }
539 Some(sort_fields)
540 } else {
541 None
542 }
543 }
544 None => None,
545 };
546
547 // Build where clause for this collection
548 let mut where_clause = crate::models::WhereClause {
549 conditions: HashMap::new(),
550 or_conditions: None,
551 and: None,
552 or: None,
553 };
554
555 // Always filter by collection
556 where_clause.conditions.insert(
557 "collection".to_string(),
558 crate::models::WhereCondition {
559 gt: None,
560 gte: None,
561 lt: None,
562 lte: None,
563 eq: Some(serde_json::Value::String(collection.clone())),
564 in_values: None,
565 contains: None,
566 fuzzy: None,
567 },
568 );
569
570 // Parse where argument if provided
571 if let Some(where_val) = ctx.args.get("where") {
572 if let Ok(where_obj) = where_val.object() {
573 let parsed_where = parse_where_clause(where_obj);
574 // Merge parsed conditions with existing collection filter
575 where_clause.conditions.extend(parsed_where.conditions);
576 where_clause.or_conditions = parsed_where.or_conditions;
577 where_clause.and = parsed_where.and;
578 where_clause.or = parsed_where.or;
579 }
580 }
581
582 // Resolve actorHandle to did if present
583 if let Some(actor_handle_condition) =
584 where_clause.conditions.remove("actorHandle")
585 {
586 // Handle different filter types
587 let dids = if let Some(pattern) = &actor_handle_condition.contains {
588 // Pattern matching (contains)
589 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok()
590 } else if let Some(pattern) = &actor_handle_condition.fuzzy {
591 // Fuzzy matching
592 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok()
593 } else {
594 // Exact matching (eq / in_values)
595 let mut handles = Vec::new();
596 if let Some(eq_value) = &actor_handle_condition.eq {
597 if let Some(handle_str) = eq_value.as_str() {
598 handles.push(handle_str.to_string());
599 }
600 }
601 if let Some(in_values) = &actor_handle_condition.in_values {
602 for value in in_values {
603 if let Some(handle_str) = value.as_str() {
604 handles.push(handle_str.to_string());
605 }
606 }
607 }
608
609 if !handles.is_empty() {
610 db.resolve_handles_to_dids(&handles, &slice).await.ok()
611 } else {
612 None
613 }
614 };
615
616 // Replace actorHandle condition with did condition if we found matches
617 if let Some(dids) = dids {
618 if !dids.is_empty() {
619 let did_condition = if dids.len() == 1 {
620 crate::models::WhereCondition {
621 gt: None,
622 gte: None,
623 lt: None,
624 lte: None,
625 eq: Some(serde_json::Value::String(
626 dids[0].clone(),
627 )),
628 in_values: None,
629 contains: None,
630 fuzzy: None,
631 }
632 } else {
633 crate::models::WhereCondition {
634 gt: None,
635 gte: None,
636 lt: None,
637 lte: None,
638 eq: None,
639 in_values: Some(
640 dids.into_iter()
641 .map(|d| {
642 serde_json::Value::String(d)
643 })
644 .collect(),
645 ),
646 contains: None,
647 fuzzy: None,
648 }
649 };
650 where_clause
651 .conditions
652 .insert("did".to_string(), did_condition);
653 }
654 // If no DIDs found, the query will return 0 results naturally
655 }
656 }
657
658 // Resolve actorHandle to did in OR clauses
659 if let Some(or_clauses) = &mut where_clause.or {
660 for or_clause in or_clauses.iter_mut() {
661 if let Some(actor_handle_condition) =
662 or_clause.conditions.remove("actorHandle")
663 {
664 // Handle different filter types
665 let dids = if let Some(pattern) = &actor_handle_condition.contains {
666 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok()
667 } else if let Some(pattern) = &actor_handle_condition.fuzzy {
668 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok()
669 } else {
670 let mut handles = Vec::new();
671 if let Some(eq_value) = &actor_handle_condition.eq {
672 if let Some(handle_str) = eq_value.as_str() {
673 handles.push(handle_str.to_string());
674 }
675 }
676 if let Some(in_values) = &actor_handle_condition.in_values {
677 for value in in_values {
678 if let Some(handle_str) = value.as_str() {
679 handles.push(handle_str.to_string());
680 }
681 }
682 }
683 if !handles.is_empty() {
684 db.resolve_handles_to_dids(&handles, &slice).await.ok()
685 } else {
686 None
687 }
688 };
689
690 // Replace actorHandle with did condition
691 if let Some(dids) = dids {
692 if !dids.is_empty() {
693 let did_condition = if dids.len() == 1 {
694 crate::models::WhereCondition {
695 gt: None, gte: None, lt: None, lte: None,
696 eq: Some(serde_json::Value::String(dids[0].clone())),
697 in_values: None, contains: None, fuzzy: None,
698 }
699 } else {
700 crate::models::WhereCondition {
701 gt: None, gte: None, lt: None, lte: None, eq: None,
702 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
703 contains: None, fuzzy: None,
704 }
705 };
706 or_clause.conditions.insert("did".to_string(), did_condition);
707 }
708 }
709 }
710 }
711 }
712
713 // Query database for records
714 let (records, next_cursor) = db
715 .get_slice_collections_records(
716 &slice,
717 Some(first),
718 after,
719 sort_by.as_ref(),
720 Some(&where_clause),
721 )
722 .await
723 .map_err(|e| Error::new(format!("Database query failed: {}", e)))?;
724
725 // Query database for total count
726 let total_count = db
727 .count_slice_collections_records(&slice, Some(&where_clause))
728 .await
729 .map_err(|e| Error::new(format!("Count query failed: {}", e)))?
730 as i32;
731
732 // Convert records to RecordContainers
733 let record_containers: Vec<RecordContainer> = records
734 .into_iter()
735 .map(|record| {
736 // Convert Record to IndexedRecord
737 let indexed_record = crate::models::IndexedRecord {
738 uri: record.uri,
739 cid: record.cid,
740 did: record.did,
741 collection: record.collection,
742 value: record.json,
743 indexed_at: record.indexed_at.to_rfc3339(),
744 };
745 RecordContainer {
746 record: indexed_record,
747 }
748 })
749 .collect();
750
751 // Build Connection data
752 let connection_data = ConnectionData {
753 total_count,
754 has_next_page: next_cursor.is_some(),
755 end_cursor: next_cursor,
756 nodes: record_containers,
757 };
758
759 Ok(Some(FieldValue::owned_any(connection_data)))
760 })
761 },
762 )
763 .argument(async_graphql::dynamic::InputValue::new(
764 "first",
765 TypeRef::named(TypeRef::INT),
766 ))
767 .argument(async_graphql::dynamic::InputValue::new(
768 "after",
769 TypeRef::named(TypeRef::STRING),
770 ))
771 .argument(async_graphql::dynamic::InputValue::new(
772 "last",
773 TypeRef::named(TypeRef::INT),
774 ))
775 .argument(async_graphql::dynamic::InputValue::new(
776 "before",
777 TypeRef::named(TypeRef::STRING),
778 ))
779 .argument(async_graphql::dynamic::InputValue::new(
780 "sortBy",
781 TypeRef::named_list(format!("{}SortFieldInput", type_name)),
782 ))
783 .argument(async_graphql::dynamic::InputValue::new(
784 "where",
785 TypeRef::named(format!("{}WhereInput", type_name)),
786 ))
787 .description(format!("Query {} records", nsid)),
788 );
789
790 // Add aggregated query field for this collection
791 let aggregated_query_name = format!("{}Aggregated", collection_query_name);
792 let aggregated_type_name = format!("{}Aggregated", &type_name);
793
794 // Create aggregated type
795 let aggregated_type = create_aggregated_type(&aggregated_type_name, &fields);
796 objects_to_register.push(aggregated_type);
797
798 let db_clone_agg = database.clone();
799 let slice_clone_agg = slice_uri.clone();
800 let nsid_clone_agg = nsid.to_string();
801
802 query = query.field(
803 Field::new(
804 &aggregated_query_name,
805 TypeRef::named_nn_list_nn(&aggregated_type_name),
806 move |ctx| {
807 let db = db_clone_agg.clone();
808 let slice = slice_clone_agg.clone();
809 let collection = nsid_clone_agg.clone();
810
811 FieldFuture::new(async move {
812 // Parse groupBy argument
813 let group_by_fields: Vec<crate::models::GroupByField> = match ctx.args.get("groupBy") {
814 Some(val) => {
815 if let Ok(list) = val.list() {
816 let mut fields = Vec::new();
817 for item in list.iter() {
818 if let Ok(obj) = item.object() {
819 // Get field name from enum
820 let field_name = obj.get("field")
821 .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
822 .ok_or_else(|| Error::new("Missing field name in groupBy"))?;
823
824 // Get optional interval
825 if let Some(interval_val) = obj.get("interval") {
826 if let Ok(interval_str) = interval_val.enum_name() {
827 // Parse interval string to DateInterval
828 let interval = match interval_str {
829 "second" => crate::models::DateInterval::Second,
830 "minute" => crate::models::DateInterval::Minute,
831 "hour" => crate::models::DateInterval::Hour,
832 "day" => crate::models::DateInterval::Day,
833 "week" => crate::models::DateInterval::Week,
834 "month" => crate::models::DateInterval::Month,
835 "quarter" => crate::models::DateInterval::Quarter,
836 "year" => crate::models::DateInterval::Year,
837 _ => return Err(Error::new(format!("Invalid interval: {}", interval_str))),
838 };
839 fields.push(crate::models::GroupByField::Truncated {
840 field: field_name,
841 interval,
842 });
843 } else {
844 return Err(Error::new("Invalid interval value"));
845 }
846 } else {
847 // No interval, simple field
848 fields.push(crate::models::GroupByField::Simple(field_name));
849 }
850 } else {
851 return Err(Error::new("Invalid groupBy item"));
852 }
853 }
854 fields
855 } else {
856 Vec::new()
857 }
858 }
859 None => Vec::new(),
860 };
861
862 if group_by_fields.is_empty() {
863 return Err(Error::new("groupBy is required for aggregated queries"));
864 }
865
866 // Parse limit argument
867 let limit: i32 = match ctx.args.get("limit") {
868 Some(val) => val.i64().unwrap_or(50) as i32,
869 None => 50,
870 };
871
872 // Parse orderBy argument
873 let order_by_count: Option<String> = match ctx.args.get("orderBy") {
874 Some(val) => {
875 if let Ok(obj) = val.object() {
876 obj.get("count")
877 .and_then(|v| v.string().ok())
878 .map(|s| s.to_string())
879 } else {
880 None
881 }
882 }
883 None => None,
884 };
885
886 // Build where clause for this collection
887 let mut where_clause = crate::models::WhereClause {
888 conditions: HashMap::new(),
889 or_conditions: None,
890 and: None,
891 or: None,
892 };
893
894 // Always filter by collection
895 where_clause.conditions.insert(
896 "collection".to_string(),
897 crate::models::WhereCondition {
898 gt: None,
899 gte: None,
900 lt: None,
901 lte: None,
902 eq: Some(serde_json::Value::String(collection.clone())),
903 in_values: None,
904 contains: None,
905 fuzzy: None,
906 },
907 );
908
909 // Parse where argument if provided
910 if let Some(where_val) = ctx.args.get("where") {
911 if let Ok(where_obj) = where_val.object() {
912 let parsed_where = parse_where_clause(where_obj);
913 // Merge parsed conditions with existing collection filter
914 where_clause.conditions.extend(parsed_where.conditions);
915 where_clause.or_conditions = parsed_where.or_conditions;
916 where_clause.and = parsed_where.and;
917 where_clause.or = parsed_where.or;
918 }
919 }
920
921 // Resolve actorHandle to did if present
922 if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") {
923 // Handle different filter types
924 let dids = if let Some(pattern) = &actor_handle_condition.contains {
925 // Pattern matching (contains)
926 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok()
927 } else if let Some(pattern) = &actor_handle_condition.fuzzy {
928 // Fuzzy matching
929 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok()
930 } else {
931 // Exact matching (eq / in_values)
932 let mut handles = Vec::new();
933 if let Some(eq_value) = &actor_handle_condition.eq {
934 if let Some(handle_str) = eq_value.as_str() {
935 handles.push(handle_str.to_string());
936 }
937 }
938 if let Some(in_values) = &actor_handle_condition.in_values {
939 for value in in_values {
940 if let Some(handle_str) = value.as_str() {
941 handles.push(handle_str.to_string());
942 }
943 }
944 }
945
946 if !handles.is_empty() {
947 db.resolve_handles_to_dids(&handles, &slice).await.ok()
948 } else {
949 None
950 }
951 };
952
953 // Replace actorHandle condition with did condition if we found matches
954 if let Some(dids) = dids {
955 if !dids.is_empty() {
956 let did_condition = if dids.len() == 1 {
957 crate::models::WhereCondition {
958 gt: None,
959 gte: None,
960 lt: None,
961 lte: None,
962 eq: Some(serde_json::Value::String(dids[0].clone())),
963 in_values: None,
964 contains: None,
965 fuzzy: None,
966 }
967 } else {
968 crate::models::WhereCondition {
969 gt: None,
970 gte: None,
971 lt: None,
972 lte: None,
973 eq: None,
974 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
975 contains: None,
976 fuzzy: None,
977 }
978 };
979 where_clause.conditions.insert("did".to_string(), did_condition);
980 }
981 // If no DIDs found, the query will return 0 results naturally
982 }
983 }
984
985 // Resolve actorHandle to did in OR clauses
986 if let Some(or_clauses) = &mut where_clause.or {
987 for or_clause in or_clauses.iter_mut() {
988 if let Some(actor_handle_condition) =
989 or_clause.conditions.remove("actorHandle")
990 {
991 // Handle different filter types
992 let dids = if let Some(pattern) = &actor_handle_condition.contains {
993 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok()
994 } else if let Some(pattern) = &actor_handle_condition.fuzzy {
995 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok()
996 } else {
997 let mut handles = Vec::new();
998 if let Some(eq_value) = &actor_handle_condition.eq {
999 if let Some(handle_str) = eq_value.as_str() {
1000 handles.push(handle_str.to_string());
1001 }
1002 }
1003 if let Some(in_values) = &actor_handle_condition.in_values {
1004 for value in in_values {
1005 if let Some(handle_str) = value.as_str() {
1006 handles.push(handle_str.to_string());
1007 }
1008 }
1009 }
1010 if !handles.is_empty() {
1011 db.resolve_handles_to_dids(&handles, &slice).await.ok()
1012 } else {
1013 None
1014 }
1015 };
1016
1017 // Replace actorHandle with did condition
1018 if let Some(dids) = dids {
1019 if !dids.is_empty() {
1020 let did_condition = if dids.len() == 1 {
1021 crate::models::WhereCondition {
1022 gt: None, gte: None, lt: None, lte: None,
1023 eq: Some(serde_json::Value::String(dids[0].clone())),
1024 in_values: None, contains: None, fuzzy: None,
1025 }
1026 } else {
1027 crate::models::WhereCondition {
1028 gt: None, gte: None, lt: None, lte: None, eq: None,
1029 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
1030 contains: None, fuzzy: None,
1031 }
1032 };
1033 or_clause.conditions.insert("did".to_string(), did_condition);
1034 }
1035 }
1036 }
1037 }
1038 }
1039
1040 // Query database for aggregated records
1041 let results = db
1042 .get_aggregated_records(
1043 &slice,
1044 &group_by_fields,
1045 Some(&where_clause),
1046 order_by_count.as_deref(),
1047 Some(limit),
1048 )
1049 .await
1050 .map_err(|e| {
1051 Error::new(format!("Aggregation query failed: {}", e))
1052 })?;
1053
1054 // Convert JSON values to GraphQL values
1055 let field_values: Vec<FieldValue<'_>> = results
1056 .into_iter()
1057 .map(|json_val| FieldValue::owned_any(json_val))
1058 .collect();
1059
1060 Ok(Some(FieldValue::list(field_values)))
1061 })
1062 },
1063 )
1064 .argument(async_graphql::dynamic::InputValue::new(
1065 "groupBy",
1066 TypeRef::named_nn_list(format!("{}GroupByFieldInput", type_name)),
1067 ))
1068 .argument(async_graphql::dynamic::InputValue::new(
1069 "where",
1070 TypeRef::named(format!("{}WhereInput", type_name)),
1071 ))
1072 .argument(async_graphql::dynamic::InputValue::new(
1073 "orderBy",
1074 TypeRef::named("AggregationOrderBy"),
1075 ))
1076 .argument(async_graphql::dynamic::InputValue::new(
1077 "limit",
1078 TypeRef::named(TypeRef::INT),
1079 ))
1080 .description(format!("Aggregated query for {} records with GROUP BY support", nsid)),
1081 );
1082 }
1083 }
1084
1085 // Add jetstreamLogs query to root query type
1086 query = add_jetstream_logs_query(query, database.clone(), slice_uri.clone());
1087
1088 // Add sync job queries
1089 query = add_sync_job_query(query);
1090 query = add_sync_jobs_query(query, slice_uri.clone());
1091 query = add_sync_job_logs_query(query);
1092 query = add_get_sync_summary_query(query);
1093
1094 // Add sparklines query
1095 query = add_sparklines_query(query);
1096
1097 // Add slice records query
1098 query = add_slice_records_query(query, database.clone());
1099
1100 // Add OAuth clients query
1101 query = add_oauth_clients_query(query, slice_uri.clone(), auth_base_url.clone());
1102
1103 // Build Mutation type
1104 let mut mutation = create_mutation_type(database.clone(), slice_uri.clone(), &lexicons);
1105
1106 // Add startSync mutation
1107 mutation = add_start_sync_mutation(mutation, slice_uri.clone());
1108
1109 // Add cancelJob mutation
1110 mutation = add_cancel_job_mutation(mutation);
1111
1112 // Add deleteJob mutation
1113 mutation = add_delete_job_mutation(mutation);
1114
1115 // Add uploadBlob mutation
1116 mutation = add_upload_blob_mutation(mutation, auth_base_url.clone());
1117
1118 // Add OAuth client mutations
1119 mutation = add_create_oauth_client_mutation(mutation, auth_base_url.clone());
1120 mutation = add_update_oauth_client_mutation(mutation, auth_base_url.clone());
1121 mutation = add_delete_oauth_client_mutation(mutation, auth_base_url.clone());
1122
1123 // Add deleteSliceRecords mutation
1124 mutation = add_delete_slice_records_mutation(mutation, slice_uri.clone());
1125
1126 // Build Subscription type with collection-specific subscriptions
1127 let mut subscription = create_subscription_type(slice_uri.clone(), &lexicons);
1128
1129 // Add jetstreamLogsCreated subscription
1130 subscription = add_jetstream_logs_subscription(subscription, slice_uri.clone());
1131
1132 // Add syncJobUpdated subscription
1133 subscription = add_sync_job_subscription(subscription);
1134
1135 // Build and return the schema with complexity limits
1136 let mut schema_builder = Schema::build(
1137 query.type_name(),
1138 Some(mutation.type_name()),
1139 Some(subscription.type_name()),
1140 )
1141 .register(query)
1142 .register(mutation)
1143 .register(subscription)
1144 .limit_depth(50) // Higher limit to support GraphiQL introspection with reverse joins
1145 .limit_complexity(5000); // Prevent expensive deeply nested queries
1146
1147 // Register JSON scalar type for complex fields
1148 let json_scalar = Scalar::new("JSON");
1149 schema_builder = schema_builder.register(json_scalar);
1150
1151 // Register filter input types for WHERE clauses
1152 let string_filter = InputObject::new("StringFilter")
1153 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
1154 .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING)))
1155 .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING)))
1156 .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING)))
1157 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
1158 .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING)))
1159 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
1160 .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING)));
1161 schema_builder = schema_builder.register(string_filter);
1162
1163 let int_filter = InputObject::new("IntFilter")
1164 .field(InputValue::new("eq", TypeRef::named(TypeRef::INT)))
1165 .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT)))
1166 .field(InputValue::new("gt", TypeRef::named(TypeRef::INT)))
1167 .field(InputValue::new("gte", TypeRef::named(TypeRef::INT)))
1168 .field(InputValue::new("lt", TypeRef::named(TypeRef::INT)))
1169 .field(InputValue::new("lte", TypeRef::named(TypeRef::INT)));
1170 schema_builder = schema_builder.register(int_filter);
1171
1172 let datetime_filter = InputObject::new("DateTimeFilter")
1173 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
1174 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
1175 .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING)))
1176 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
1177 .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING)));
1178 schema_builder = schema_builder.register(datetime_filter);
1179
1180 // Register Blob type
1181 let blob_type = create_blob_type();
1182 schema_builder = schema_builder.register(blob_type);
1183
1184 // Register SyncResult type for mutations
1185 let sync_result_type = create_sync_result_type();
1186 schema_builder = schema_builder.register(sync_result_type);
1187
1188 // Register BlobUploadResponse type for blob upload mutation
1189 let blob_upload_response_type = create_blob_upload_response_type();
1190 schema_builder = schema_builder.register(blob_upload_response_type);
1191
1192 // Register OAuthClient type for OAuth operations
1193 let oauth_client_type = create_oauth_client_type();
1194 schema_builder = schema_builder.register(oauth_client_type);
1195
1196 // Register MutationResponse type for dynamic mutations
1197 let mutation_response_type = create_mutation_response_type();
1198 schema_builder = schema_builder.register(mutation_response_type);
1199
1200 // Register SortDirection enum
1201 let sort_direction_enum = create_sort_direction_enum();
1202 schema_builder = schema_builder.register(sort_direction_enum);
1203
1204 // Register SortField input type
1205 let sort_field_input = create_sort_field_input();
1206 schema_builder = schema_builder.register(sort_field_input);
1207
1208 // Register condition input types for where clauses
1209 let string_condition_input = create_string_condition_input();
1210 schema_builder = schema_builder.register(string_condition_input);
1211
1212 let int_condition_input = create_int_condition_input();
1213 schema_builder = schema_builder.register(int_condition_input);
1214
1215 // Register AggregationOrderBy input type
1216 let aggregation_order_by_input = create_aggregation_order_by_input();
1217 schema_builder = schema_builder.register(aggregation_order_by_input);
1218
1219 // Register DateInterval enum for date truncation
1220 let date_interval_enum = create_date_interval_enum();
1221 schema_builder = schema_builder.register(date_interval_enum);
1222
1223 // Register PageInfo type
1224 let page_info_type = create_page_info_type();
1225 schema_builder = schema_builder.register(page_info_type);
1226
1227 // Register RecordUpdate type for subscriptions
1228 let record_update_type = create_record_update_type();
1229 schema_builder = schema_builder.register(record_update_type);
1230
1231 // Register JetstreamLogEntry type
1232 let jetstream_log_entry_type = create_jetstream_log_entry_type();
1233 schema_builder = schema_builder.register(jetstream_log_entry_type);
1234
1235 // Register SyncJob and SyncJobResult types
1236 let sync_job_type = create_sync_job_type();
1237 schema_builder = schema_builder.register(sync_job_type);
1238
1239 let sync_job_result_type = create_sync_job_result_type();
1240 schema_builder = schema_builder.register(sync_job_result_type);
1241
1242 // Register StartSyncOutput type (created via schema_ext)
1243 let start_sync_output = create_start_sync_output_type();
1244 schema_builder = schema_builder.register(start_sync_output);
1245
1246 // Register SyncSummary and CollectionSummary types
1247 let sync_summary_type = create_sync_summary_type();
1248 schema_builder = schema_builder.register(sync_summary_type);
1249
1250 let collection_summary_type = create_collection_summary_type();
1251 schema_builder = schema_builder.register(collection_summary_type);
1252
1253 // Register Sparklines types
1254 let sparkline_point_type = create_sparkline_point_type();
1255 schema_builder = schema_builder.register(sparkline_point_type);
1256
1257 let slice_sparkline_type = create_slice_sparkline_type();
1258 schema_builder = schema_builder.register(slice_sparkline_type);
1259
1260 // Register Stats types
1261 let collection_stats_type = create_collection_stats_type();
1262 schema_builder = schema_builder.register(collection_stats_type);
1263
1264 let slice_stats_type = create_slice_stats_type();
1265 schema_builder = schema_builder.register(slice_stats_type);
1266
1267 // Register Records types
1268 let slice_record_type = create_slice_record_type();
1269 schema_builder = schema_builder.register(slice_record_type);
1270
1271 let slice_record_edge_type = create_slice_record_edge_type();
1272 schema_builder = schema_builder.register(slice_record_edge_type);
1273
1274 let slice_records_connection_type = create_slice_records_connection_type();
1275 schema_builder = schema_builder.register(slice_records_connection_type);
1276
1277 let slice_records_where_input = create_slice_records_where_input();
1278 schema_builder = schema_builder.register(slice_records_where_input);
1279
1280 // Register DeleteSliceRecordsOutput type
1281 let delete_slice_records_output_type = create_delete_slice_records_output_type();
1282 schema_builder = schema_builder.register(delete_slice_records_output_type);
1283
1284 // Register all object types
1285 for obj in objects_to_register {
1286 schema_builder = schema_builder.register(obj);
1287 }
1288
1289 // Register all WhereInput types
1290 for where_input in where_inputs_to_register {
1291 schema_builder = schema_builder.register(where_input);
1292 }
1293
1294 // Register all GroupByField enums
1295 for group_by_enum in group_by_enums_to_register {
1296 schema_builder = schema_builder.register(group_by_enum);
1297 }
1298
1299 // Register all mutation input types
1300 for mutation_input in mutation_inputs_to_register {
1301 schema_builder = schema_builder.register(mutation_input);
1302 }
1303
1304 // Register all nested object types from the type registry
1305 for (_, nested_type) in type_registry {
1306 schema_builder = schema_builder.register(nested_type);
1307 }
1308
1309 schema_builder
1310 .finish()
1311 .map_err(|e| format!("Schema build error: {:?}", e))
1312}
1313
/// Container to hold record data for resolvers.
///
/// Wraps an [`crate::models::IndexedRecord`] so it can be passed through
/// `async-graphql` dynamic resolvers via `FieldValue::owned_any` and
/// recovered with `ctx.parent_value.try_downcast_ref::<RecordContainer>()`.
/// Field resolvers read `record.value` (the raw lexicon JSON) plus the
/// standard AT Protocol metadata (`uri`, `cid`, `did`, `indexed_at`).
#[derive(Clone)]
pub struct RecordContainer {
    // The indexed AT Protocol record backing one GraphQL object instance.
    pub record: crate::models::IndexedRecord,
}
1319
/// Container to hold blob data and DID for URL generation.
///
/// Built by record-field resolvers from a lexicon blob JSON object
/// (`{"ref": {"$link": ...}, "mimeType": ..., "size": ...}`) and passed to
/// blob-type field resolvers via `FieldValue::owned_any`. The owning
/// record's DID is carried along so a CDN URL can be derived from
/// (`did`, `blob_ref`).
///
/// `Debug` is derived in addition to `Clone`: all fields are plain std
/// types, and a `Debug` impl makes resolver tracing/logging possible.
#[derive(Debug, Clone)]
pub struct BlobContainer {
    /// CID reference of the blob (from `ref.$link`; empty string if absent).
    pub blob_ref: String,
    /// MIME type of the blob (defaults to "image/jpeg" upstream if missing).
    pub mime_type: String,
    /// Size of the blob in bytes (0 if missing).
    pub size: i64,
    /// DID of the record owner, used for CDN URL generation.
    pub did: String,
}
1328
1329/// Creates a GraphQL Object type for a record collection
1330fn create_record_type(
1331 type_name: &str,
1332 fields: &[GraphQLField],
1333 database: Database,
1334 slice_uri: String,
1335 all_collections: &[CollectionMeta],
1336 auth_base_url: String,
1337 type_registry: &mut TypeRegistry,
1338 all_lexicons: &[serde_json::Value],
1339 lexicon_nsid: &str,
1340) -> Object {
1341 let mut object = Object::new(type_name);
1342
1343 // Check which field names exist in lexicon to avoid conflicts
1344 let lexicon_field_names: std::collections::HashSet<&str> =
1345 fields.iter().map(|f| f.name.as_str()).collect();
1346
1347 // Add global ID field for Relay (base64 encoded typename:uri)
1348 // Only add if lexicon doesn't already have an "id" field
1349 if !lexicon_field_names.contains("id") {
1350 let type_name_for_id = type_name.to_string();
1351 object = object.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), move |ctx| {
1352 let typename = type_name_for_id.clone();
1353 FieldFuture::new(async move {
1354 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1355 let global_id = format!("{}:{}", typename, container.record.uri);
1356 let encoded_id = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, global_id.as_bytes());
1357 Ok(Some(GraphQLValue::from(encoded_id)))
1358 })
1359 }));
1360 }
1361
1362 // Add standard AT Protocol fields only if they don't conflict with lexicon fields
1363 if !lexicon_field_names.contains("uri") {
1364 object = object.field(Field::new(
1365 "uri",
1366 TypeRef::named_nn(TypeRef::STRING),
1367 |ctx| {
1368 FieldFuture::new(async move {
1369 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1370 Ok(Some(GraphQLValue::from(container.record.uri.clone())))
1371 })
1372 },
1373 ));
1374 }
1375
1376 if !lexicon_field_names.contains("cid") {
1377 object = object.field(Field::new(
1378 "cid",
1379 TypeRef::named_nn(TypeRef::STRING),
1380 |ctx| {
1381 FieldFuture::new(async move {
1382 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1383 Ok(Some(GraphQLValue::from(container.record.cid.clone())))
1384 })
1385 },
1386 ));
1387 }
1388
1389 if !lexicon_field_names.contains("did") {
1390 object = object.field(Field::new(
1391 "did",
1392 TypeRef::named_nn(TypeRef::STRING),
1393 |ctx| {
1394 FieldFuture::new(async move {
1395 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1396 Ok(Some(GraphQLValue::from(container.record.did.clone())))
1397 })
1398 },
1399 ));
1400 }
1401
1402 if !lexicon_field_names.contains("indexedAt") {
1403 object = object.field(Field::new(
1404 "indexedAt",
1405 TypeRef::named_nn(TypeRef::STRING),
1406 |ctx| {
1407 FieldFuture::new(async move {
1408 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1409 Ok(Some(GraphQLValue::from(
1410 container.record.indexed_at.clone(),
1411 )))
1412 })
1413 },
1414 ));
1415 }
1416
1417 // Add actor metadata field (handle from actors table)
1418 // Always add as "actorHandle" to avoid conflicts with lexicon fields
1419 let db_for_actor = database.clone();
1420 let slice_for_actor = slice_uri.clone();
1421 object = object.field(Field::new(
1422 "actorHandle",
1423 TypeRef::named(TypeRef::STRING),
1424 move |ctx| {
1425 let db = db_for_actor.clone();
1426 let slice = slice_for_actor.clone();
1427 FieldFuture::new(async move {
1428 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1429 let did = &container.record.did;
1430
1431 // Build where clause to find actor by DID
1432 let mut where_clause = crate::models::WhereClause {
1433 conditions: std::collections::HashMap::new(),
1434 or_conditions: None,
1435 and: None,
1436 or: None,
1437 };
1438 where_clause.conditions.insert(
1439 "did".to_string(),
1440 crate::models::WhereCondition {
1441 gt: None,
1442 gte: None,
1443 lt: None,
1444 lte: None,
1445 eq: Some(serde_json::Value::String(did.clone())),
1446 in_values: None,
1447 contains: None,
1448 fuzzy: None,
1449 },
1450 );
1451
1452 match db
1453 .get_slice_actors(&slice, Some(1), None, Some(&where_clause))
1454 .await
1455 {
1456 Ok((actors, _cursor)) => {
1457 if let Some(actor) = actors.first() {
1458 if let Some(handle) = &actor.handle {
1459 Ok(Some(GraphQLValue::from(handle.clone())))
1460 } else {
1461 Ok(None)
1462 }
1463 } else {
1464 Ok(None)
1465 }
1466 }
1467 Err(e) => {
1468 tracing::debug!("Actor not found for {}: {}", did, e);
1469 Ok(None)
1470 }
1471 }
1472 })
1473 },
1474 ));
1475
1476 // Add fields from lexicon
1477 for field in fields {
1478 let field_name = field.name.clone();
1479 let field_name_for_field = field_name.clone(); // Need separate clone for Field::new
1480 let field_type = field.field_type.clone();
1481 let db_clone = database.clone();
1482
1483 // Determine type ref - handle nested objects and lexicon refs specially
1484 let type_ref = match &field.field_type {
1485 GraphQLType::LexiconRef(ref_nsid) => {
1486 // Resolve lexicon ref and generate type for it
1487 let resolved_type_name = resolve_lexicon_ref_type(
1488 ref_nsid,
1489 lexicon_nsid,
1490 all_lexicons,
1491 type_registry,
1492 &database,
1493 );
1494
1495 if field.is_required {
1496 TypeRef::named_nn(resolved_type_name)
1497 } else {
1498 TypeRef::named(resolved_type_name)
1499 }
1500 }
1501 GraphQLType::Object(nested_fields) => {
1502 // Generate nested object type
1503 let nested_type_name = generate_nested_type_name(type_name, &field_name);
1504 let actual_type_name = generate_nested_object_type(
1505 &nested_type_name,
1506 nested_fields,
1507 type_registry,
1508 &database,
1509 );
1510
1511 if field.is_required {
1512 TypeRef::named_nn(actual_type_name)
1513 } else {
1514 TypeRef::named(actual_type_name)
1515 }
1516 }
1517 GraphQLType::Array(inner) => {
1518 match inner.as_ref() {
1519 GraphQLType::LexiconRef(ref_nsid) => {
1520 // Resolve lexicon ref for array items
1521 let resolved_type_name = resolve_lexicon_ref_type(
1522 ref_nsid,
1523 lexicon_nsid,
1524 all_lexicons,
1525 type_registry,
1526 &database,
1527 );
1528
1529 if field.is_required {
1530 TypeRef::named_nn_list(resolved_type_name)
1531 } else {
1532 TypeRef::named_list(resolved_type_name)
1533 }
1534 }
1535 GraphQLType::Object(nested_fields) => {
1536 // Generate nested object type for array items
1537 let nested_type_name = generate_nested_type_name(type_name, &field_name);
1538 let actual_type_name = generate_nested_object_type(
1539 &nested_type_name,
1540 nested_fields,
1541 type_registry,
1542 &database,
1543 );
1544
1545 if field.is_required {
1546 TypeRef::named_nn_list(actual_type_name)
1547 } else {
1548 TypeRef::named_list(actual_type_name)
1549 }
1550 }
1551 _ => graphql_type_to_typeref(&field.field_type, field.is_required),
1552 }
1553 }
1554 _ => graphql_type_to_typeref(&field.field_type, field.is_required),
1555 };
1556
1557 object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| {
1558 let field_name = field_name.clone();
1559 let field_type = field_type.clone();
1560 let db = db_clone.clone();
1561
1562 FieldFuture::new(async move {
1563 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1564 let value = container.record.value.get(&field_name);
1565
1566 if let Some(val) = value {
1567 // Check for explicit null value
1568 if val.is_null() {
1569 return Ok(None);
1570 }
1571
1572 // Check if this is an array of blobs
1573 if let GraphQLType::Array(inner) = &field_type {
1574 if matches!(inner.as_ref(), GraphQLType::Blob) {
1575 if let Some(arr) = val.as_array() {
1576 let blob_containers: Vec<FieldValue> = arr
1577 .iter()
1578 .filter_map(|blob_val| {
1579 let obj = blob_val.as_object()?;
1580 let blob_ref = obj
1581 .get("ref")
1582 .and_then(|r| r.as_object())
1583 .and_then(|r| r.get("$link"))
1584 .and_then(|l| l.as_str())
1585 .unwrap_or("")
1586 .to_string();
1587
1588 let mime_type = obj
1589 .get("mimeType")
1590 .and_then(|m| m.as_str())
1591 .unwrap_or("image/jpeg")
1592 .to_string();
1593
1594 let size =
1595 obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0);
1596
1597 let blob_container = BlobContainer {
1598 blob_ref,
1599 mime_type,
1600 size,
1601 did: container.record.did.clone(),
1602 };
1603
1604 Some(FieldValue::owned_any(blob_container))
1605 })
1606 .collect();
1607
1608 return Ok(Some(FieldValue::list(blob_containers)));
1609 }
1610
1611 // If not a proper array, return empty list
1612 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
1613 }
1614 }
1615
1616 // Check if this is a blob field
1617 if matches!(field_type, GraphQLType::Blob) {
1618 // Extract blob fields from JSON object
1619 if let Some(obj) = val.as_object() {
1620 let blob_ref = obj
1621 .get("ref")
1622 .and_then(|r| r.as_object())
1623 .and_then(|r| r.get("$link"))
1624 .and_then(|l| l.as_str())
1625 .unwrap_or("")
1626 .to_string();
1627
1628 let mime_type = obj
1629 .get("mimeType")
1630 .and_then(|m| m.as_str())
1631 .unwrap_or("image/jpeg")
1632 .to_string();
1633
1634 let size = obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0);
1635
1636 let blob_container = BlobContainer {
1637 blob_ref,
1638 mime_type,
1639 size,
1640 did: container.record.did.clone(),
1641 };
1642
1643 return Ok(Some(FieldValue::owned_any(blob_container)));
1644 }
1645
1646 // If not a proper blob object, return None (field is null)
1647 return Ok(None);
1648 }
1649
1650 // Check if this is a reference field that needs joining
1651 if matches!(field_type, GraphQLType::Ref) {
1652 // Extract URI from strongRef and fetch the linked record
1653 if let Some(uri) =
1654 crate::graphql::dataloaders::extract_uri_from_strong_ref(val)
1655 {
1656 match db.get_record(&uri).await {
1657 Ok(Some(linked_record)) => {
1658 // Convert the linked record to a JSON value
1659 let record_json =
1660 serde_json::to_value(linked_record).map_err(|e| {
1661 Error::new(format!("Serialization error: {}", e))
1662 })?;
1663
1664 // Convert serde_json::Value to async_graphql::Value
1665 let graphql_val = json_to_graphql_value(&record_json);
1666 return Ok(Some(FieldValue::value(graphql_val)));
1667 }
1668 Ok(None) => {
1669 return Ok(None);
1670 }
1671 Err(e) => {
1672 tracing::error!("Error fetching linked record: {}", e);
1673 return Ok(None);
1674 }
1675 }
1676 }
1677 }
1678
1679 // Check if this is a lexicon ref field
1680 if matches!(field_type, GraphQLType::LexiconRef(_)) {
1681 let nested_container = NestedObjectContainer {
1682 data: val.clone(),
1683 };
1684 return Ok(Some(FieldValue::owned_any(nested_container)));
1685 }
1686
1687 // Check if this is a nested object field
1688 if matches!(field_type, GraphQLType::Object(_)) {
1689 let nested_container = NestedObjectContainer {
1690 data: val.clone(),
1691 };
1692 return Ok(Some(FieldValue::owned_any(nested_container)));
1693 }
1694
1695 // Check if this is an array of nested objects or lexicon refs
1696 if let GraphQLType::Array(inner) = &field_type {
1697 if matches!(inner.as_ref(), GraphQLType::LexiconRef(_)) || matches!(inner.as_ref(), GraphQLType::Object(_)) {
1698 if let Some(arr) = val.as_array() {
1699 let containers: Vec<FieldValue> = arr
1700 .iter()
1701 .map(|item| {
1702 let nested_container = NestedObjectContainer {
1703 data: item.clone(),
1704 };
1705 FieldValue::owned_any(nested_container)
1706 })
1707 .collect();
1708 return Ok(Some(FieldValue::list(containers)));
1709 }
1710 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
1711 }
1712 }
1713
1714 // For non-ref, non-object fields, return the raw JSON value
1715 let graphql_val = json_to_graphql_value(val);
1716 Ok(Some(FieldValue::value(graphql_val)))
1717 } else {
1718 Ok(None)
1719 }
1720 })
1721 }));
1722 }
1723
1724 // Add join fields for cross-referencing other collections
1725 for collection in all_collections {
1726 let field_name = nsid_to_join_field_name(&collection.nsid);
1727
1728 // Skip if this would conflict with existing field
1729 if lexicon_field_names.contains(field_name.as_str()) {
1730 continue;
1731 }
1732
1733 // Skip self-referencing join fields (collection referencing itself)
1734 if collection.type_name == type_name {
1735 continue;
1736 }
1737
1738 // Collect all string fields with format "at-uri" that might reference this collection
1739 // We'll check each one at runtime to see if it contains a URI to this collection
1740 let uri_ref_fields: Vec<_> = fields
1741 .iter()
1742 .filter(|f| matches!(f.format.as_deref(), Some("at-uri")))
1743 .collect();
1744
1745 let collection_nsid = collection.nsid.clone();
1746 let key_type = collection.key_type.clone();
1747 let db_for_join = database.clone();
1748
1749 // If we found at-uri fields, create a resolver that checks each one at runtime
1750 // But skip this for literal:self collections - they should use DID-based joins instead
1751 if !uri_ref_fields.is_empty() && key_type != "literal:self" {
1752 let ref_field_names: Vec<String> =
1753 uri_ref_fields.iter().map(|f| f.name.clone()).collect();
1754 let db_for_uri_join = database.clone();
1755 let target_collection = collection_nsid.clone();
1756
1757 object = object.field(Field::new(
1758 &field_name,
1759 TypeRef::named(&collection.type_name),
1760 move |ctx| {
1761 let db = db_for_uri_join.clone();
1762 let field_names = ref_field_names.clone();
1763 let expected_collection = target_collection.clone();
1764 FieldFuture::new(async move {
1765 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1766
1767 // Try each at-uri field to find one that references this collection
1768 for field_name in &field_names {
1769 if let Some(uri_value) = container.record.value.get(field_name) {
1770 if let Some(uri) = uri_value.as_str() {
1771 // Check if the URI is for the expected collection
1772 if uri.contains(&format!("/{}/", expected_collection)) {
1773 // Fetch the record at this URI
1774 match db.get_record(uri).await {
1775 Ok(Some(record)) => {
1776 let new_container = RecordContainer { record };
1777 return Ok(Some(FieldValue::owned_any(
1778 new_container,
1779 )));
1780 }
1781 Ok(None) => continue, // Try next field
1782 Err(_) => continue, // Try next field
1783 }
1784 }
1785 }
1786 }
1787 }
1788 // No matching URI found in any field
1789 Ok(None)
1790 })
1791 },
1792 ));
1793 // Skip DID-based join logic for non-literal:self collections
1794 continue;
1795 }
1796
1797 // Determine type and resolver based on key_type
1798 match key_type.as_str() {
1799 "literal:self" => {
1800 // Single record per DID - return nullable object of the collection's type
1801 let slice_for_self_join = slice_uri.clone();
1802 object = object.field(Field::new(
1803 &field_name,
1804 TypeRef::named(&collection.type_name),
1805 move |ctx| {
1806 let db = db_for_join.clone();
1807 let nsid = collection_nsid.clone();
1808 let slice = slice_for_self_join.clone();
1809 FieldFuture::new(async move {
1810 let container =
1811 ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1812 let did = container.record.did.clone();
1813 let uri = format!("at://{}/{}/self", did, nsid);
1814
1815 // Query with slice_uri filter to ensure we get the right record
1816 let mut where_clause = crate::models::WhereClause {
1817 conditions: std::collections::HashMap::new(),
1818 or_conditions: None,
1819 and: None,
1820 or: None,
1821 };
1822 where_clause.conditions.insert(
1823 "collection".to_string(),
1824 crate::models::WhereCondition {
1825 gt: None,
1826 gte: None,
1827 lt: None,
1828 lte: None,
1829 eq: Some(serde_json::Value::String(nsid.clone())),
1830 in_values: None,
1831 contains: None,
1832 fuzzy: None,
1833 },
1834 );
1835 where_clause.conditions.insert(
1836 "did".to_string(),
1837 crate::models::WhereCondition {
1838 gt: None,
1839 gte: None,
1840 lt: None,
1841 lte: None,
1842 eq: Some(serde_json::Value::String(did.clone())),
1843 in_values: None,
1844 contains: None,
1845 fuzzy: None,
1846 },
1847 );
1848
1849 match db.get_slice_collections_records(
1850 &slice,
1851 Some(1),
1852 None,
1853 None,
1854 Some(&where_clause),
1855 ).await {
1856 Ok((records, _cursor)) => {
1857 if let Some(record) = records.into_iter().next() {
1858 let indexed_record = crate::models::IndexedRecord {
1859 uri: record.uri,
1860 cid: record.cid,
1861 did: record.did,
1862 collection: record.collection,
1863 value: record.json,
1864 indexed_at: record.indexed_at.to_rfc3339(),
1865 };
1866 let new_container = RecordContainer { record: indexed_record };
1867 Ok(Some(FieldValue::owned_any(new_container)))
1868 } else {
1869 Ok(None)
1870 }
1871 }
1872 Err(e) => {
1873 tracing::debug!("Join query error for {}: {}", uri, e);
1874 Ok(None)
1875 }
1876 }
1877 })
1878 },
1879 ));
1880 }
1881 "tid" | "any" => {
1882 // Skip - these are handled as plural reverse joins below with URI filtering
1883 continue;
1884
1885 // Multiple records per DID - return array of the collection's type (DISABLED)
1886 /*object = object.field(
1887 Field::new(
1888 &field_name,
1889 TypeRef::named_nn_list_nn(&collection.type_name),
1890 move |ctx| {
1891 let nsid = collection_nsid.clone();
1892 let slice = slice_for_join.clone();
1893 let db_fallback = db_for_join.clone();
1894 FieldFuture::new(async move {
1895 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1896 let did = &container.record.did;
1897
1898 // Get limit from argument, default to 50
1899 let limit = ctx.args.get("limit")
1900 .and_then(|v| v.i64().ok())
1901 .map(|i| i as usize)
1902 .unwrap_or(50)
1903 .min(100); // Cap at 100 to prevent abuse
1904
1905 // Try to get DataLoader from context
1906 if let Some(gql_ctx) = ctx.data_opt::<GraphQLContext>() {
1907 // Use DataLoader for batched loading
1908 let key = CollectionDidKey {
1909 slice_uri: slice.clone(),
1910 collection: nsid.clone(),
1911 did: did.clone(),
1912 };
1913
1914 match gql_ctx.collection_did_loader.load_one(key).await {
1915 Ok(Some(mut records)) => {
1916 // Apply limit after loading
1917 records.truncate(limit);
1918
1919 let values: Vec<FieldValue> = records
1920 .into_iter()
1921 .map(|indexed_record| {
1922 let container = RecordContainer {
1923 record: indexed_record,
1924 };
1925 FieldValue::owned_any(container)
1926 })
1927 .collect();
1928 Ok(Some(FieldValue::list(values)))
1929 }
1930 Ok(None) => {
1931 Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1932 }
1933 Err(e) => {
1934 tracing::debug!("DataLoader error for {}: {:?}", nsid, e);
1935 Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
1936 }
1937 }
1938 } else {
1939 // Fallback to direct database query if DataLoader not available
1940 let db = db_fallback.clone();
1941 let mut where_clause = crate::models::WhereClause {
1942 conditions: HashMap::new(),
1943 or_conditions: None,
1944 and: None,
1945 or: None,
1946 };
1947 where_clause.conditions.insert(
1948 "collection".to_string(),
1949 crate::models::WhereCondition {
1950 gt: None,
1951 gte: None,
1952 lt: None,
1953 lte: None,
1954 eq: Some(serde_json::Value::String(nsid.clone())),
1955 in_values: None,
1956 contains: None,
1957 fuzzy: None,
1958 },
1959 );
1960 where_clause.conditions.insert(
1961 "did".to_string(),
1962 crate::models::WhereCondition {
1963 gt: None,
1964 gte: None,
1965 lt: None,
1966 lte: None,
1967 eq: Some(serde_json::Value::String(did.clone())),
1968 in_values: None,
1969 contains: None,
1970 fuzzy: None,
1971 },
1972 );
1973
1974 match db.get_slice_collections_records(
1975 &slice,
1976 Some(limit as i32),
1977 None, // cursor
1978 None, // sort
1979 Some(&where_clause),
1980 ).await {
1981 Ok((records, _cursor)) => {
1982 let values: Vec<FieldValue> = records
1983 .into_iter()
1984 .map(|record| {
1985 let indexed_record = crate::models::IndexedRecord {
1986 uri: record.uri,
1987 cid: record.cid,
1988 did: record.did,
1989 collection: record.collection,
1990 value: record.json,
1991 indexed_at: record.indexed_at.to_rfc3339(),
1992 };
1993 let container = RecordContainer {
1994 record: indexed_record,
1995 };
1996 FieldValue::owned_any(container)
1997 })
1998 .collect();
1999 Ok(Some(FieldValue::list(values)))
2000 }
2001 Err(e) => {
2002 tracing::debug!("Error querying {}: {}", nsid, e);
2003 Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
2004 }
2005 }
2006 }
2007 })
2008 },
2009 )
2010 .argument(async_graphql::dynamic::InputValue::new(
2011 "limit",
2012 TypeRef::named(TypeRef::INT),
2013 ))
2014 );*/
2015 }
2016 _ => {
2017 // Unknown key type, skip
2018 continue;
2019 }
2020 }
2021 }
2022
2023 // Add reverse joins: for every other collection, add a field to query records by DID
2024 // This enables bidirectional traversal (e.g., profile.plays and play.profile)
2025 for collection in all_collections {
2026 let reverse_field_name = format!("{}s", nsid_to_join_field_name(&collection.nsid));
2027 let slice_for_reverse = slice_uri.clone();
2028 let collection_nsid = collection.nsid.clone();
2029 let collection_type = collection.type_name.clone();
2030 let at_uri_fields = collection.at_uri_fields.clone();
2031
2032 object = object.field(
2033 Field::new(
2034 &reverse_field_name,
2035 TypeRef::named_nn_list_nn(&collection_type),
2036 move |ctx| {
2037 let slice = slice_for_reverse.clone();
2038 let nsid = collection_nsid.clone();
2039 let ref_fields = at_uri_fields.clone();
2040 FieldFuture::new(async move {
2041 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
2042
2043 // Get limit from argument, default to 50
2044 let limit = ctx.args.get("limit")
2045 .and_then(|v| v.i64().ok())
2046 .map(|i| i as usize)
2047 .unwrap_or(50)
2048 .min(100); // Cap at 100 to prevent abuse
2049
2050 // Try to get DataLoader from context
2051 if let Some(gql_ctx) = ctx.data_opt::<GraphQLContext>() {
2052 let parent_uri = &container.record.uri;
2053
2054 // Try each at-uri field from the lexicon
2055 tracing::debug!(
2056 "Trying reverse join for {} with at-uri fields: {:?}",
2057 nsid,
2058 ref_fields
2059 );
2060
2061 for ref_field in &ref_fields {
2062 let key = crate::graphql::dataloader::CollectionUriKey {
2063 slice_uri: slice.clone(),
2064 collection: nsid.clone(),
2065 parent_uri: parent_uri.clone(),
2066 reference_field: ref_field.clone(),
2067 };
2068
2069 tracing::debug!(
2070 "Querying {} via field '{}' for URI: {}",
2071 nsid,
2072 ref_field,
2073 parent_uri
2074 );
2075
2076 match gql_ctx.collection_uri_loader.load_one(key).await {
2077 Ok(Some(mut records)) => {
2078 if !records.is_empty() {
2079 tracing::debug!(
2080 "Found {} {} records via '{}' field for parent URI: {}",
2081 records.len(),
2082 nsid,
2083 ref_field,
2084 parent_uri
2085 );
2086
2087 // Apply limit
2088 records.truncate(limit);
2089
2090 let values: Vec<FieldValue> = records
2091 .into_iter()
2092 .map(|indexed_record| {
2093 let container = RecordContainer {
2094 record: indexed_record,
2095 };
2096 FieldValue::owned_any(container)
2097 })
2098 .collect();
2099 return Ok(Some(FieldValue::list(values)));
2100 }
2101 }
2102 Ok(None) => continue,
2103 Err(e) => {
2104 tracing::debug!("DataLoader error for {} field '{}': {:?}", nsid, ref_field, e);
2105 continue;
2106 }
2107 }
2108 }
2109
2110 // No records found via any at-uri field
2111 tracing::debug!("No {} records found for parent URI: {}", nsid, parent_uri);
2112 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
2113 }
2114
2115 // Fallback: DataLoader not available
2116 tracing::debug!("DataLoader not available for reverse join");
2117 Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
2118 })
2119 },
2120 )
2121 .argument(async_graphql::dynamic::InputValue::new(
2122 "limit",
2123 TypeRef::named(TypeRef::INT),
2124 ))
2125 );
2126
2127 // Add count field for the reverse join
2128 let count_field_name = format!("{}Count", reverse_field_name);
2129 let db_for_count = database.clone();
2130 let slice_for_count = slice_uri.clone();
2131 let collection_for_count = collection.nsid.clone();
2132 let at_uri_fields_for_count = collection.at_uri_fields.clone();
2133
2134 object = object.field(Field::new(
2135 &count_field_name,
2136 TypeRef::named_nn(TypeRef::INT),
2137 move |ctx| {
2138 let slice = slice_for_count.clone();
2139 let nsid = collection_for_count.clone();
2140 let db = db_for_count.clone();
2141 let ref_fields = at_uri_fields_for_count.clone();
2142 FieldFuture::new(async move {
2143 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
2144 let parent_uri = &container.record.uri;
2145
2146 // Build where clause to count records referencing this URI
2147 for ref_field in &ref_fields {
2148 let mut where_clause = crate::models::WhereClause {
2149 conditions: HashMap::new(),
2150 or_conditions: None,
2151 and: None,
2152 or: None,
2153 };
2154
2155 where_clause.conditions.insert(
2156 "collection".to_string(),
2157 crate::models::WhereCondition {
2158 gt: None,
2159 gte: None,
2160 lt: None,
2161 lte: None,
2162 eq: Some(serde_json::Value::String(nsid.clone())),
2163 in_values: None,
2164 contains: None,
2165 fuzzy: None,
2166 },
2167 );
2168
2169 where_clause.conditions.insert(
2170 ref_field.clone(),
2171 crate::models::WhereCondition {
2172 gt: None,
2173 gte: None,
2174 lt: None,
2175 lte: None,
2176 eq: Some(serde_json::Value::String(parent_uri.clone())),
2177 in_values: None,
2178 contains: None,
2179 fuzzy: None,
2180 },
2181 );
2182
2183 match db
2184 .count_slice_collections_records(&slice, Some(&where_clause))
2185 .await
2186 {
2187 Ok(count) if count > 0 => {
2188 return Ok(Some(FieldValue::value(count as i32)));
2189 }
2190 Ok(_) => continue,
2191 Err(e) => {
2192 tracing::debug!("Count error for {}: {}", nsid, e);
2193 continue;
2194 }
2195 }
2196 }
2197
2198 // No matching field found, return 0
2199 Ok(Some(FieldValue::value(0)))
2200 })
2201 },
2202 ));
2203 }
2204
2205 // Add sparklines, stats, and oauth clients fields for NetworkSlicesSlice type
2206 if type_name == "NetworkSlicesSlice" {
2207 object = add_sparklines_field_to_slice(object, database.clone());
2208 object = add_stats_field_to_slice(object, database.clone());
2209 object = add_oauth_clients_field_to_slice(object, auth_base_url);
2210 }
2211
2212 object
2213}
2214
2215/// Convert serde_json::Value to async_graphql::Value
2216fn json_to_graphql_value(val: &serde_json::Value) -> GraphQLValue {
2217 match val {
2218 serde_json::Value::Null => GraphQLValue::Null,
2219 serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b),
2220 serde_json::Value::Number(n) => {
2221 if let Some(i) = n.as_i64() {
2222 GraphQLValue::Number((i as i32).into())
2223 } else if let Some(f) = n.as_f64() {
2224 GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into())
2225 } else {
2226 GraphQLValue::Null
2227 }
2228 }
2229 serde_json::Value::String(s) => GraphQLValue::String(s.clone()),
2230 serde_json::Value::Array(arr) => {
2231 GraphQLValue::List(arr.iter().map(json_to_graphql_value).collect())
2232 }
2233 serde_json::Value::Object(obj) => {
2234 let mut map = async_graphql::indexmap::IndexMap::new();
2235 for (k, v) in obj {
2236 map.insert(
2237 async_graphql::Name::new(k.as_str()),
2238 json_to_graphql_value(v),
2239 );
2240 }
2241 GraphQLValue::Object(map)
2242 }
2243 }
2244}
2245
2246/// Converts GraphQL type to TypeRef for async-graphql
2247fn graphql_type_to_typeref(gql_type: &GraphQLType, is_required: bool) -> TypeRef {
2248 match gql_type {
2249 GraphQLType::String => {
2250 if is_required {
2251 TypeRef::named_nn(TypeRef::STRING)
2252 } else {
2253 TypeRef::named(TypeRef::STRING)
2254 }
2255 }
2256 GraphQLType::Int => {
2257 if is_required {
2258 TypeRef::named_nn(TypeRef::INT)
2259 } else {
2260 TypeRef::named(TypeRef::INT)
2261 }
2262 }
2263 GraphQLType::Boolean => {
2264 if is_required {
2265 TypeRef::named_nn(TypeRef::BOOLEAN)
2266 } else {
2267 TypeRef::named(TypeRef::BOOLEAN)
2268 }
2269 }
2270 GraphQLType::Float => {
2271 if is_required {
2272 TypeRef::named_nn(TypeRef::FLOAT)
2273 } else {
2274 TypeRef::named(TypeRef::FLOAT)
2275 }
2276 }
2277 GraphQLType::Blob => {
2278 // Blob object type with url resolver
2279 // Always nullable since blob data might be missing or malformed
2280 TypeRef::named("Blob")
2281 }
2282 GraphQLType::Json | GraphQLType::Ref | GraphQLType::LexiconRef(_) | GraphQLType::Object(_) | GraphQLType::Union => {
2283 // JSON scalar type - linked records, lexicon refs, and complex objects return as JSON (fallback)
2284 if is_required {
2285 TypeRef::named_nn("JSON")
2286 } else {
2287 TypeRef::named("JSON")
2288 }
2289 }
2290 GraphQLType::Array(inner) => {
2291 // For arrays of primitives, use typed arrays
2292 // For arrays of complex types, use JSON scalar
2293 match inner.as_ref() {
2294 GraphQLType::String
2295 | GraphQLType::Int
2296 | GraphQLType::Boolean
2297 | GraphQLType::Float => {
2298 let inner_ref = match inner.as_ref() {
2299 GraphQLType::String => TypeRef::STRING,
2300 GraphQLType::Int => TypeRef::INT,
2301 GraphQLType::Boolean => TypeRef::BOOLEAN,
2302 GraphQLType::Float => TypeRef::FLOAT,
2303 _ => unreachable!(),
2304 };
2305
2306 if is_required {
2307 TypeRef::named_nn_list_nn(inner_ref)
2308 } else {
2309 TypeRef::named_list(inner_ref)
2310 }
2311 }
2312 GraphQLType::Blob => {
2313 // Arrays of blobs - return list of Blob objects
2314 if is_required {
2315 TypeRef::named_nn_list("Blob")
2316 } else {
2317 TypeRef::named_list("Blob")
2318 }
2319 }
2320 _ => {
2321 // Arrays of complex types (objects, etc.) are just JSON
2322 if is_required {
2323 TypeRef::named_nn("JSON")
2324 } else {
2325 TypeRef::named("JSON")
2326 }
2327 }
2328 }
2329 }
2330 }
2331}
2332
2333/// Creates the Blob GraphQL type with url resolver
2334fn create_blob_type() -> Object {
2335 let mut blob = Object::new("Blob");
2336
2337 // ref field - CID reference
2338 blob = blob.field(Field::new(
2339 "ref",
2340 TypeRef::named_nn(TypeRef::STRING),
2341 |ctx| {
2342 FieldFuture::new(async move {
2343 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
2344 Ok(Some(GraphQLValue::from(container.blob_ref.clone())))
2345 })
2346 },
2347 ));
2348
2349 // mimeType field
2350 blob = blob.field(Field::new(
2351 "mimeType",
2352 TypeRef::named_nn(TypeRef::STRING),
2353 |ctx| {
2354 FieldFuture::new(async move {
2355 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
2356 Ok(Some(GraphQLValue::from(container.mime_type.clone())))
2357 })
2358 },
2359 ));
2360
2361 // size field
2362 blob = blob.field(Field::new("size", TypeRef::named_nn(TypeRef::INT), |ctx| {
2363 FieldFuture::new(async move {
2364 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
2365 Ok(Some(GraphQLValue::from(container.size as i32)))
2366 })
2367 }));
2368
2369 // url(preset) field with argument
2370 blob = blob.field(
2371 Field::new("url", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2372 FieldFuture::new(async move {
2373 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
2374
2375 // Get preset argument, default to "feed_fullsize"
2376 let preset: String = match ctx.args.get("preset") {
2377 Some(val) => val.string().unwrap_or("feed_fullsize").to_string(),
2378 None => "feed_fullsize".to_string(),
2379 };
2380
2381 // Build CDN URL: https://cdn.bsky.app/img/{preset}/plain/{did}/{cid}@jpeg
2382 let cdn_base_url = "https://cdn.bsky.app/img";
2383 let url = format!(
2384 "{}/{}/plain/{}/{}@jpeg",
2385 cdn_base_url,
2386 preset,
2387 container.did,
2388 container.blob_ref
2389 );
2390
2391 Ok(Some(GraphQLValue::from(url)))
2392 })
2393 })
2394 .argument(async_graphql::dynamic::InputValue::new(
2395 "preset",
2396 TypeRef::named(TypeRef::STRING),
2397 ))
2398 .description("Generate CDN URL for the blob with the specified preset (avatar, banner, feed_thumbnail, feed_fullsize)"),
2399 );
2400
2401 blob
2402}
2403
2404/// Creates the SyncResult GraphQL type for mutation responses
2405fn create_sync_result_type() -> Object {
2406 let mut sync_result = Object::new("SyncResult");
2407
2408 sync_result = sync_result.field(Field::new(
2409 "success",
2410 TypeRef::named_nn(TypeRef::BOOLEAN),
2411 |ctx| {
2412 FieldFuture::new(async move {
2413 let value = ctx
2414 .parent_value
2415 .downcast_ref::<GraphQLValue>()
2416 .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
2417 if let GraphQLValue::Object(obj) = value {
2418 if let Some(success) = obj.get("success") {
2419 return Ok(Some(success.clone()));
2420 }
2421 }
2422 Ok(None)
2423 })
2424 },
2425 ));
2426
2427 sync_result = sync_result.field(Field::new(
2428 "reposProcessed",
2429 TypeRef::named_nn(TypeRef::INT),
2430 |ctx| {
2431 FieldFuture::new(async move {
2432 let value = ctx
2433 .parent_value
2434 .downcast_ref::<GraphQLValue>()
2435 .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
2436 if let GraphQLValue::Object(obj) = value {
2437 if let Some(repos) = obj.get("reposProcessed") {
2438 return Ok(Some(repos.clone()));
2439 }
2440 }
2441 Ok(None)
2442 })
2443 },
2444 ));
2445
2446 sync_result = sync_result.field(Field::new(
2447 "recordsSynced",
2448 TypeRef::named_nn(TypeRef::INT),
2449 |ctx| {
2450 FieldFuture::new(async move {
2451 let value = ctx
2452 .parent_value
2453 .downcast_ref::<GraphQLValue>()
2454 .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
2455 if let GraphQLValue::Object(obj) = value {
2456 if let Some(records) = obj.get("recordsSynced") {
2457 return Ok(Some(records.clone()));
2458 }
2459 }
2460 Ok(None)
2461 })
2462 },
2463 ));
2464
2465 sync_result = sync_result.field(Field::new(
2466 "timedOut",
2467 TypeRef::named_nn(TypeRef::BOOLEAN),
2468 |ctx| {
2469 FieldFuture::new(async move {
2470 let value = ctx
2471 .parent_value
2472 .downcast_ref::<GraphQLValue>()
2473 .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
2474 if let GraphQLValue::Object(obj) = value {
2475 if let Some(timed_out) = obj.get("timedOut") {
2476 return Ok(Some(timed_out.clone()));
2477 }
2478 }
2479 Ok(None)
2480 })
2481 },
2482 ));
2483
2484 sync_result = sync_result.field(Field::new(
2485 "message",
2486 TypeRef::named_nn(TypeRef::STRING),
2487 |ctx| {
2488 FieldFuture::new(async move {
2489 let value = ctx
2490 .parent_value
2491 .downcast_ref::<GraphQLValue>()
2492 .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
2493 if let GraphQLValue::Object(obj) = value {
2494 if let Some(message) = obj.get("message") {
2495 return Ok(Some(message.clone()));
2496 }
2497 }
2498 Ok(None)
2499 })
2500 },
2501 ));
2502
2503 sync_result
2504}
2505
2506/// Creates the SortDirection enum type
2507fn create_sort_direction_enum() -> Enum {
2508 Enum::new("SortDirection")
2509 .item(EnumItem::new("asc"))
2510 .item(EnumItem::new("desc"))
2511}
2512
2513/// Creates the SortField input type
2514fn create_sort_field_input() -> InputObject {
2515 InputObject::new("SortField")
2516 .field(InputValue::new("field", TypeRef::named_nn(TypeRef::STRING)))
2517 .field(InputValue::new(
2518 "direction",
2519 TypeRef::named_nn("SortDirection"),
2520 ))
2521}
2522
2523/// Creates the StringCondition input type for string field filtering
2524fn create_string_condition_input() -> InputObject {
2525 InputObject::new("StringCondition")
2526 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
2527 .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING)))
2528 .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING)))
2529 .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING)))
2530}
2531
2532/// Creates the IntCondition input type for int field filtering
2533fn create_int_condition_input() -> InputObject {
2534 InputObject::new("IntCondition")
2535 .field(InputValue::new("eq", TypeRef::named(TypeRef::INT)))
2536 .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT)))
2537}
2538
2539/// Creates the PageInfo type for connection pagination
2540fn create_page_info_type() -> Object {
2541 let mut page_info = Object::new("PageInfo");
2542
2543 page_info = page_info.field(Field::new(
2544 "hasNextPage",
2545 TypeRef::named_nn(TypeRef::BOOLEAN),
2546 |ctx| {
2547 FieldFuture::new(async move {
2548 let value = ctx
2549 .parent_value
2550 .downcast_ref::<GraphQLValue>()
2551 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
2552 if let GraphQLValue::Object(obj) = value {
2553 if let Some(has_next) = obj.get("hasNextPage") {
2554 return Ok(Some(has_next.clone()));
2555 }
2556 }
2557 Ok(Some(GraphQLValue::from(false)))
2558 })
2559 },
2560 ));
2561
2562 page_info = page_info.field(Field::new(
2563 "hasPreviousPage",
2564 TypeRef::named_nn(TypeRef::BOOLEAN),
2565 |ctx| {
2566 FieldFuture::new(async move {
2567 let value = ctx
2568 .parent_value
2569 .downcast_ref::<GraphQLValue>()
2570 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
2571 if let GraphQLValue::Object(obj) = value {
2572 if let Some(has_prev) = obj.get("hasPreviousPage") {
2573 return Ok(Some(has_prev.clone()));
2574 }
2575 }
2576 Ok(Some(GraphQLValue::from(false)))
2577 })
2578 },
2579 ));
2580
2581 page_info = page_info.field(Field::new(
2582 "startCursor",
2583 TypeRef::named(TypeRef::STRING),
2584 |ctx| {
2585 FieldFuture::new(async move {
2586 let value = ctx
2587 .parent_value
2588 .downcast_ref::<GraphQLValue>()
2589 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
2590 if let GraphQLValue::Object(obj) = value {
2591 if let Some(cursor) = obj.get("startCursor") {
2592 return Ok(Some(cursor.clone()));
2593 }
2594 }
2595 Ok(None)
2596 })
2597 },
2598 ));
2599
2600 page_info = page_info.field(Field::new(
2601 "endCursor",
2602 TypeRef::named(TypeRef::STRING),
2603 |ctx| {
2604 FieldFuture::new(async move {
2605 let value = ctx
2606 .parent_value
2607 .downcast_ref::<GraphQLValue>()
2608 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
2609 if let GraphQLValue::Object(obj) = value {
2610 if let Some(cursor) = obj.get("endCursor") {
2611 return Ok(Some(cursor.clone()));
2612 }
2613 }
2614 Ok(None)
2615 })
2616 },
2617 ));
2618
2619 page_info
2620}
2621
/// Connection data structure that holds all connection fields
///
/// Built by the paginated query resolvers and consumed by the resolvers in
/// `create_connection_type`.
#[derive(Clone)]
struct ConnectionData {
    /// Match count reported by the query layer — presumably the total across
    /// all pages, not just this page; confirm at the construction site.
    total_count: i32,
    /// Whether another page exists after `end_cursor`.
    has_next_page: bool,
    /// Cursor for resuming after the last node, if the query layer provided
    /// one; surfaced as `pageInfo.endCursor`.
    end_cursor: Option<String>,
    /// The records on this page, in result order.
    nodes: Vec<RecordContainer>,
}
2630
/// Edge data structure for Relay connections
#[derive(Clone)]
struct EdgeData {
    /// The record wrapped by this edge.
    node: RecordContainer,
    /// Opaque cursor; `create_connection_type` builds it as the URL-safe,
    /// unpadded base64 encoding of the record's CID.
    cursor: String,
}
2637
2638/// Creates an Edge type for a given record type
2639/// Example: For "Post" creates "PostEdge" with node and cursor
2640fn create_edge_type(record_type_name: &str) -> Object {
2641 let edge_name = format!("{}Edge", record_type_name);
2642 let mut edge = Object::new(&edge_name);
2643
2644 // Add node field
2645 let record_type = record_type_name.to_string();
2646 edge = edge.field(Field::new("node", TypeRef::named_nn(&record_type), |ctx| {
2647 FieldFuture::new(async move {
2648 let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?;
2649 Ok(Some(FieldValue::owned_any(edge_data.node.clone())))
2650 })
2651 }));
2652
2653 // Add cursor field
2654 edge = edge.field(Field::new(
2655 "cursor",
2656 TypeRef::named_nn(TypeRef::STRING),
2657 |ctx| {
2658 FieldFuture::new(async move {
2659 let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?;
2660 Ok(Some(GraphQLValue::from(edge_data.cursor.clone())))
2661 })
2662 },
2663 ));
2664
2665 edge
2666}
2667
2668/// Creates a Connection type for a given record type
2669/// Example: For "Post" creates "PostConnection" with edges, pageInfo, and totalCount
2670fn create_connection_type(record_type_name: &str) -> Object {
2671 let connection_name = format!("{}Connection", record_type_name);
2672 let mut connection = Object::new(&connection_name);
2673
2674 // Add totalCount field
2675 connection = connection.field(Field::new(
2676 "totalCount",
2677 TypeRef::named_nn(TypeRef::INT),
2678 |ctx| {
2679 FieldFuture::new(async move {
2680 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
2681 Ok(Some(GraphQLValue::from(data.total_count)))
2682 })
2683 },
2684 ));
2685
2686 // Add pageInfo field
2687 connection = connection.field(Field::new(
2688 "pageInfo",
2689 TypeRef::named_nn("PageInfo"),
2690 |ctx| {
2691 FieldFuture::new(async move {
2692 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
2693
2694 let mut page_info = async_graphql::indexmap::IndexMap::new();
2695 page_info.insert(
2696 async_graphql::Name::new("hasNextPage"),
2697 GraphQLValue::from(data.has_next_page),
2698 );
2699 // For forward pagination only, hasPreviousPage is always false
2700 page_info.insert(
2701 async_graphql::Name::new("hasPreviousPage"),
2702 GraphQLValue::from(false),
2703 );
2704
2705 // Add startCursor (first node's cid if available)
2706 if !data.nodes.is_empty() {
2707 if let Some(first_record) = data.nodes.first() {
2708 let start_cursor = general_purpose::URL_SAFE_NO_PAD
2709 .encode(first_record.record.cid.clone());
2710 page_info.insert(
2711 async_graphql::Name::new("startCursor"),
2712 GraphQLValue::from(start_cursor),
2713 );
2714 }
2715 }
2716
2717 // Add endCursor
2718 if let Some(ref cursor) = data.end_cursor {
2719 page_info.insert(
2720 async_graphql::Name::new("endCursor"),
2721 GraphQLValue::from(cursor.clone()),
2722 );
2723 }
2724
2725 Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info))))
2726 })
2727 },
2728 ));
2729
2730 // Add edges field (Relay standard)
2731 let edge_type = format!("{}Edge", record_type_name);
2732 connection = connection.field(Field::new(
2733 "edges",
2734 TypeRef::named_nn_list_nn(&edge_type),
2735 |ctx| {
2736 FieldFuture::new(async move {
2737 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
2738
2739 let field_values: Vec<FieldValue<'_>> = data
2740 .nodes
2741 .iter()
2742 .map(|node| {
2743 // Use base64-encoded CID as cursor
2744 let cursor =
2745 general_purpose::URL_SAFE_NO_PAD.encode(node.record.cid.clone());
2746 let edge = EdgeData {
2747 node: node.clone(),
2748 cursor,
2749 };
2750 FieldValue::owned_any(edge)
2751 })
2752 .collect();
2753
2754 Ok(Some(FieldValue::list(field_values)))
2755 })
2756 },
2757 ));
2758
2759 // Add nodes field (convenience, direct access to records without edges wrapper)
2760 connection = connection.field(Field::new(
2761 "nodes",
2762 TypeRef::named_nn_list_nn(record_type_name),
2763 |ctx| {
2764 FieldFuture::new(async move {
2765 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
2766
2767 let field_values: Vec<FieldValue<'_>> = data
2768 .nodes
2769 .iter()
2770 .map(|node| FieldValue::owned_any(node.clone()))
2771 .collect();
2772
2773 Ok(Some(FieldValue::list(field_values)))
2774 })
2775 },
2776 ));
2777
2778 connection
2779}
2780
/// Creates the Mutation root type with sync operations and dynamic collection mutations
///
/// Adds a fixed `syncUserCollections` mutation, then create/update/delete
/// mutations for every lexicon in `lexicons` that yields at least one
/// collection field.
fn create_mutation_type(
    database: Database,
    slice_uri: String,
    lexicons: &[serde_json::Value],
) -> Object {
    let mut mutation = Object::new("Mutation");

    // Add syncUserCollections mutation
    let db_clone = database.clone();
    let slice_clone = slice_uri.clone();

    mutation = mutation.field(
        Field::new(
            "syncUserCollections",
            TypeRef::named_nn("SyncResult"),
            move |ctx| {
                let db = db_clone.clone();
                let slice = slice_clone.clone();

                FieldFuture::new(async move {
                    // Required `did` argument; reject the call when missing or
                    // not a string.
                    let did = ctx
                        .args
                        .get("did")
                        .and_then(|v| v.string().ok())
                        .ok_or_else(|| Error::new("did argument is required"))?;

                    // Create sync service and call sync_user_collections
                    // NOTE(review): a fresh in-memory cache and SyncService are
                    // constructed on every mutation call, so nothing is shared
                    // between calls — confirm this per-call setup is intended.
                    let cache_backend = crate::cache::CacheFactory::create_cache(
                        crate::cache::CacheBackend::InMemory { ttl_seconds: None },
                    )
                    .await
                    .map_err(|e| Error::new(format!("Failed to create cache: {}", e)))?;
                    let cache = Arc::new(Mutex::new(crate::cache::SliceCache::new(cache_backend)));
                    // Relay endpoint comes from RELAY_ENDPOINT, with a Bluesky
                    // relay as the fallback.
                    let sync_service = crate::sync::SyncService::with_cache(
                        db.clone(),
                        std::env::var("RELAY_ENDPOINT")
                            .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()),
                        cache,
                    );

                    let result = sync_service
                        .sync_user_collections(did, &slice, 30) // 30 second timeout
                        .await
                        .map_err(|e| Error::new(format!("Sync failed: {}", e)))?;

                    // Convert result to GraphQL object; keys mirror the fields
                    // declared on the SyncResult type.
                    let mut obj = async_graphql::indexmap::IndexMap::new();
                    obj.insert(
                        async_graphql::Name::new("success"),
                        GraphQLValue::from(result.success),
                    );
                    obj.insert(
                        async_graphql::Name::new("reposProcessed"),
                        GraphQLValue::from(result.repos_processed),
                    );
                    obj.insert(
                        async_graphql::Name::new("recordsSynced"),
                        GraphQLValue::from(result.records_synced),
                    );
                    obj.insert(
                        async_graphql::Name::new("timedOut"),
                        GraphQLValue::from(result.timed_out),
                    );
                    obj.insert(
                        async_graphql::Name::new("message"),
                        GraphQLValue::from(result.message),
                    );

                    Ok(Some(FieldValue::owned_any(GraphQLValue::Object(obj))))
                })
            },
        )
        .argument(async_graphql::dynamic::InputValue::new(
            "did",
            TypeRef::named_nn(TypeRef::STRING),
        ))
        .description("Sync user collections for a given DID"),
    );

    // Add dynamic mutations for each collection
    for lexicon in lexicons {
        // A usable lexicon needs both an "id" (NSID string) and "defs".
        if let (Some(nsid), Some(defs)) = (
            lexicon.get("id").and_then(|n| n.as_str()),
            lexicon.get("defs"),
        ) {
            let fields = extract_collection_fields(defs);
            // Lexicons with no extractable fields get no mutations.
            if !fields.is_empty() {
                let type_name = nsid_to_type_name(nsid);

                // Add create mutation
                mutation = add_create_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone());

                // Add update mutation
                mutation = add_update_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone());

                // Add delete mutation
                mutation = add_delete_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone());
            }
        }
    }

    mutation
}
2885
/// Converts NSID to GraphQL type name
/// Example: app.bsky.feed.post -> AppBskyFeedPost
fn nsid_to_type_name(nsid: &str) -> String {
    let mut type_name = String::with_capacity(nsid.len());
    for segment in nsid.split('.') {
        let mut rest = segment.chars();
        if let Some(first) = rest.next() {
            // to_uppercase may yield multiple chars for some scripts,
            // so extend rather than push.
            type_name.extend(first.to_uppercase());
            type_name.push_str(rest.as_str());
        }
    }
    type_name
}
2900
2901/// Converts NSID to GraphQL query name in camelCase and pluralized
2902/// Example: app.bsky.feed.post -> appBskyFeedPosts
2903/// Example: fm.teal.alpha.feed.play -> fmTealAlphaFeedPlays
2904fn nsid_to_query_name(nsid: &str) -> String {
2905 // First convert to camelCase like join fields
2906 let camel_case = nsid_to_join_field_name(nsid);
2907
2908 // Then pluralize the end
2909 if camel_case.ends_with("s")
2910 || camel_case.ends_with("x")
2911 || camel_case.ends_with("ch")
2912 || camel_case.ends_with("sh")
2913 {
2914 format!("{}es", camel_case) // status -> statuses, box -> boxes
2915 } else if camel_case.ends_with("y") && camel_case.len() > 1 {
2916 let chars: Vec<char> = camel_case.chars().collect();
2917 if chars.len() > 1 && !['a', 'e', 'i', 'o', 'u'].contains(&chars[chars.len() - 2]) {
2918 format!("{}ies", &camel_case[..camel_case.len() - 1]) // party -> parties
2919 } else {
2920 format!("{}s", camel_case) // day -> days
2921 }
2922 } else {
2923 format!("{}s", camel_case) // post -> posts
2924 }
2925}
2926
/// Converts NSID to GraphQL join field name in camelCase
/// Example: app.bsky.actor.profile -> appBskyActorProfile
fn nsid_to_join_field_name(nsid: &str) -> String {
    let mut segments = nsid.split('.');

    // split always yields at least one item; this branch is defensive only.
    let mut result = match segments.next() {
        Some(head) => head.to_string(),
        None => return nsid.to_string(),
    };

    // Remaining segments are capitalized and appended.
    for segment in segments {
        let mut rest = segment.chars();
        if let Some(first) = rest.next() {
            result.extend(first.to_uppercase());
            result.push_str(rest.as_str());
        }
    }

    result
}
2947
2948/// Creates an aggregated type for GROUP BY queries
2949/// Returns a dynamic object with the grouped fields plus a count field
2950fn create_aggregated_type(type_name: &str, fields: &[GraphQLField]) -> Object {
2951 let mut aggregated = Object::new(type_name);
2952
2953 // Add fields from the lexicon that can be grouped
2954 // Use JSON type for all fields to support both strings and complex types
2955 for field in fields {
2956 let field_name = field.name.clone();
2957 let field_name_clone = field_name.clone();
2958 aggregated = aggregated.field(Field::new(
2959 &field_name,
2960 TypeRef::named("JSON"),
2961 move |ctx| {
2962 let field_name = field_name_clone.clone();
2963 FieldFuture::new(async move {
2964 let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
2965 if let Some(obj) = json_value.as_object() {
2966 if let Some(value) = obj.get(&field_name) {
2967 // Convert serde_json::Value to async_graphql::Value
2968 let graphql_value = serde_json_to_graphql_value(value);
2969 return Ok(Some(graphql_value));
2970 }
2971 }
2972 Ok(None)
2973 })
2974 },
2975 ));
2976 }
2977
2978 // Add count field
2979 aggregated = aggregated.field(Field::new(
2980 "count",
2981 TypeRef::named_nn(TypeRef::INT),
2982 |ctx| {
2983 FieldFuture::new(async move {
2984 let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
2985 if let Some(obj) = json_value.as_object() {
2986 if let Some(count) = obj.get("count") {
2987 if let Some(count_i64) = count.as_i64() {
2988 return Ok(Some(GraphQLValue::from(count_i64 as i32)));
2989 }
2990 }
2991 }
2992 Ok(Some(GraphQLValue::from(0)))
2993 })
2994 },
2995 ));
2996
2997 aggregated
2998}
2999
3000/// Creates the AggregationOrderBy input type for ordering by count
3001fn create_aggregation_order_by_input() -> InputObject {
3002 InputObject::new("AggregationOrderBy")
3003 .field(InputValue::new("count", TypeRef::named("SortDirection")))
3004}
3005
3006/// Creates the DateInterval enum for date truncation
3007fn create_date_interval_enum() -> Enum {
3008 Enum::new("DateInterval")
3009 .item(EnumItem::new("second"))
3010 .item(EnumItem::new("minute"))
3011 .item(EnumItem::new("hour"))
3012 .item(EnumItem::new("day"))
3013 .item(EnumItem::new("week"))
3014 .item(EnumItem::new("month"))
3015 .item(EnumItem::new("quarter"))
3016 .item(EnumItem::new("year"))
3017}
3018
3019/// Converts a serde_json::Value to an async_graphql::Value
3020fn serde_json_to_graphql_value(value: &serde_json::Value) -> GraphQLValue {
3021 match value {
3022 serde_json::Value::Null => GraphQLValue::Null,
3023 serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b),
3024 serde_json::Value::Number(n) => {
3025 if let Some(i) = n.as_i64() {
3026 GraphQLValue::Number(i.into())
3027 } else if let Some(f) = n.as_f64() {
3028 GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into())
3029 } else {
3030 GraphQLValue::Null
3031 }
3032 }
3033 serde_json::Value::String(s) => GraphQLValue::String(s.clone()),
3034 serde_json::Value::Array(arr) => {
3035 let values: Vec<GraphQLValue> = arr.iter().map(serde_json_to_graphql_value).collect();
3036 GraphQLValue::List(values)
3037 }
3038 serde_json::Value::Object(obj) => {
3039 let mut map = async_graphql::indexmap::IndexMap::new();
3040 for (k, v) in obj {
3041 map.insert(async_graphql::Name::new(k), serde_json_to_graphql_value(v));
3042 }
3043 GraphQLValue::Object(map)
3044 }
3045 }
3046}
3047
3048/// Creates the RecordUpdate type for subscription events
3049fn create_record_update_type() -> Object {
3050 let mut record_update = Object::new("RecordUpdate");
3051
3052 record_update = record_update.field(Field::new(
3053 "uri",
3054 TypeRef::named_nn(TypeRef::STRING),
3055 |ctx| {
3056 FieldFuture::new(async move {
3057 let value = ctx
3058 .parent_value
3059 .downcast_ref::<GraphQLValue>()
3060 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3061 if let GraphQLValue::Object(obj) = value {
3062 if let Some(uri) = obj.get("uri") {
3063 return Ok(Some(uri.clone()));
3064 }
3065 }
3066 Ok(None)
3067 })
3068 },
3069 ));
3070
3071 record_update = record_update.field(Field::new(
3072 "cid",
3073 TypeRef::named_nn(TypeRef::STRING),
3074 |ctx| {
3075 FieldFuture::new(async move {
3076 let value = ctx
3077 .parent_value
3078 .downcast_ref::<GraphQLValue>()
3079 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3080 if let GraphQLValue::Object(obj) = value {
3081 if let Some(cid) = obj.get("cid") {
3082 return Ok(Some(cid.clone()));
3083 }
3084 }
3085 Ok(None)
3086 })
3087 },
3088 ));
3089
3090 record_update = record_update.field(Field::new(
3091 "did",
3092 TypeRef::named_nn(TypeRef::STRING),
3093 |ctx| {
3094 FieldFuture::new(async move {
3095 let value = ctx
3096 .parent_value
3097 .downcast_ref::<GraphQLValue>()
3098 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3099 if let GraphQLValue::Object(obj) = value {
3100 if let Some(did) = obj.get("did") {
3101 return Ok(Some(did.clone()));
3102 }
3103 }
3104 Ok(None)
3105 })
3106 },
3107 ));
3108
3109 record_update = record_update.field(Field::new(
3110 "collection",
3111 TypeRef::named_nn(TypeRef::STRING),
3112 |ctx| {
3113 FieldFuture::new(async move {
3114 let value = ctx
3115 .parent_value
3116 .downcast_ref::<GraphQLValue>()
3117 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3118 if let GraphQLValue::Object(obj) = value {
3119 if let Some(collection) = obj.get("collection") {
3120 return Ok(Some(collection.clone()));
3121 }
3122 }
3123 Ok(None)
3124 })
3125 },
3126 ));
3127
3128 record_update = record_update.field(Field::new(
3129 "indexedAt",
3130 TypeRef::named_nn(TypeRef::STRING),
3131 |ctx| {
3132 FieldFuture::new(async move {
3133 let value = ctx
3134 .parent_value
3135 .downcast_ref::<GraphQLValue>()
3136 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3137 if let GraphQLValue::Object(obj) = value {
3138 if let Some(indexed_at) = obj.get("indexedAt") {
3139 return Ok(Some(indexed_at.clone()));
3140 }
3141 }
3142 Ok(None)
3143 })
3144 },
3145 ));
3146
3147 record_update = record_update.field(Field::new(
3148 "operation",
3149 TypeRef::named_nn(TypeRef::STRING),
3150 |ctx| {
3151 FieldFuture::new(async move {
3152 let value = ctx
3153 .parent_value
3154 .downcast_ref::<GraphQLValue>()
3155 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3156 if let GraphQLValue::Object(obj) = value {
3157 if let Some(operation) = obj.get("operation") {
3158 return Ok(Some(operation.clone()));
3159 }
3160 }
3161 Ok(None)
3162 })
3163 },
3164 ));
3165
3166 record_update = record_update.field(Field::new("value", TypeRef::named_nn("JSON"), |ctx| {
3167 FieldFuture::new(async move {
3168 let value = ctx
3169 .parent_value
3170 .downcast_ref::<GraphQLValue>()
3171 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
3172 if let GraphQLValue::Object(obj) = value {
3173 if let Some(val) = obj.get("value") {
3174 return Ok(Some(val.clone()));
3175 }
3176 }
3177 Ok(None)
3178 })
3179 }));
3180
3181 record_update
3182}
3183
/// Creates the Subscription root type with collection-specific subscriptions.
///
/// For every record lexicon in the slice this registers three fields:
/// `{collection}Created`, `{collection}Updated`, and `{collection}Deleted`.
/// Created/updated events resolve to the generated record type (wrapped in a
/// `RecordContainer`); deleted events resolve to the deleted record's URI
/// string. Events arrive via the global `PUBSUB` broker, keyed by slice URI,
/// and each field filters by its collection NSID and operation kind.
fn create_subscription_type(slice_uri: String, lexicons: &[serde_json::Value]) -> Subscription {
    let mut subscription = Subscription::new("Subscription");

    // For each record collection, create {collection}Created, {collection}Updated, {collection}Deleted subscriptions
    for lexicon in lexicons {
        // Lexicons without an "id" NSID cannot be named — skip them.
        let nsid = match lexicon.get("id").and_then(|n| n.as_str()) {
            Some(n) => n,
            None => continue,
        };

        let defs = match lexicon.get("defs") {
            Some(d) => d,
            None => continue,
        };

        // Only process record types (skip queries, procedures, etc.)
        let is_record = defs
            .get("main")
            .and_then(|m| m.get("type"))
            .and_then(|t| t.as_str())
            == Some("record");

        if !is_record {
            continue;
        }

        // Skip lexicons that define no usable fields — no record type was
        // generated for them, so there is nothing to subscribe to.
        let fields = extract_collection_fields(defs);
        if fields.is_empty() {
            continue;
        }

        let type_name = nsid_to_type_name(nsid);
        let field_base_name = nsid_to_join_field_name(nsid);

        // {collection}Created subscription
        // Each resolver closure needs owned copies of the slice URI, NSID and
        // type name, hence the per-field clones below.
        let created_field_name = format!("{}Created", field_base_name);
        let slice_for_created = slice_uri.clone();
        let nsid_for_created = nsid.to_string();
        let type_name_for_created = type_name.clone();

        subscription = subscription.field(SubscriptionField::new(
            &created_field_name,
            TypeRef::named_nn(&type_name_for_created),
            move |_ctx| {
                let slice_uri = slice_for_created.clone();
                let collection = nsid_for_created.clone();

                SubscriptionFieldFuture::new(async move {
                    let mut receiver = PUBSUB.subscribe(&slice_uri).await;

                    // NOTE(review): any recv() error ends the while-let loop and
                    // therefore the subscription stream — presumably this is a
                    // broadcast channel where Err means closed or lagged; confirm.
                    let stream = async_stream::stream! {
                        while let Ok(event) = receiver.recv().await {
                            // Filter by collection and operation
                            if event.collection != collection || event.operation != crate::graphql::RecordOperation::Create {
                                continue;
                            }

                            // Convert to RecordContainer and yield
                            let indexed_record = crate::models::IndexedRecord {
                                uri: event.uri,
                                cid: event.cid,
                                did: event.did,
                                collection: event.collection,
                                value: event.value,
                                indexed_at: event.indexed_at,
                            };
                            let container = RecordContainer {
                                record: indexed_record,
                            };
                            yield Ok(FieldValue::owned_any(container));
                        }
                    };

                    Ok(stream)
                })
            },
        )
        .description(format!("Subscribe to {} record creation events", nsid)));

        // {collection}Updated subscription
        let updated_field_name = format!("{}Updated", field_base_name);
        let slice_for_updated = slice_uri.clone();
        let nsid_for_updated = nsid.to_string();
        let type_name_for_updated = type_name.clone();

        subscription = subscription.field(SubscriptionField::new(
            &updated_field_name,
            TypeRef::named_nn(&type_name_for_updated),
            move |_ctx| {
                let slice_uri = slice_for_updated.clone();
                let collection = nsid_for_updated.clone();

                SubscriptionFieldFuture::new(async move {
                    let mut receiver = PUBSUB.subscribe(&slice_uri).await;

                    let stream = async_stream::stream! {
                        while let Ok(event) = receiver.recv().await {
                            // Filter by collection and operation
                            if event.collection != collection || event.operation != crate::graphql::RecordOperation::Update {
                                continue;
                            }

                            // Convert to RecordContainer and yield
                            let indexed_record = crate::models::IndexedRecord {
                                uri: event.uri,
                                cid: event.cid,
                                did: event.did,
                                collection: event.collection,
                                value: event.value,
                                indexed_at: event.indexed_at,
                            };
                            let container = RecordContainer {
                                record: indexed_record,
                            };
                            yield Ok(FieldValue::owned_any(container));
                        }
                    };

                    Ok(stream)
                })
            },
        )
        .description(format!("Subscribe to {} record update events", nsid)));

        // {collection}Deleted subscription - returns just the URI string
        // (the record body is already gone by the time the event is observed).
        let deleted_field_name = format!("{}Deleted", field_base_name);
        let slice_for_deleted = slice_uri.clone();
        let nsid_for_deleted = nsid.to_string();

        subscription = subscription.field(SubscriptionField::new(
            &deleted_field_name,
            TypeRef::named_nn(TypeRef::STRING),
            move |_ctx| {
                let slice_uri = slice_for_deleted.clone();
                let collection = nsid_for_deleted.clone();

                SubscriptionFieldFuture::new(async move {
                    let mut receiver = PUBSUB.subscribe(&slice_uri).await;

                    let stream = async_stream::stream! {
                        while let Ok(event) = receiver.recv().await {
                            // Filter by collection and operation
                            if event.collection != collection || event.operation != crate::graphql::RecordOperation::Delete {
                                continue;
                            }

                            // For deletes, just return the URI
                            yield Ok(FieldValue::value(GraphQLValue::String(event.uri)));
                        }
                    };

                    Ok(stream)
                })
            },
        )
        .description(format!("Subscribe to {} record deletion events. Returns the URI of deleted records.", nsid)));
    }

    subscription
}
3345
3346/// Helper function to parse GraphQL where clause recursively
3347fn parse_where_clause(
3348 where_obj: async_graphql::dynamic::ObjectAccessor,
3349) -> crate::models::WhereClause {
3350 let mut where_clause = crate::models::WhereClause {
3351 conditions: HashMap::new(),
3352 or_conditions: None,
3353 and: None,
3354 or: None,
3355 };
3356
3357 for (field_name, condition_val) in where_obj.iter() {
3358 let field_str = field_name.as_str();
3359
3360 // Handle nested AND array
3361 if field_str == "and" {
3362 if let Ok(and_list) = condition_val.list() {
3363 let mut and_clauses = Vec::new();
3364 for item in and_list.iter() {
3365 if let Ok(obj) = item.object() {
3366 and_clauses.push(parse_where_clause(obj));
3367 }
3368 }
3369 if !and_clauses.is_empty() {
3370 where_clause.and = Some(and_clauses);
3371 }
3372 }
3373 continue;
3374 }
3375
3376 // Handle nested OR array
3377 if field_str == "or" {
3378 if let Ok(or_list) = condition_val.list() {
3379 let mut or_clauses = Vec::new();
3380 for item in or_list.iter() {
3381 if let Ok(obj) = item.object() {
3382 or_clauses.push(parse_where_clause(obj));
3383 }
3384 }
3385 if !or_clauses.is_empty() {
3386 where_clause.or = Some(or_clauses);
3387 }
3388 }
3389 continue;
3390 }
3391
3392 // Handle regular field conditions
3393 if let Ok(condition_obj) = condition_val.object() {
3394 let mut where_condition = crate::models::WhereCondition {
3395 eq: None,
3396 in_values: None,
3397 contains: None,
3398 fuzzy: None,
3399 gt: None,
3400 gte: None,
3401 lt: None,
3402 lte: None,
3403 };
3404
3405 // Parse eq condition
3406 if let Some(eq_val) = condition_obj.get("eq") {
3407 if let Ok(eq_str) = eq_val.string() {
3408 where_condition.eq = Some(serde_json::Value::String(eq_str.to_string()));
3409 } else if let Ok(eq_i64) = eq_val.i64() {
3410 where_condition.eq = Some(serde_json::Value::Number(eq_i64.into()));
3411 }
3412 }
3413
3414 // Parse in condition
3415 if let Some(in_val) = condition_obj.get("in") {
3416 if let Ok(in_list) = in_val.list() {
3417 let mut values = Vec::new();
3418 for item in in_list.iter() {
3419 if let Ok(s) = item.string() {
3420 values.push(serde_json::Value::String(s.to_string()));
3421 } else if let Ok(i) = item.i64() {
3422 values.push(serde_json::Value::Number(i.into()));
3423 }
3424 }
3425 where_condition.in_values = Some(values);
3426 }
3427 }
3428
3429 // Parse contains condition
3430 if let Some(contains_val) = condition_obj.get("contains") {
3431 if let Ok(contains_str) = contains_val.string() {
3432 where_condition.contains = Some(contains_str.to_string());
3433 }
3434 }
3435
3436 // Parse fuzzy condition
3437 if let Some(fuzzy_val) = condition_obj.get("fuzzy") {
3438 if let Ok(fuzzy_str) = fuzzy_val.string() {
3439 where_condition.fuzzy = Some(fuzzy_str.to_string());
3440 }
3441 }
3442
3443 // Parse gt condition
3444 if let Some(gt_val) = condition_obj.get("gt") {
3445 if let Ok(gt_str) = gt_val.string() {
3446 where_condition.gt = Some(serde_json::Value::String(gt_str.to_string()));
3447 } else if let Ok(gt_i64) = gt_val.i64() {
3448 where_condition.gt = Some(serde_json::Value::Number(gt_i64.into()));
3449 }
3450 }
3451
3452 // Parse gte condition
3453 if let Some(gte_val) = condition_obj.get("gte") {
3454 if let Ok(gte_str) = gte_val.string() {
3455 where_condition.gte = Some(serde_json::Value::String(gte_str.to_string()));
3456 } else if let Ok(gte_i64) = gte_val.i64() {
3457 where_condition.gte = Some(serde_json::Value::Number(gte_i64.into()));
3458 }
3459 }
3460
3461 // Parse lt condition
3462 if let Some(lt_val) = condition_obj.get("lt") {
3463 if let Ok(lt_str) = lt_val.string() {
3464 where_condition.lt = Some(serde_json::Value::String(lt_str.to_string()));
3465 } else if let Ok(lt_i64) = lt_val.i64() {
3466 where_condition.lt = Some(serde_json::Value::Number(lt_i64.into()));
3467 }
3468 }
3469
3470 // Parse lte condition
3471 if let Some(lte_val) = condition_obj.get("lte") {
3472 if let Ok(lte_str) = lte_val.string() {
3473 where_condition.lte = Some(serde_json::Value::String(lte_str.to_string()));
3474 } else if let Ok(lte_i64) = lte_val.i64() {
3475 where_condition.lte = Some(serde_json::Value::Number(lte_i64.into()));
3476 }
3477 }
3478
3479 // Convert indexedAt to indexed_at for database column
3480 let db_field_name = if field_str == "indexedAt" {
3481 "indexed_at".to_string()
3482 } else {
3483 field_str.to_string()
3484 };
3485
3486 where_clause
3487 .conditions
3488 .insert(db_field_name, where_condition);
3489 }
3490 }
3491
3492 where_clause
3493}
3494
3495/// Creates an input type for mutations from lexicon fields
3496fn create_mutation_input_type(type_name: &str, fields: &[GraphQLField]) -> InputObject {
3497 let mut input = InputObject::new(format!("{}Input", type_name));
3498
3499 for field in fields {
3500 let field_type_ref = match &field.field_type {
3501 GraphQLType::String => TypeRef::named(TypeRef::STRING),
3502 GraphQLType::Int => TypeRef::named(TypeRef::INT),
3503 GraphQLType::Boolean => TypeRef::named(TypeRef::BOOLEAN),
3504 GraphQLType::Float => TypeRef::named(TypeRef::FLOAT),
3505 GraphQLType::Array(inner) => match **inner {
3506 GraphQLType::String => TypeRef::named_list(TypeRef::STRING),
3507 GraphQLType::Int => TypeRef::named_list(TypeRef::INT),
3508 _ => TypeRef::named("JSON"),
3509 },
3510 _ => TypeRef::named("JSON"),
3511 };
3512
3513 let field_type_ref = if field.is_required {
3514 TypeRef::NonNull(Box::new(field_type_ref))
3515 } else {
3516 field_type_ref
3517 };
3518
3519 input = input.field(InputValue::new(&field.name, field_type_ref));
3520 }
3521
3522 input
3523}
3524
3525/// Transforms fields in record data from GraphQL format to AT Protocol format
3526///
3527/// Blob fields:
3528/// - GraphQL format: `{ref: "bafyrei...", mimeType: "...", size: 123}`
3529/// - AT Protocol format: `{$type: "blob", ref: {$link: "bafyrei..."}, mimeType: "...", size: 123}`
3530///
3531/// Lexicon ref fields:
3532/// - Adds `$type: "{ref_nsid}"` to objects (e.g., `{$type: "community.lexicon.location.hthree#main", ...}`)
3533///
3534/// Nested objects:
3535/// - Recursively processes nested objects and arrays
3536fn transform_fields_for_atproto(
3537 mut data: serde_json::Value,
3538 fields: &[GraphQLField],
3539) -> serde_json::Value {
3540 if let serde_json::Value::Object(ref mut map) = data {
3541 for field in fields {
3542 if let Some(field_value) = map.get_mut(&field.name) {
3543 match &field.field_type {
3544 GraphQLType::Blob => {
3545 // Transform single blob field
3546 if let Some(blob_obj) = field_value.as_object_mut() {
3547 // Add $type: "blob"
3548 blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string()));
3549
3550 // Check if ref is a string (GraphQL format)
3551 if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") {
3552 // Transform to {$link: "cid"} (AT Protocol format)
3553 let link_obj = serde_json::json!({
3554 "$link": cid
3555 });
3556 blob_obj.insert("ref".to_string(), link_obj);
3557 }
3558 }
3559 }
3560 GraphQLType::LexiconRef(ref_nsid) => {
3561 // Transform lexicon ref field by adding $type
3562 if let Some(ref_obj) = field_value.as_object_mut() {
3563 ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone()));
3564 }
3565 }
3566 GraphQLType::Object(nested_fields) => {
3567 // Recursively transform nested objects
3568 *field_value = transform_fields_for_atproto(field_value.clone(), nested_fields);
3569 }
3570 GraphQLType::Array(inner) => {
3571 match inner.as_ref() {
3572 GraphQLType::Blob => {
3573 // Transform array of blobs
3574 if let Some(arr) = field_value.as_array_mut() {
3575 for blob_value in arr {
3576 if let Some(blob_obj) = blob_value.as_object_mut() {
3577 // Add $type: "blob"
3578 blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string()));
3579
3580 if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") {
3581 let link_obj = serde_json::json!({
3582 "$link": cid
3583 });
3584 blob_obj.insert("ref".to_string(), link_obj);
3585 }
3586 }
3587 }
3588 }
3589 }
3590 GraphQLType::LexiconRef(ref_nsid) => {
3591 // Transform array of lexicon refs
3592 if let Some(arr) = field_value.as_array_mut() {
3593 for ref_value in arr {
3594 if let Some(ref_obj) = ref_value.as_object_mut() {
3595 ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone()));
3596 }
3597 }
3598 }
3599 }
3600 GraphQLType::Object(nested_fields) => {
3601 // Transform array of objects recursively
3602 if let Some(arr) = field_value.as_array_mut() {
3603 for item in arr {
3604 *item = transform_fields_for_atproto(item.clone(), nested_fields);
3605 }
3606 }
3607 }
3608 _ => {} // Other array types don't need transformation
3609 }
3610 }
3611 _ => {} // Other field types don't need transformation
3612 }
3613 }
3614 }
3615 }
3616
3617 data
3618}
3619
/// Adds a `create{Type}` mutation for a collection.
///
/// The resolver:
/// 1. Requires an authenticated caller (OAuth token on the GraphQL context).
/// 2. Deserializes the `input` argument and rewrites it from GraphQL shape
///    to AT Protocol shape (`$type` tags, blob `ref` links).
/// 3. Validates the record against the slice's lexicons when any are stored
///    (lexicon records themselves validate against the system slice).
/// 4. Writes the record to the caller's PDS via `com.atproto.repo.createRecord`
///    with DPoP auth, then mirrors it into the local index.
/// 5. Returns the freshly indexed record wrapped in a `RecordContainer`
///    so clients (e.g. Relay) can update their caches.
fn add_create_mutation(
    mutation: Object,
    type_name: &str,
    nsid: &str,
    fields: &[GraphQLField],
    database: Database,
    slice_uri: String,
) -> Object {
    let mutation_name = format!("create{}", type_name);
    let nsid = nsid.to_string();
    let nsid_clone = nsid.clone();
    let fields = fields.to_vec();

    mutation.field(
        Field::new(
            mutation_name,
            TypeRef::named_nn(type_name),
            move |ctx| {
                // Per-invocation owned copies for the async block below.
                let db = database.clone();
                let slice = slice_uri.clone();
                let collection = nsid.clone();
                let fields = fields.clone();

                FieldFuture::new(async move {
                    // Get GraphQL context which contains auth info
                    let gql_ctx = ctx.data::<crate::graphql::GraphQLContext>()
                        .map_err(|_| Error::new("Missing GraphQL context"))?;

                    // Check if user is authenticated
                    let token = gql_ctx.auth_token.as_ref()
                        .ok_or_else(|| Error::new("Authentication required"))?;

                    // Extract input data
                    let input = ctx.args.get("input")
                        .ok_or_else(|| Error::new("Missing input argument"))?;

                    // Convert GraphQL value to JSON using deserialize
                    let mut record_data: serde_json::Value = input.deserialize()
                        .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?;

                    // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs)
                    record_data = transform_fields_for_atproto(record_data, &fields);

                    // Optional rkey argument; empty strings are treated as absent
                    // so the PDS generates the record key.
                    let rkey = ctx.args.get("rkey")
                        .and_then(|v| v.string().ok())
                        .filter(|s| !s.is_empty())
                        .map(|s| s.to_string());

                    // Verify OAuth token and get user info
                    let user_info = crate::auth::verify_oauth_token_cached(
                        token,
                        &gql_ctx.auth_base_url,
                        gql_ctx.auth_cache.clone(),
                    )
                    .await
                    .map_err(|e| Error::new(format!("Invalid token: {}", e)))?;

                    // Get AT Protocol DPoP auth and PDS URL
                    let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached(
                        token,
                        &gql_ctx.auth_base_url,
                        gql_ctx.auth_cache.clone(),
                    )
                    .await
                    .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?;

                    // Prefer the DID; fall back to the OAuth subject claim.
                    let repo = user_info.did.unwrap_or(user_info.sub);

                    // Validate record against lexicon if not network.slices.lexicon
                    let validation_slice_uri = if collection == "network.slices.lexicon" {
                        std::env::var("SYSTEM_SLICE_URI").unwrap_or_default()
                    } else {
                        slice.clone()
                    };

                    // Validation is best-effort: a missing slice URI or a failed
                    // lexicon lookup skips validation rather than rejecting the write.
                    if !validation_slice_uri.is_empty() {
                        if let Ok(lexicons) = db.get_lexicons_by_slice(&validation_slice_uri).await {
                            if !lexicons.is_empty() {
                                if let Err(e) = slices_lexicon::validate_record(
                                    lexicons,
                                    &collection,
                                    record_data.clone()
                                ) {
                                    return Err(Error::new(format!("Validation error: {}", e)));
                                }
                            }
                        }
                    }

                    // Create record using AT Protocol. PDS-side validation is
                    // disabled (validate: false) because we validated locally above.
                    let http_client = reqwest::Client::new();
                    let create_request = atproto_client::com::atproto::repo::CreateRecordRequest {
                        repo: repo.clone(),
                        collection: collection.clone(),
                        record_key: rkey,
                        record: record_data.clone(),
                        swap_commit: None,
                        validate: false,
                    };

                    let result = atproto_client::com::atproto::repo::create_record(
                        &http_client,
                        &atproto_client::client::Auth::DPoP(dpop_auth),
                        &pds_url,
                        create_request,
                    )
                    .await
                    .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?;

                    // Extract URI and CID from the response
                    let (uri, cid) = match result {
                        atproto_client::com::atproto::repo::CreateRecordResponse::StrongRef { uri, cid, .. } => (uri, cid),
                        atproto_client::com::atproto::repo::CreateRecordResponse::Error(e) => {
                            return Err(Error::new(format!("AT Protocol error: {} - {}", e.error.unwrap_or_default(), e.message.unwrap_or_default())));
                        }
                    };

                    // Extract the target slice URI for lexicon records before moving record_data
                    let target_slice_for_invalidation = if collection == "network.slices.lexicon" {
                        record_data.get("slice")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string())
                    } else {
                        None
                    };

                    // Store in local database for indexing
                    let record = crate::models::Record {
                        uri: uri.clone(),
                        cid: cid.clone(),
                        did: repo,
                        collection: collection.clone(),
                        json: record_data,
                        indexed_at: chrono::Utc::now(),
                        slice_uri: Some(slice.clone()),
                    };

                    // Best-effort: the PDS write already succeeded, so a local
                    // indexing failure is ignored here rather than surfaced.
                    let _ = db.insert_record(&record).await;

                    // Invalidate GraphQL schema cache if this is a lexicon record
                    if let Some(target_slice) = target_slice_for_invalidation {
                        crate::graphql::invalidate_schema_cache(&target_slice).await;
                    }

                    // Fetch the created record and return it so Relay can update its cache
                    let created_record = db.get_record(&uri).await
                        .map_err(|e| Error::new(format!("Failed to fetch created record: {}", e)))?
                        .ok_or_else(|| Error::new("Created record not found"))?;

                    // Wrap the record in a RecordContainer so the type fields can access it
                    let container = RecordContainer {
                        record: created_record,
                    };

                    Ok(Some(FieldValue::owned_any(container)))
                })
            },
        )
        .argument(InputValue::new("input", TypeRef::named_nn(format!("{}Input", type_name))))
        .argument(InputValue::new("rkey", TypeRef::named(TypeRef::STRING)))
        .description(format!("Create a new {} record", nsid_clone))
    )
}
3785
/// Adds an `update{Type}` mutation for a collection.
///
/// Mirrors `add_create_mutation` except that the record key is a required
/// argument and the write goes through `com.atproto.repo.putRecord`:
/// authenticate, transform the input to AT Protocol shape, validate against
/// the slice's lexicons, put the record on the caller's PDS, then update the
/// local index and return the re-fetched record in a `RecordContainer`.
fn add_update_mutation(
    mutation: Object,
    type_name: &str,
    nsid: &str,
    fields: &[GraphQLField],
    database: Database,
    slice_uri: String,
) -> Object {
    let mutation_name = format!("update{}", type_name);
    let nsid = nsid.to_string();
    let nsid_clone = nsid.clone();
    let fields = fields.to_vec();

    mutation.field(
        Field::new(
            mutation_name,
            TypeRef::named_nn(type_name),
            move |ctx| {
                // Per-invocation owned copies for the async block below.
                let db = database.clone();
                let slice = slice_uri.clone();
                let collection = nsid.clone();
                let fields = fields.clone();

                FieldFuture::new(async move {
                    // Get GraphQL context which contains auth info
                    let gql_ctx = ctx.data::<crate::graphql::GraphQLContext>()
                        .map_err(|_| Error::new("Missing GraphQL context"))?;

                    // Check if user is authenticated
                    let token = gql_ctx.auth_token.as_ref()
                        .ok_or_else(|| Error::new("Authentication required"))?;

                    // Get rkey (required — identifies the record to replace)
                    let rkey = ctx.args.get("rkey")
                        .and_then(|v| v.string().ok())
                        .ok_or_else(|| Error::new("Missing rkey argument"))?
                        .to_string();

                    // Extract input data
                    let input = ctx.args.get("input")
                        .ok_or_else(|| Error::new("Missing input argument"))?;

                    // Convert GraphQL value to JSON using deserialize
                    let mut record_data: serde_json::Value = input.deserialize()
                        .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?;

                    // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs)
                    record_data = transform_fields_for_atproto(record_data, &fields);

                    // Verify OAuth token and get user info
                    let user_info = crate::auth::verify_oauth_token_cached(
                        token,
                        &gql_ctx.auth_base_url,
                        gql_ctx.auth_cache.clone(),
                    )
                    .await
                    .map_err(|e| Error::new(format!("Invalid token: {}", e)))?;

                    // Get AT Protocol DPoP auth and PDS URL
                    let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached(
                        token,
                        &gql_ctx.auth_base_url,
                        gql_ctx.auth_cache.clone(),
                    )
                    .await
                    .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?;

                    // Prefer the DID; fall back to the OAuth subject claim.
                    let repo = user_info.did.unwrap_or(user_info.sub);

                    // Validate record against lexicon. Lexicon records themselves
                    // validate against the system slice instead of the current one.
                    let validation_slice_uri = if collection == "network.slices.lexicon" {
                        std::env::var("SYSTEM_SLICE_URI").unwrap_or_default()
                    } else {
                        slice.clone()
                    };

                    // Validation is best-effort: a missing slice URI or a failed
                    // lexicon lookup skips validation rather than rejecting the write.
                    if !validation_slice_uri.is_empty() {
                        if let Ok(lexicons) = db.get_lexicons_by_slice(&validation_slice_uri).await {
                            if !lexicons.is_empty() {
                                if let Err(e) = slices_lexicon::validate_record(
                                    lexicons,
                                    &collection,
                                    record_data.clone()
                                ) {
                                    return Err(Error::new(format!("Validation error: {}", e)));
                                }
                            }
                        }
                    }

                    // Update record using AT Protocol. PDS-side validation is
                    // disabled (validate: false) because we validated locally above.
                    let http_client = reqwest::Client::new();
                    let put_request = atproto_client::com::atproto::repo::PutRecordRequest {
                        repo: repo.clone(),
                        collection: collection.clone(),
                        record_key: rkey.clone(),
                        record: record_data.clone(),
                        swap_record: None,
                        swap_commit: None,
                        validate: false,
                    };

                    let result = atproto_client::com::atproto::repo::put_record(
                        &http_client,
                        &atproto_client::client::Auth::DPoP(dpop_auth),
                        &pds_url,
                        put_request,
                    )
                    .await
                    .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?;

                    // Extract URI and CID from the response
                    let (uri, cid) = match result {
                        atproto_client::com::atproto::repo::PutRecordResponse::StrongRef { uri, cid, .. } => (uri, cid),
                        atproto_client::com::atproto::repo::PutRecordResponse::Error(e) => {
                            return Err(Error::new(format!("AT Protocol error: {} - {}", e.error.unwrap_or_default(), e.message.unwrap_or_default())));
                        }
                    };

                    // Extract the target slice URI for lexicon records before moving record_data
                    let target_slice_for_invalidation = if collection == "network.slices.lexicon" {
                        record_data.get("slice")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string())
                    } else {
                        None
                    };

                    // Update in local database
                    let record = crate::models::Record {
                        uri: uri.clone(),
                        cid: cid.clone(),
                        did: repo,
                        collection: collection.clone(),
                        json: record_data,
                        indexed_at: chrono::Utc::now(),
                        slice_uri: Some(slice.clone()),
                    };

                    // Best-effort: the PDS write already succeeded, so a local
                    // index failure is ignored here rather than surfaced.
                    let _ = db.update_record(&record).await;

                    // Invalidate GraphQL schema cache if this is a lexicon record
                    if let Some(target_slice) = target_slice_for_invalidation {
                        crate::graphql::invalidate_schema_cache(&target_slice).await;
                    }

                    // Fetch the updated record and return it so Relay can update its cache
                    let updated_record = db.get_record(&uri).await
                        .map_err(|e| Error::new(format!("Failed to fetch updated record: {}", e)))?
                        .ok_or_else(|| Error::new("Updated record not found"))?;

                    // Wrap the record in a RecordContainer so the type fields can access it
                    let container = RecordContainer {
                        record: updated_record,
                    };

                    Ok(Some(FieldValue::owned_any(container)))
                })
            },
        )
        .argument(InputValue::new("rkey", TypeRef::named_nn(TypeRef::STRING)))
        .argument(InputValue::new("input", TypeRef::named_nn(format!("{}Input", type_name))))
        .description(format!("Update a {} record", nsid_clone))
    )
}
3952
3953/// Adds a delete mutation for a collection
3954fn add_delete_mutation(
3955 mutation: Object,
3956 type_name: &str,
3957 nsid: &str,
3958 database: Database,
3959 slice_uri: String,
3960) -> Object {
3961 let mutation_name = format!("delete{}", type_name);
3962 let nsid = nsid.to_string();
3963 let nsid_clone = nsid.clone();
3964
3965 mutation.field(
3966 Field::new(
3967 mutation_name,
3968 TypeRef::named_nn(type_name),
3969 move |ctx| {
3970 let _db = database.clone();
3971 let _slice = slice_uri.clone();
3972 let _collection = nsid.clone();
3973
3974 FieldFuture::new(async move {
3975 // Get GraphQL context which contains auth info
3976 let gql_ctx = ctx.data::<crate::graphql::GraphQLContext>()
3977 .map_err(|_| Error::new("Missing GraphQL context"))?;
3978
3979 // Check if user is authenticated
3980 let token = gql_ctx.auth_token.as_ref()
3981 .ok_or_else(|| Error::new("Authentication required"))?;
3982
3983 // Get rkey
3984 let rkey = ctx.args.get("rkey")
3985 .and_then(|v| v.string().ok())
3986 .ok_or_else(|| Error::new("Missing rkey argument"))?
3987 .to_string();
3988
3989 // Verify OAuth token and get user info
3990 let user_info = crate::auth::verify_oauth_token_cached(
3991 token,
3992 &gql_ctx.auth_base_url,
3993 gql_ctx.auth_cache.clone(),
3994 )
3995 .await
3996 .map_err(|e| Error::new(format!("Invalid token: {}", e)))?;
3997
3998 // Get AT Protocol DPoP auth and PDS URL
3999 let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached(
4000 token,
4001 &gql_ctx.auth_base_url,
4002 gql_ctx.auth_cache.clone(),
4003 )
4004 .await
4005 .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?;
4006
4007 let repo = user_info.did.unwrap_or(user_info.sub);
4008 let uri = format!("at://{}/{}/{}", repo, _collection, rkey);
4009
4010 // Fetch the record before deleting it
4011 let deleted_record = _db.get_record(&uri).await
4012 .map_err(|e| Error::new(format!("Failed to fetch record: {}", e)))?
4013 .ok_or_else(|| Error::new("Record not found"))?;
4014
4015 // Extract the target slice URI for lexicon records before deletion
4016 let target_slice_for_invalidation = if _collection == "network.slices.lexicon" {
4017 deleted_record.value.get("slice")
4018 .and_then(|v| v.as_str())
4019 .map(|s| s.to_string())
4020 } else {
4021 None
4022 };
4023
4024 // Delete record using AT Protocol
4025 let http_client = reqwest::Client::new();
4026 let delete_request = atproto_client::com::atproto::repo::DeleteRecordRequest {
4027 repo: repo.clone(),
4028 collection: _collection.clone(),
4029 record_key: rkey.clone(),
4030 swap_record: None,
4031 swap_commit: None,
4032 };
4033
4034 atproto_client::com::atproto::repo::delete_record(
4035 &http_client,
4036 &atproto_client::client::Auth::DPoP(dpop_auth),
4037 &pds_url,
4038 delete_request,
4039 )
4040 .await
4041 .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?;
4042
4043 // Handle cascade deletion
4044 if let Err(e) = _db.handle_cascade_deletion(&uri, &_collection).await {
4045 tracing::warn!("Cascade deletion failed for {}: {}", uri, e);
4046 }
4047
4048 // Delete from local database
4049 let _ = _db.delete_record_by_uri(&uri, None).await;
4050
4051 // Invalidate GraphQL schema cache if this is a lexicon record
4052 if let Some(target_slice) = target_slice_for_invalidation {
4053 crate::graphql::invalidate_schema_cache(&target_slice).await;
4054 }
4055
4056 // Wrap the record in a RecordContainer so the type fields can access it
4057 let container = RecordContainer {
4058 record: deleted_record,
4059 };
4060
4061 Ok(Some(FieldValue::owned_any(container)))
4062 })
4063 },
4064 )
4065 .argument(InputValue::new("rkey", TypeRef::named_nn(TypeRef::STRING)))
4066 .description(format!("Delete a {} record", nsid_clone))
4067 )
4068}
4069
4070/// Creates the MutationResponse type for create/update mutations
4071fn create_mutation_response_type() -> Object {
4072 Object::new("MutationResponse")
4073 .field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
4074 FieldFuture::new(async move {
4075 // Parent value is a GraphQLValue, extract the Object variant
4076 if let Ok(value) = ctx.parent_value.try_downcast_ref::<GraphQLValue>() {
4077 if let GraphQLValue::Object(obj) = value {
4078 if let Some(uri) = obj.get(&async_graphql::Name::new("uri")) {
4079 return Ok(Some(uri.clone()));
4080 }
4081 }
4082 }
4083 Ok(None)
4084 })
4085 }))
4086 .field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| {
4087 FieldFuture::new(async move {
4088 // Parent value is a GraphQLValue, extract the Object variant
4089 if let Ok(value) = ctx.parent_value.try_downcast_ref::<GraphQLValue>() {
4090 if let GraphQLValue::Object(obj) = value {
4091 if let Some(cid) = obj.get(&async_graphql::Name::new("cid")) {
4092 return Ok(Some(cid.clone()));
4093 }
4094 }
4095 }
4096 Ok(None)
4097 })
4098 }))
4099}