Highly ambitious ATProtocol AppView service and sdks
at main 4099 lines 182 kB view raw
1//! Dynamic GraphQL schema builder from AT Protocol lexicons 2//! 3//! This module generates GraphQL schemas at runtime based on lexicon definitions 4//! stored in the database, enabling flexible querying of slice records. 5 6use async_graphql::dynamic::{ 7 Enum, EnumItem, Field, FieldFuture, FieldValue, InputObject, InputValue, Object, Scalar, 8 Schema, Subscription, SubscriptionField, SubscriptionFieldFuture, TypeRef, 9}; 10use async_graphql::{Error, Value as GraphQLValue}; 11use base64::Engine; 12use base64::engine::general_purpose; 13use serde_json; 14use std::collections::HashMap; 15use std::sync::Arc; 16use tokio::sync::Mutex; 17 18use crate::database::Database; 19use crate::graphql::dataloader::GraphQLContext; 20use crate::graphql::schema_ext::{ 21 add_cancel_job_mutation, add_create_oauth_client_mutation, add_delete_job_mutation, 22 add_delete_oauth_client_mutation, add_delete_slice_records_mutation, add_get_sync_summary_query, 23 add_jetstream_logs_query, add_jetstream_logs_subscription, add_oauth_clients_query, 24 add_oauth_clients_field_to_slice, add_slice_records_query, add_sparklines_query, 25 add_sparklines_field_to_slice, add_start_sync_mutation, add_stats_field_to_slice, 26 add_sync_job_logs_query, add_sync_job_query, add_sync_job_subscription, add_sync_jobs_query, 27 add_update_oauth_client_mutation, add_upload_blob_mutation, create_blob_upload_response_type, 28 create_collection_stats_type, create_collection_summary_type, 29 create_delete_slice_records_output_type, create_jetstream_log_entry_type, 30 create_oauth_client_type, create_slice_record_type, create_slice_record_edge_type, 31 create_slice_records_connection_type, create_slice_records_where_input, 32 create_slice_sparkline_type, create_slice_stats_type, create_sparkline_point_type, 33 create_start_sync_output_type, create_sync_job_result_type, create_sync_job_type, 34 create_sync_summary_type, 35}; 36use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType}; 37use crate::graphql::PUBSUB; 38 39/// Metadata about a collection for cross-referencing 40#[derive(Clone)] 41struct CollectionMeta { 42 nsid: String, 43 key_type: String, // "tid", "literal:self", or "any" 44 type_name: String, // GraphQL type name for this collection 45 at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins 46} 47 48/// Type registry for tracking generated nested object types 49type TypeRegistry = HashMap<String, Object>; 50 51/// Container for nested object field values 52#[derive(Clone)] 53struct NestedObjectContainer { 54 data: serde_json::Value, 55} 56 57/// Generates a unique type name for a nested object field 58fn generate_nested_type_name(parent_type: &str, field_name: &str) -> String { 59 let mut chars = field_name.chars(); 60 let capitalized_field = match chars.next() { 61 None => String::new(), 62 Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), 63 }; 64 format!("{}{}", parent_type, capitalized_field) 65} 66 67/// Resolves a lexicon ref and generates a GraphQL type for it 68/// Returns the generated type name 69fn resolve_lexicon_ref_type( 70 ref_nsid: &str, 71 current_lexicon_nsid: &str, 72 all_lexicons: &[serde_json::Value], 73 type_registry: &mut TypeRegistry, 74 database: &Database, 75) -> String { 76 // Handle different ref formats: 77 // 1. Local ref: #image 78 // 2. External ref with specific def: app.bsky.embed.defs#aspectRatio 79 // 3. External ref to main: community.lexicon.location.hthree 80 let (target_nsid, def_name) = if ref_nsid.starts_with('#') { 81 // Local ref - use current lexicon NSID and the def name without # 82 (current_lexicon_nsid, &ref_nsid[1..]) 83 } else if let Some(hash_pos) = ref_nsid.find('#') { 84 // External ref with specific def - split on # 85 (&ref_nsid[..hash_pos], &ref_nsid[hash_pos + 1..]) 86 } else { 87 // External ref to main def 88 (ref_nsid, "main") 89 }; 90 91 // Generate type name from NSID and def name 92 let type_name = if def_name == "main" { 93 // For refs to main: CommunityLexiconLocationHthree 94 nsid_to_type_name(target_nsid) 95 } else { 96 // For refs to specific def: AppBskyEmbedDefsAspectRatio 97 format!("{}{}", nsid_to_type_name(target_nsid), capitalize_first(def_name)) 98 }; 99 100 // Check if already generated 101 if type_registry.contains_key(&type_name) { 102 return type_name; 103 } 104 105 // Find the lexicon definition 106 let lexicon = all_lexicons.iter().find(|lex| { 107 lex.get("id").and_then(|id| id.as_str()) == Some(target_nsid) 108 }); 109 110 if let Some(lex) = lexicon { 111 // Extract the definition (either "main" or specific def like "image") 112 if let Some(defs) = lex.get("defs") { 113 if let Some(def) = defs.get(def_name) { 114 // Extract fields from this specific definition 115 if let Some(properties) = def.get("properties") { 116 let fields = extract_fields_from_properties(properties); 117 118 if !fields.is_empty() { 119 // Generate the type using existing nested object generator 120 generate_nested_object_type(&type_name, &fields, type_registry, database); 121 return type_name; 122 } 123 } 124 } 125 } 126 } 127 128 // Fallback: couldn't resolve the ref, will use JSON 129 tracing::warn!("Could not resolve lexicon ref: {} (target: {}, def: {})", ref_nsid, target_nsid, def_name); 130 type_name 131} 132 133/// Capitalizes the first character of a string 134fn capitalize_first(s: &str) -> String { 135 let mut chars = s.chars(); 136 match chars.next() { 137 None => String::new(), 138 Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), 139 } 140} 141 142/// Extracts fields from a lexicon properties object 143fn extract_fields_from_properties(properties: &serde_json::Value) -> Vec<GraphQLField> { 144 let mut fields = Vec::new(); 145 146 if let Some(props) = properties.as_object() { 147 for (field_name, field_def) in props { 148 let field_type_str = field_def.get("type").and_then(|t| t.as_str()).unwrap_or("unknown"); 149 let field_type = crate::graphql::types::map_lexicon_type_to_graphql(field_type_str, field_def); 150 151 // Check if field is required 152 let is_required = false; // We'd need the parent's "required" array to know this 153 154 // Extract format if present 155 let format = field_def.get("format").and_then(|f| f.as_str()).map(|s| s.to_string()); 156 157 fields.push(GraphQLField { 158 name: field_name.clone(), 159 field_type, 160 is_required, 161 format, 162 }); 163 } 164 } 165 166 fields 167} 168 169/// Recursively generates GraphQL object types for nested objects 170/// Returns the type name of the generated object type 171fn generate_nested_object_type( 172 type_name: &str, 173 fields: &[GraphQLField], 174 type_registry: &mut TypeRegistry, 175 database: &Database, 176) -> String { 177 // Check if type already exists in registry 178 if type_registry.contains_key(type_name) { 179 return type_name.to_string(); 180 } 181 182 let mut object = Object::new(type_name); 183 184 // Add fields to the object 185 for field in fields { 186 let field_name = field.name.clone(); 187 let field_name_for_field = field_name.clone(); // Clone for Field::new 188 let field_type = field.field_type.clone(); 189 190 // Determine the TypeRef for this field 191 let type_ref = match &field.field_type { 192 GraphQLType::Object(nested_fields) => { 193 // Generate nested object type recursively 194 let nested_type_name = generate_nested_type_name(type_name, &field_name); 195 let actual_type_name = generate_nested_object_type( 196 &nested_type_name, 197 nested_fields, 198 type_registry, 199 database, 200 ); 201 202 if field.is_required { 203 TypeRef::named_nn(actual_type_name) 204 } else { 205 TypeRef::named(actual_type_name) 206 } 207 } 208 GraphQLType::Array(inner) => { 209 if let GraphQLType::Object(nested_fields) = inner.as_ref() { 210 // Generate nested object type for array items 211 let nested_type_name = generate_nested_type_name(type_name, &field_name); 212 let actual_type_name = generate_nested_object_type( 213 &nested_type_name, 214 nested_fields, 215 type_registry, 216 database, 217 ); 218 219 if field.is_required { 220 TypeRef::named_nn_list(actual_type_name) 221 } else { 222 TypeRef::named_list(actual_type_name) 223 } 224 } else { 225 // Use standard type ref for arrays of primitives 226 graphql_type_to_typeref(&field.field_type, field.is_required) 227 } 228 } 229 _ => { 230 // Use standard type ref for other types 231 graphql_type_to_typeref(&field.field_type, field.is_required) 232 } 233 }; 234 235 // Add field with resolver 236 object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| { 237 let field_name = field_name.clone(); 238 let field_type = field_type.clone(); 239 240 FieldFuture::new(async move { 241 // Get parent container 242 let container = ctx.parent_value.try_downcast_ref::<NestedObjectContainer>()?; 243 let value = container.data.get(&field_name); 244 245 if let Some(val) = value { 246 if val.is_null() { 247 return Ok(None); 248 } 249 250 // For nested objects, wrap in container 251 if matches!(field_type, GraphQLType::Object(_)) { 252 let nested_container = NestedObjectContainer { 253 data: val.clone(), 254 }; 255 return Ok(Some(FieldValue::owned_any(nested_container))); 256 } 257 258 // For arrays of objects, wrap each item 259 if let GraphQLType::Array(inner) = &field_type { 260 if matches!(inner.as_ref(), GraphQLType::Object(_)) { 261 if let Some(arr) = val.as_array() { 262 let containers: Vec<FieldValue> = arr 263 .iter() 264 .map(|item| { 265 let nested_container = NestedObjectContainer { 266 data: item.clone(), 267 }; 268 FieldValue::owned_any(nested_container) 269 }) 270 .collect(); 271 return Ok(Some(FieldValue::list(containers))); 272 } 273 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))); 274 } 275 } 276 277 // For other types, return the GraphQL value 278 let graphql_val = json_to_graphql_value(val); 279 Ok(Some(FieldValue::value(graphql_val))) 280 } else { 281 Ok(None) 282 } 283 }) 284 })); 285 } 286 287 // Store the generated type in registry 288 type_registry.insert(type_name.to_string(), object); 289 type_name.to_string() 290} 291 292/// Builds a dynamic GraphQL schema from lexicons for a given slice 293pub async fn build_graphql_schema(database: Database, slice_uri: String, auth_base_url: String) -> Result<Schema, String> { 294 // Fetch all lexicons for this slice 295 let all_lexicons = database 296 .get_lexicons_by_slice(&slice_uri) 297 .await 298 .map_err(|e| format!("Failed to load lexicons: {}", e))?; 299 300 // Deduplicate by NSID for schema building (keep most recent due to ORDER BY indexed_at DESC) 301 // This prevents duplicate type registration errors without hiding duplicates from users 302 let mut seen_nsids = std::collections::HashSet::new(); 303 let lexicons: Vec<serde_json::Value> = all_lexicons 304 .into_iter() 305 .filter(|lexicon| { 306 if let Some(nsid) = lexicon.get("id").and_then(|n| n.as_str()) { 307 seen_nsids.insert(nsid.to_string()) 308 } else { 309 true // Keep lexicons without an ID (will fail validation later) 310 } 311 }) 312 .collect(); 313 314 // Build Query root type and collect all object types 315 let mut query = Object::new("Query"); 316 let mut objects_to_register = Vec::new(); 317 let mut where_inputs_to_register = Vec::new(); 318 let mut group_by_enums_to_register = Vec::new(); 319 let mut mutation_inputs_to_register = Vec::new(); 320 321 // First pass: collect metadata about all collections for cross-referencing 322 let mut all_collections: Vec<CollectionMeta> = Vec::new(); 323 for lexicon in &lexicons { 324 let nsid = lexicon 325 .get("id") 326 .and_then(|n| n.as_str()) 327 .ok_or_else(|| "Lexicon missing id".to_string())?; 328 329 let defs = lexicon 330 .get("defs") 331 .ok_or_else(|| format!("Lexicon {} missing defs", nsid))?; 332 333 let fields = extract_collection_fields(defs); 334 if !fields.is_empty() { 335 if let Some(key_type) = extract_record_key(defs) { 336 // Extract at-uri field names for reverse joins 337 let at_uri_fields: Vec<String> = fields 338 .iter() 339 .filter(|f| f.format.as_deref() == Some("at-uri")) 340 .map(|f| f.name.clone()) 341 .collect(); 342 343 if !at_uri_fields.is_empty() { 344 tracing::debug!("Collection {} has at-uri fields: {:?}", nsid, at_uri_fields); 345 } 346 347 all_collections.push(CollectionMeta { 348 nsid: nsid.to_string(), 349 key_type, 350 type_name: nsid_to_type_name(nsid), 351 at_uri_fields, 352 }); 353 } 354 } 355 } 356 357 // Initialize type registry for nested object types 358 let mut type_registry: TypeRegistry = HashMap::new(); 359 360 // Second pass: create types and queries 361 for lexicon in &lexicons { 362 // get_lexicons_by_slice returns {lexicon: 1, id: "nsid", defs: {...}} 363 let nsid = lexicon 364 .get("id") 365 .and_then(|n| n.as_str()) 366 .ok_or_else(|| "Lexicon missing id".to_string())?; 367 368 let defs = lexicon 369 .get("defs") 370 .ok_or_else(|| format!("Lexicon {} missing defs", nsid))? 371 .clone(); 372 373 // Extract fields from lexicon 374 let fields = extract_collection_fields(&defs); 375 376 if !fields.is_empty() { 377 // Create a GraphQL type for this collection 378 let type_name = nsid_to_type_name(nsid); 379 let record_type = create_record_type( 380 &type_name, 381 &fields, 382 database.clone(), 383 slice_uri.clone(), 384 &all_collections, 385 auth_base_url.clone(), 386 &mut type_registry, 387 &lexicons, 388 nsid, 389 ); 390 391 // Create edge and connection types for this collection (Relay standard) 392 let edge_type = create_edge_type(&type_name); 393 let connection_type = create_connection_type(&type_name); 394 395 // Create WhereInput type for this collection 396 let mut where_input = InputObject::new(format!("{}WhereInput", type_name)); 397 398 // Collect lexicon field names to avoid duplicates 399 let lexicon_field_names: std::collections::HashSet<&str> = 400 fields.iter().map(|f| f.name.as_str()).collect(); 401 402 // Add system fields available on all records (skip if already in lexicon) 403 let system_fields = [ 404 ("indexedAt", "DateTimeFilter"), 405 ("uri", "StringFilter"), 406 ("cid", "StringFilter"), 407 ("did", "StringFilter"), 408 ("collection", "StringFilter"), 409 ("actorHandle", "StringFilter"), 410 ]; 411 412 for (field_name, filter_type) in system_fields { 413 if !lexicon_field_names.contains(field_name) { 414 where_input = 415 where_input.field(InputValue::new(field_name, TypeRef::named(filter_type))); 416 } 417 } 418 419 // Add fields from the lexicon 420 for field in &fields { 421 let filter_type = match field.field_type { 422 GraphQLType::Int => "IntFilter", 423 _ => "StringFilter", // Default to StringFilter for strings and other types 424 }; 425 where_input = 426 where_input.field(InputValue::new(&field.name, TypeRef::named(filter_type))); 427 } 428 429 // Add generic json field for querying record content 430 where_input = where_input.field(InputValue::new("json", TypeRef::named("StringFilter"))); 431 432 // Add nested and/or support 433 where_input = where_input 434 .field(InputValue::new( 435 "and", 436 TypeRef::named_list(format!("{}WhereInput", type_name)), 437 )) 438 .field(InputValue::new( 439 "or", 440 TypeRef::named_list(format!("{}WhereInput", type_name)), 441 )); 442 443 // Create GroupByField enum for this collection 444 let mut group_by_enum = Enum::new(format!("{}GroupByField", type_name)); 445 group_by_enum = group_by_enum.item(EnumItem::new("indexedAt")); 446 447 for field in &fields { 448 group_by_enum = group_by_enum.item(EnumItem::new(&field.name)); 449 } 450 451 // Create collection-specific GroupByFieldInput 452 let group_by_input = InputObject::new(format!("{}GroupByFieldInput", type_name)) 453 .field(InputValue::new( 454 "field", 455 TypeRef::named_nn(format!("{}GroupByField", type_name)), 456 )) 457 .field(InputValue::new("interval", TypeRef::named("DateInterval"))); 458 459 // Create collection-specific SortFieldInput 460 let sort_field_input = InputObject::new(format!("{}SortFieldInput", type_name)) 461 .field(InputValue::new( 462 "field", 463 TypeRef::named_nn(format!("{}GroupByField", type_name)), 464 )) 465 .field(InputValue::new( 466 "direction", 467 TypeRef::named("SortDirection"), 468 )); 469 470 // Create mutation input type for this collection 471 let mutation_input = create_mutation_input_type(&type_name, &fields); 472 473 // Collect the types to register with schema later 474 objects_to_register.push(record_type); 475 objects_to_register.push(edge_type); 476 objects_to_register.push(connection_type); 477 where_inputs_to_register.push(where_input); 478 where_inputs_to_register.push(group_by_input); 479 where_inputs_to_register.push(sort_field_input); 480 group_by_enums_to_register.push(group_by_enum); 481 mutation_inputs_to_register.push(mutation_input); 482 483 // Add query field for this collection 484 let collection_query_name = nsid_to_query_name(nsid); 485 let db_clone = database.clone(); 486 let slice_clone = slice_uri.clone(); 487 let nsid_clone = nsid.to_string(); 488 489 let connection_type_name = format!("{}Connection", &type_name); 490 query = query.field( 491 Field::new( 492 &collection_query_name, 493 TypeRef::named_nn(&connection_type_name), 494 move |ctx| { 495 let db = db_clone.clone(); 496 let slice = slice_clone.clone(); 497 let collection = nsid_clone.clone(); 498 499 FieldFuture::new(async move { 500 // Get Relay-standard pagination arguments 501 let first: i32 = match ctx.args.get("first") { 502 Some(val) => val.i64().unwrap_or(50) as i32, 503 None => 50, 504 }; 505 506 let after: Option<&str> = match ctx.args.get("after") { 507 Some(val) => val.string().ok(), 508 None => None, 509 }; 510 511 // Parse sortBy argument 512 let sort_by: Option<Vec<crate::models::SortField>> = match ctx 513 .args 514 .get("sortBy") 515 { 516 Some(val) => { 517 if let Ok(list) = val.list() { 518 let mut sort_fields = Vec::new(); 519 for item in list.iter() { 520 if let Ok(obj) = item.object() { 521 let field = obj 522 .get("field") 523 .and_then(|v| { 524 v.enum_name().ok().map(|s| s.to_string()) 525 }) 526 .unwrap_or_else(|| "indexedAt".to_string()); 527 let direction = obj 528 .get("direction") 529 .and_then(|v| { 530 v.enum_name().ok().map(|s| s.to_string()) 531 }) 532 .unwrap_or_else(|| "desc".to_string()); 533 sort_fields.push(crate::models::SortField { 534 field, 535 direction, 536 }); 537 } 538 } 539 Some(sort_fields) 540 } else { 541 None 542 } 543 } 544 None => None, 545 }; 546 547 // Build where clause for this collection 548 let mut where_clause = crate::models::WhereClause { 549 conditions: HashMap::new(), 550 or_conditions: None, 551 and: None, 552 or: None, 553 }; 554 555 // Always filter by collection 556 where_clause.conditions.insert( 557 "collection".to_string(), 558 crate::models::WhereCondition { 559 gt: None, 560 gte: None, 561 lt: None, 562 lte: None, 563 eq: Some(serde_json::Value::String(collection.clone())), 564 in_values: None, 565 contains: None, 566 fuzzy: None, 567 }, 568 ); 569 570 // Parse where argument if provided 571 if let Some(where_val) = ctx.args.get("where") { 572 if let Ok(where_obj) = where_val.object() { 573 let parsed_where = parse_where_clause(where_obj); 574 // Merge parsed conditions with existing collection filter 575 where_clause.conditions.extend(parsed_where.conditions); 576 where_clause.or_conditions = parsed_where.or_conditions; 577 where_clause.and = parsed_where.and; 578 where_clause.or = parsed_where.or; 579 } 580 } 581 582 // Resolve actorHandle to did if present 583 if let Some(actor_handle_condition) = 584 where_clause.conditions.remove("actorHandle") 585 { 586 // Handle different filter types 587 let dids = if let Some(pattern) = &actor_handle_condition.contains { 588 // Pattern matching (contains) 589 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() 590 } else if let Some(pattern) = &actor_handle_condition.fuzzy { 591 // Fuzzy matching 592 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() 593 } else { 594 // Exact matching (eq / in_values) 595 let mut handles = Vec::new(); 596 if let Some(eq_value) = &actor_handle_condition.eq { 597 if let Some(handle_str) = eq_value.as_str() { 598 handles.push(handle_str.to_string()); 599 } 600 } 601 if let Some(in_values) = &actor_handle_condition.in_values { 602 for value in in_values { 603 if let Some(handle_str) = value.as_str() { 604 handles.push(handle_str.to_string()); 605 } 606 } 607 } 608 609 if !handles.is_empty() { 610 db.resolve_handles_to_dids(&handles, &slice).await.ok() 611 } else { 612 None 613 } 614 }; 615 616 // Replace actorHandle condition with did condition if we found matches 617 if let Some(dids) = dids { 618 if !dids.is_empty() { 619 let did_condition = if dids.len() == 1 { 620 crate::models::WhereCondition { 621 gt: None, 622 gte: None, 623 lt: None, 624 lte: None, 625 eq: Some(serde_json::Value::String( 626 dids[0].clone(), 627 )), 628 in_values: None, 629 contains: None, 630 fuzzy: None, 631 } 632 } else { 633 crate::models::WhereCondition { 634 gt: None, 635 gte: None, 636 lt: None, 637 lte: None, 638 eq: None, 639 in_values: Some( 640 dids.into_iter() 641 .map(|d| { 642 serde_json::Value::String(d) 643 }) 644 .collect(), 645 ), 646 contains: None, 647 fuzzy: None, 648 } 649 }; 650 where_clause 651 .conditions 652 .insert("did".to_string(), did_condition); 653 } 654 // If no DIDs found, the query will return 0 results naturally 655 } 656 } 657 658 // Resolve actorHandle to did in OR clauses 659 if let Some(or_clauses) = &mut where_clause.or { 660 for or_clause in or_clauses.iter_mut() { 661 if let Some(actor_handle_condition) = 662 or_clause.conditions.remove("actorHandle") 663 { 664 // Handle different filter types 665 let dids = if let Some(pattern) = &actor_handle_condition.contains { 666 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() 667 } else if let Some(pattern) = &actor_handle_condition.fuzzy { 668 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() 669 } else { 670 let mut handles = Vec::new(); 671 if let Some(eq_value) = &actor_handle_condition.eq { 672 if let Some(handle_str) = eq_value.as_str() { 673 handles.push(handle_str.to_string()); 674 } 675 } 676 if let Some(in_values) = &actor_handle_condition.in_values { 677 for value in in_values { 678 if let Some(handle_str) = value.as_str() { 679 handles.push(handle_str.to_string()); 680 } 681 } 682 } 683 if !handles.is_empty() { 684 db.resolve_handles_to_dids(&handles, &slice).await.ok() 685 } else { 686 None 687 } 688 }; 689 690 // Replace actorHandle with did condition 691 if let Some(dids) = dids { 692 if !dids.is_empty() { 693 let did_condition = if dids.len() == 1 { 694 crate::models::WhereCondition { 695 gt: None, gte: None, lt: None, lte: None, 696 eq: Some(serde_json::Value::String(dids[0].clone())), 697 in_values: None, contains: None, fuzzy: None, 698 } 699 } else { 700 crate::models::WhereCondition { 701 gt: None, gte: None, lt: None, lte: None, eq: None, 702 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()), 703 contains: None, fuzzy: None, 704 } 705 }; 706 or_clause.conditions.insert("did".to_string(), did_condition); 707 } 708 } 709 } 710 } 711 } 712 713 // Query database for records 714 let (records, next_cursor) = db 715 .get_slice_collections_records( 716 &slice, 717 Some(first), 718 after, 719 sort_by.as_ref(), 720 Some(&where_clause), 721 ) 722 .await 723 .map_err(|e| Error::new(format!("Database query failed: {}", e)))?; 724 725 // Query database for total count 726 let total_count = db 727 .count_slice_collections_records(&slice, Some(&where_clause)) 728 .await 729 .map_err(|e| Error::new(format!("Count query failed: {}", e)))? 730 as i32; 731 732 // Convert records to RecordContainers 733 let record_containers: Vec<RecordContainer> = records 734 .into_iter() 735 .map(|record| { 736 // Convert Record to IndexedRecord 737 let indexed_record = crate::models::IndexedRecord { 738 uri: record.uri, 739 cid: record.cid, 740 did: record.did, 741 collection: record.collection, 742 value: record.json, 743 indexed_at: record.indexed_at.to_rfc3339(), 744 }; 745 RecordContainer { 746 record: indexed_record, 747 } 748 }) 749 .collect(); 750 751 // Build Connection data 752 let connection_data = ConnectionData { 753 total_count, 754 has_next_page: next_cursor.is_some(), 755 end_cursor: next_cursor, 756 nodes: record_containers, 757 }; 758 759 Ok(Some(FieldValue::owned_any(connection_data))) 760 }) 761 }, 762 ) 763 .argument(async_graphql::dynamic::InputValue::new( 764 "first", 765 TypeRef::named(TypeRef::INT), 766 )) 767 .argument(async_graphql::dynamic::InputValue::new( 768 "after", 769 TypeRef::named(TypeRef::STRING), 770 )) 771 .argument(async_graphql::dynamic::InputValue::new( 772 "last", 773 TypeRef::named(TypeRef::INT), 774 )) 775 .argument(async_graphql::dynamic::InputValue::new( 776 "before", 777 TypeRef::named(TypeRef::STRING), 778 )) 779 .argument(async_graphql::dynamic::InputValue::new( 780 "sortBy", 781 TypeRef::named_list(format!("{}SortFieldInput", type_name)), 782 )) 783 .argument(async_graphql::dynamic::InputValue::new( 784 "where", 785 TypeRef::named(format!("{}WhereInput", type_name)), 786 )) 787 .description(format!("Query {} records", nsid)), 788 ); 789 790 // Add aggregated query field for this collection 791 let aggregated_query_name = format!("{}Aggregated", collection_query_name); 792 let aggregated_type_name = format!("{}Aggregated", &type_name); 793 794 // Create aggregated type 795 let aggregated_type = create_aggregated_type(&aggregated_type_name, &fields); 796 objects_to_register.push(aggregated_type); 797 798 let db_clone_agg = database.clone(); 799 let slice_clone_agg = slice_uri.clone(); 800 let nsid_clone_agg = nsid.to_string(); 801 802 query = query.field( 803 Field::new( 804 &aggregated_query_name, 805 TypeRef::named_nn_list_nn(&aggregated_type_name), 806 move |ctx| { 807 let db = db_clone_agg.clone(); 808 let slice = slice_clone_agg.clone(); 809 let collection = nsid_clone_agg.clone(); 810 811 FieldFuture::new(async move { 812 // Parse groupBy argument 813 let group_by_fields: Vec<crate::models::GroupByField> = match ctx.args.get("groupBy") { 814 Some(val) => { 815 if let Ok(list) = val.list() { 816 let mut fields = Vec::new(); 817 for item in list.iter() { 818 if let Ok(obj) = item.object() { 819 // Get field name from enum 820 let field_name = obj.get("field") 821 .and_then(|v| v.enum_name().ok().map(|s| s.to_string())) 822 .ok_or_else(|| Error::new("Missing field name in groupBy"))?; 823 824 // Get optional interval 825 if let Some(interval_val) = obj.get("interval") { 826 if let Ok(interval_str) = interval_val.enum_name() { 827 // Parse interval string to DateInterval 828 let interval = match interval_str { 829 "second" => crate::models::DateInterval::Second, 830 "minute" => crate::models::DateInterval::Minute, 831 "hour" => crate::models::DateInterval::Hour, 832 "day" => crate::models::DateInterval::Day, 833 "week" => crate::models::DateInterval::Week, 834 "month" => crate::models::DateInterval::Month, 835 "quarter" => crate::models::DateInterval::Quarter, 836 "year" => crate::models::DateInterval::Year, 837 _ => return Err(Error::new(format!("Invalid interval: {}", interval_str))), 838 }; 839 fields.push(crate::models::GroupByField::Truncated { 840 field: field_name, 841 interval, 842 }); 843 } else { 844 return Err(Error::new("Invalid interval value")); 845 } 846 } else { 847 // No interval, simple field 848 fields.push(crate::models::GroupByField::Simple(field_name)); 849 } 850 } else { 851 return Err(Error::new("Invalid groupBy item")); 852 } 853 } 854 fields 855 } else { 856 Vec::new() 857 } 858 } 859 None => Vec::new(), 860 }; 861 862 if group_by_fields.is_empty() { 863 return Err(Error::new("groupBy is required for aggregated queries")); 864 } 865 866 // Parse limit argument 867 let limit: i32 = match ctx.args.get("limit") { 868 Some(val) => val.i64().unwrap_or(50) as i32, 869 None => 50, 870 }; 871 872 // Parse orderBy argument 873 let order_by_count: Option<String> = match ctx.args.get("orderBy") { 874 Some(val) => { 875 if let Ok(obj) = val.object() { 876 obj.get("count") 877 .and_then(|v| v.string().ok()) 878 .map(|s| s.to_string()) 879 } else { 880 None 881 } 882 } 883 None => None, 884 }; 885 886 // Build where clause for this collection 887 let mut where_clause = crate::models::WhereClause { 888 conditions: HashMap::new(), 889 or_conditions: None, 890 and: None, 891 or: None, 892 }; 893 894 // Always filter by collection 895 where_clause.conditions.insert( 896 "collection".to_string(), 897 crate::models::WhereCondition { 898 gt: None, 899 gte: None, 900 lt: None, 901 lte: None, 902 eq: Some(serde_json::Value::String(collection.clone())), 903 in_values: None, 904 contains: None, 905 fuzzy: None, 906 }, 907 ); 908 909 // Parse where argument if provided 910 if let Some(where_val) = ctx.args.get("where") { 911 if let Ok(where_obj) = where_val.object() { 912 let parsed_where = parse_where_clause(where_obj); 913 // Merge parsed conditions with existing collection filter 914 where_clause.conditions.extend(parsed_where.conditions); 915 where_clause.or_conditions = parsed_where.or_conditions; 916 where_clause.and = parsed_where.and; 917 where_clause.or = parsed_where.or; 918 } 919 } 920 921 // Resolve actorHandle to did if present 922 if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") { 923 // Handle different filter types 924 let dids = if let Some(pattern) = &actor_handle_condition.contains { 925 // Pattern matching (contains) 926 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() 927 } else if let Some(pattern) = &actor_handle_condition.fuzzy { 928 // Fuzzy matching 929 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() 930 } else { 931 // Exact matching (eq / in_values) 932 let mut handles = Vec::new(); 933 if let Some(eq_value) = &actor_handle_condition.eq { 934 if let Some(handle_str) = eq_value.as_str() { 935 handles.push(handle_str.to_string()); 936 } 937 } 938 if let Some(in_values) = &actor_handle_condition.in_values { 939 for value in in_values { 940 if let Some(handle_str) = value.as_str() { 941 handles.push(handle_str.to_string()); 942 } 943 } 944 } 945 946 if !handles.is_empty() { 947 db.resolve_handles_to_dids(&handles, &slice).await.ok() 948 } else { 949 None 950 } 951 }; 952 953 // Replace actorHandle condition with did condition if we found matches 954 if let Some(dids) = dids { 955 if !dids.is_empty() { 956 let did_condition = if dids.len() == 1 { 957 crate::models::WhereCondition { 958 gt: None, 959 gte: None, 960 lt: None, 961 lte: None, 962 eq: Some(serde_json::Value::String(dids[0].clone())), 963 in_values: None, 964 contains: None, 965 fuzzy: None, 966 } 967 } else { 968 crate::models::WhereCondition { 969 gt: None, 970 gte: None, 971 lt: None, 972 lte: None, 973 eq: None, 974 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()), 975 contains: None, 976 fuzzy: None, 977 } 978 }; 979 where_clause.conditions.insert("did".to_string(), did_condition); 980 } 981 // If no DIDs found, the query will return 0 results naturally 982 } 983 } 984 985 // Resolve actorHandle to did in OR clauses 986 if let Some(or_clauses) = &mut where_clause.or { 987 for or_clause in or_clauses.iter_mut() { 988 if let Some(actor_handle_condition) = 989 or_clause.conditions.remove("actorHandle") 990 { 991 // Handle different filter types 992 let dids = if let Some(pattern) = &actor_handle_condition.contains { 993 db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() 994 } else if let Some(pattern) = &actor_handle_condition.fuzzy { 995 db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() 996 } else { 997 let mut handles = Vec::new(); 998 if let Some(eq_value) = &actor_handle_condition.eq { 999 if let Some(handle_str) = eq_value.as_str() { 1000 handles.push(handle_str.to_string()); 1001 } 1002 } 1003 if let Some(in_values) = &actor_handle_condition.in_values { 1004 for value in in_values { 1005 if let Some(handle_str) = value.as_str() { 1006 handles.push(handle_str.to_string()); 1007 } 1008 } 1009 } 1010 if !handles.is_empty() { 1011 db.resolve_handles_to_dids(&handles, &slice).await.ok() 1012 } else { 1013 None 1014 } 1015 }; 1016 1017 // Replace actorHandle with did condition 1018 if let Some(dids) = dids { 1019 if !dids.is_empty() { 1020 let did_condition = if dids.len() == 1 { 1021 crate::models::WhereCondition { 1022 gt: None, gte: None, lt: None, lte: None, 1023 eq: Some(serde_json::Value::String(dids[0].clone())), 1024 in_values: None, contains: None, fuzzy: None, 1025 } 1026 } else { 1027 crate::models::WhereCondition { 1028 gt: None, gte: None, lt: None, lte: None, eq: None, 1029 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()), 1030 contains: None, fuzzy: None, 1031 } 1032 }; 1033 or_clause.conditions.insert("did".to_string(), did_condition); 1034 } 1035 } 1036 } 1037 } 1038 } 1039 1040 // Query database for aggregated records 1041 let results = db 1042 .get_aggregated_records( 1043 &slice, 1044 &group_by_fields, 1045 Some(&where_clause), 1046 order_by_count.as_deref(), 1047 Some(limit), 1048 ) 1049 .await 1050 .map_err(|e| { 1051 Error::new(format!("Aggregation query failed: {}", e)) 1052 })?; 1053 1054 // Convert JSON values to GraphQL values 1055 let field_values: Vec<FieldValue<'_>> = results 1056 .into_iter() 1057 .map(|json_val| FieldValue::owned_any(json_val)) 1058 .collect(); 1059 1060 Ok(Some(FieldValue::list(field_values))) 1061 }) 1062 }, 1063 ) 1064 .argument(async_graphql::dynamic::InputValue::new( 1065 "groupBy", 1066 TypeRef::named_nn_list(format!("{}GroupByFieldInput", type_name)), 1067 )) 1068 .argument(async_graphql::dynamic::InputValue::new( 1069 "where", 1070 TypeRef::named(format!("{}WhereInput", type_name)), 1071 )) 1072 .argument(async_graphql::dynamic::InputValue::new( 1073 "orderBy", 1074 TypeRef::named("AggregationOrderBy"), 1075 )) 1076 .argument(async_graphql::dynamic::InputValue::new( 1077 "limit", 1078 TypeRef::named(TypeRef::INT), 1079 )) 1080 .description(format!("Aggregated query for {} records with GROUP BY support", nsid)), 1081 ); 1082 } 1083 } 1084 1085 // Add jetstreamLogs query to root query type 1086 query = add_jetstream_logs_query(query, database.clone(), slice_uri.clone()); 1087 1088 // Add sync job queries 1089 query = add_sync_job_query(query); 1090 query = add_sync_jobs_query(query, slice_uri.clone()); 1091 query = add_sync_job_logs_query(query); 1092 query = add_get_sync_summary_query(query); 1093 1094 // Add sparklines query 1095 query = add_sparklines_query(query); 1096 1097 // Add slice records query 1098 query = add_slice_records_query(query, database.clone()); 1099 1100 // Add OAuth clients query 1101 query = add_oauth_clients_query(query, slice_uri.clone(), auth_base_url.clone()); 1102 1103 // Build Mutation type 1104 let mut mutation = create_mutation_type(database.clone(), slice_uri.clone(), &lexicons); 1105 1106 // Add startSync mutation 1107 mutation = add_start_sync_mutation(mutation, slice_uri.clone()); 1108 1109 // Add cancelJob mutation 1110 mutation = add_cancel_job_mutation(mutation); 1111 1112 // Add deleteJob mutation 1113 mutation = add_delete_job_mutation(mutation); 1114 1115 // Add uploadBlob mutation 1116 mutation = add_upload_blob_mutation(mutation, auth_base_url.clone()); 1117 1118 // Add OAuth client mutations 1119 mutation = add_create_oauth_client_mutation(mutation, auth_base_url.clone()); 1120 mutation = add_update_oauth_client_mutation(mutation, auth_base_url.clone()); 1121 mutation = add_delete_oauth_client_mutation(mutation, auth_base_url.clone()); 1122 1123 // Add deleteSliceRecords mutation 1124 mutation = add_delete_slice_records_mutation(mutation, slice_uri.clone()); 1125 1126 // Build Subscription type with collection-specific subscriptions 1127 let mut subscription = create_subscription_type(slice_uri.clone(), &lexicons); 1128 1129 // Add jetstreamLogsCreated subscription 1130 subscription = add_jetstream_logs_subscription(subscription, slice_uri.clone()); 1131 1132 // Add syncJobUpdated subscription 1133 subscription = add_sync_job_subscription(subscription); 1134 1135 // Build and return the schema with complexity limits 1136 let mut schema_builder = Schema::build( 1137 query.type_name(), 1138 Some(mutation.type_name()), 1139 Some(subscription.type_name()), 1140 ) 1141 .register(query) 1142 .register(mutation) 1143 .register(subscription) 1144 .limit_depth(50) // Higher limit to support GraphiQL introspection with reverse joins 1145 .limit_complexity(5000); // Prevent expensive deeply nested queries 1146 1147 // Register JSON scalar type for complex fields 1148 let json_scalar = Scalar::new("JSON"); 1149 schema_builder = schema_builder.register(json_scalar); 1150 1151 // Register filter input types for WHERE clauses 1152 let string_filter = InputObject::new("StringFilter") 1153 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))) 1154 .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING))) 1155 .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING))) 1156 .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING))) 1157 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING))) 1158 .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING))) 1159 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING))) 1160 .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING))); 1161 schema_builder = schema_builder.register(string_filter); 1162 1163 let int_filter = InputObject::new("IntFilter") 1164 .field(InputValue::new("eq", TypeRef::named(TypeRef::INT))) 1165 .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT))) 1166 .field(InputValue::new("gt", TypeRef::named(TypeRef::INT))) 1167 .field(InputValue::new("gte", TypeRef::named(TypeRef::INT))) 1168 .field(InputValue::new("lt", TypeRef::named(TypeRef::INT))) 1169 .field(InputValue::new("lte", TypeRef::named(TypeRef::INT))); 1170 schema_builder = schema_builder.register(int_filter); 1171 1172 let datetime_filter = InputObject::new("DateTimeFilter") 1173 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))) 1174 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING))) 1175 .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING))) 1176 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING))) 1177 .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING))); 1178 schema_builder = schema_builder.register(datetime_filter); 1179 1180 // Register Blob type 1181 let blob_type = create_blob_type(); 1182 schema_builder = schema_builder.register(blob_type); 1183 1184 // Register SyncResult type for mutations 1185 let sync_result_type = create_sync_result_type(); 1186 schema_builder = schema_builder.register(sync_result_type); 1187 1188 // Register BlobUploadResponse type for blob upload mutation 1189 let blob_upload_response_type = create_blob_upload_response_type(); 1190 schema_builder = schema_builder.register(blob_upload_response_type); 1191 1192 // Register OAuthClient type for OAuth operations 1193 let oauth_client_type = create_oauth_client_type(); 1194 schema_builder = schema_builder.register(oauth_client_type); 1195 1196 // Register MutationResponse type for dynamic mutations 1197 let mutation_response_type = create_mutation_response_type(); 1198 schema_builder = schema_builder.register(mutation_response_type); 1199 1200 // Register SortDirection enum 1201 let sort_direction_enum = create_sort_direction_enum(); 1202 schema_builder = schema_builder.register(sort_direction_enum); 1203 1204 // Register SortField input type 1205 let sort_field_input = create_sort_field_input(); 1206 schema_builder = schema_builder.register(sort_field_input); 1207 1208 // Register condition input types for where clauses 1209 let string_condition_input = create_string_condition_input(); 1210 schema_builder = schema_builder.register(string_condition_input); 1211 1212 let int_condition_input = create_int_condition_input(); 1213 schema_builder = schema_builder.register(int_condition_input); 1214 1215 // Register AggregationOrderBy input type 1216 let aggregation_order_by_input = create_aggregation_order_by_input(); 1217 schema_builder = schema_builder.register(aggregation_order_by_input); 1218 1219 // Register DateInterval enum for date truncation 1220 let date_interval_enum = create_date_interval_enum(); 1221 schema_builder = schema_builder.register(date_interval_enum); 1222 1223 // Register PageInfo type 1224 let page_info_type = create_page_info_type(); 1225 schema_builder = schema_builder.register(page_info_type); 1226 1227 // Register RecordUpdate type for subscriptions 1228 let record_update_type = create_record_update_type(); 1229 schema_builder = schema_builder.register(record_update_type); 1230 1231 // Register JetstreamLogEntry type 1232 let jetstream_log_entry_type = create_jetstream_log_entry_type(); 1233 schema_builder = schema_builder.register(jetstream_log_entry_type); 1234 1235 // Register SyncJob and SyncJobResult types 1236 let sync_job_type = create_sync_job_type(); 1237 schema_builder = schema_builder.register(sync_job_type); 1238 1239 let sync_job_result_type = create_sync_job_result_type(); 1240 schema_builder = schema_builder.register(sync_job_result_type); 1241 1242 // Register StartSyncOutput type (created via schema_ext) 1243 let start_sync_output = create_start_sync_output_type(); 1244 schema_builder = schema_builder.register(start_sync_output); 1245 1246 // Register SyncSummary and CollectionSummary types 1247 let sync_summary_type = create_sync_summary_type(); 1248 schema_builder = schema_builder.register(sync_summary_type); 1249 1250 let collection_summary_type = create_collection_summary_type(); 1251 schema_builder = schema_builder.register(collection_summary_type); 1252 1253 // Register Sparklines types 1254 let sparkline_point_type = create_sparkline_point_type(); 1255 schema_builder = schema_builder.register(sparkline_point_type); 1256 1257 let slice_sparkline_type = create_slice_sparkline_type(); 1258 schema_builder = schema_builder.register(slice_sparkline_type); 1259 1260 // Register Stats types 1261 let collection_stats_type = create_collection_stats_type(); 1262 schema_builder = schema_builder.register(collection_stats_type); 1263 1264 let slice_stats_type = create_slice_stats_type(); 1265 schema_builder = schema_builder.register(slice_stats_type); 1266 1267 // Register Records types 1268 let slice_record_type = create_slice_record_type(); 1269 schema_builder = schema_builder.register(slice_record_type); 1270 1271 let slice_record_edge_type = create_slice_record_edge_type(); 1272 schema_builder = schema_builder.register(slice_record_edge_type); 1273 1274 let slice_records_connection_type = create_slice_records_connection_type(); 1275 schema_builder = schema_builder.register(slice_records_connection_type); 1276 1277 let slice_records_where_input = create_slice_records_where_input(); 1278 schema_builder = schema_builder.register(slice_records_where_input); 1279 1280 // Register DeleteSliceRecordsOutput type 1281 let delete_slice_records_output_type = create_delete_slice_records_output_type(); 1282 schema_builder = schema_builder.register(delete_slice_records_output_type); 1283 1284 // Register all object types 1285 for obj in objects_to_register { 1286 schema_builder = schema_builder.register(obj); 1287 } 1288 1289 // Register all WhereInput types 1290 for where_input in where_inputs_to_register { 1291 schema_builder = schema_builder.register(where_input); 1292 } 1293 1294 // Register all GroupByField enums 1295 for group_by_enum in group_by_enums_to_register { 1296 schema_builder = schema_builder.register(group_by_enum); 1297 } 1298 1299 // Register all mutation input types 1300 for mutation_input in mutation_inputs_to_register { 1301 schema_builder = schema_builder.register(mutation_input); 1302 } 1303 1304 // Register all nested object types from the type registry 1305 for (_, nested_type) in type_registry { 1306 schema_builder = schema_builder.register(nested_type); 1307 } 1308 1309 schema_builder 1310 .finish() 1311 .map_err(|e| format!("Schema build error: {:?}", e)) 1312} 1313 1314/// Container to hold record data for resolvers 1315#[derive(Clone)] 1316pub struct RecordContainer { 1317 pub record: crate::models::IndexedRecord, 1318} 1319 1320/// Container to hold blob data and DID for URL generation 1321#[derive(Clone)] 1322pub struct BlobContainer { 1323 pub blob_ref: String, // CID reference 1324 pub mime_type: String, // MIME type 1325 pub size: i64, // Size in bytes 1326 pub did: String, // DID for CDN URL generation 1327} 1328 1329/// Creates a GraphQL Object type for a record collection 1330fn create_record_type( 1331 type_name: &str, 1332 fields: &[GraphQLField], 1333 database: Database, 1334 slice_uri: String, 1335 all_collections: &[CollectionMeta], 1336 auth_base_url: String, 1337 type_registry: &mut TypeRegistry, 1338 all_lexicons: &[serde_json::Value], 1339 lexicon_nsid: &str, 1340) -> Object { 1341 let mut object = Object::new(type_name); 1342 1343 // Check which field names exist in lexicon to avoid conflicts 1344 let lexicon_field_names: std::collections::HashSet<&str> = 1345 fields.iter().map(|f| f.name.as_str()).collect(); 1346 1347 // Add global ID field for Relay (base64 encoded typename:uri) 1348 // Only add if lexicon doesn't already have an "id" field 1349 if !lexicon_field_names.contains("id") { 1350 let type_name_for_id = type_name.to_string(); 1351 object = object.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), move |ctx| { 1352 let typename = type_name_for_id.clone(); 1353 FieldFuture::new(async move { 1354 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1355 let global_id = format!("{}:{}", typename, container.record.uri); 1356 let encoded_id = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, global_id.as_bytes()); 1357 Ok(Some(GraphQLValue::from(encoded_id))) 1358 }) 1359 })); 1360 } 1361 1362 // Add standard AT Protocol fields only if they don't conflict with lexicon fields 1363 if !lexicon_field_names.contains("uri") { 1364 object = object.field(Field::new( 1365 "uri", 1366 TypeRef::named_nn(TypeRef::STRING), 1367 |ctx| { 1368 FieldFuture::new(async move { 1369 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1370 Ok(Some(GraphQLValue::from(container.record.uri.clone()))) 1371 }) 1372 }, 1373 )); 1374 } 1375 1376 if !lexicon_field_names.contains("cid") { 1377 object = object.field(Field::new( 1378 "cid", 1379 TypeRef::named_nn(TypeRef::STRING), 1380 |ctx| { 1381 FieldFuture::new(async move { 1382 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1383 Ok(Some(GraphQLValue::from(container.record.cid.clone()))) 1384 }) 1385 }, 1386 )); 1387 } 1388 1389 if !lexicon_field_names.contains("did") { 1390 object = object.field(Field::new( 1391 "did", 1392 TypeRef::named_nn(TypeRef::STRING), 1393 |ctx| { 1394 FieldFuture::new(async move { 1395 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1396 Ok(Some(GraphQLValue::from(container.record.did.clone()))) 1397 }) 1398 }, 1399 )); 1400 } 1401 1402 if !lexicon_field_names.contains("indexedAt") { 1403 object = object.field(Field::new( 1404 "indexedAt", 1405 TypeRef::named_nn(TypeRef::STRING), 1406 |ctx| { 1407 FieldFuture::new(async move { 1408 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1409 Ok(Some(GraphQLValue::from( 1410 container.record.indexed_at.clone(), 1411 ))) 1412 }) 1413 }, 1414 )); 1415 } 1416 1417 // Add actor metadata field (handle from actors table) 1418 // Always add as "actorHandle" to avoid conflicts with lexicon fields 1419 let db_for_actor = database.clone(); 1420 let slice_for_actor = slice_uri.clone(); 1421 object = object.field(Field::new( 1422 "actorHandle", 1423 TypeRef::named(TypeRef::STRING), 1424 move |ctx| { 1425 let db = db_for_actor.clone(); 1426 let slice = slice_for_actor.clone(); 1427 FieldFuture::new(async move { 1428 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1429 let did = &container.record.did; 1430 1431 // Build where clause to find actor by DID 1432 let mut where_clause = crate::models::WhereClause { 1433 conditions: std::collections::HashMap::new(), 1434 or_conditions: None, 1435 and: None, 1436 or: None, 1437 }; 1438 where_clause.conditions.insert( 1439 "did".to_string(), 1440 crate::models::WhereCondition { 1441 gt: None, 1442 gte: None, 1443 lt: None, 1444 lte: None, 1445 eq: Some(serde_json::Value::String(did.clone())), 1446 in_values: None, 1447 contains: None, 1448 fuzzy: None, 1449 }, 1450 ); 1451 1452 match db 1453 .get_slice_actors(&slice, Some(1), None, Some(&where_clause)) 1454 .await 1455 { 1456 Ok((actors, _cursor)) => { 1457 if let Some(actor) = actors.first() { 1458 if let Some(handle) = &actor.handle { 1459 Ok(Some(GraphQLValue::from(handle.clone()))) 1460 } else { 1461 Ok(None) 1462 } 1463 } else { 1464 Ok(None) 1465 } 1466 } 1467 Err(e) => { 1468 tracing::debug!("Actor not found for {}: {}", did, e); 1469 Ok(None) 1470 } 1471 } 1472 }) 1473 }, 1474 )); 1475 1476 // Add fields from lexicon 1477 for field in fields { 1478 let field_name = field.name.clone(); 1479 let field_name_for_field = field_name.clone(); // Need separate clone for Field::new 1480 let field_type = field.field_type.clone(); 1481 let db_clone = database.clone(); 1482 1483 // Determine type ref - handle nested objects and lexicon refs specially 1484 let type_ref = match &field.field_type { 1485 GraphQLType::LexiconRef(ref_nsid) => { 1486 // Resolve lexicon ref and generate type for it 1487 let resolved_type_name = resolve_lexicon_ref_type( 1488 ref_nsid, 1489 lexicon_nsid, 1490 all_lexicons, 1491 type_registry, 1492 &database, 1493 ); 1494 1495 if field.is_required { 1496 TypeRef::named_nn(resolved_type_name) 1497 } else { 1498 TypeRef::named(resolved_type_name) 1499 } 1500 } 1501 GraphQLType::Object(nested_fields) => { 1502 // Generate nested object type 1503 let nested_type_name = generate_nested_type_name(type_name, &field_name); 1504 let actual_type_name = generate_nested_object_type( 1505 &nested_type_name, 1506 nested_fields, 1507 type_registry, 1508 &database, 1509 ); 1510 1511 if field.is_required { 1512 TypeRef::named_nn(actual_type_name) 1513 } else { 1514 TypeRef::named(actual_type_name) 1515 } 1516 } 1517 GraphQLType::Array(inner) => { 1518 match inner.as_ref() { 1519 GraphQLType::LexiconRef(ref_nsid) => { 1520 // Resolve lexicon ref for array items 1521 let resolved_type_name = resolve_lexicon_ref_type( 1522 ref_nsid, 1523 lexicon_nsid, 1524 all_lexicons, 1525 type_registry, 1526 &database, 1527 ); 1528 1529 if field.is_required { 1530 TypeRef::named_nn_list(resolved_type_name) 1531 } else { 1532 TypeRef::named_list(resolved_type_name) 1533 } 1534 } 1535 GraphQLType::Object(nested_fields) => { 1536 // Generate nested object type for array items 1537 let nested_type_name = generate_nested_type_name(type_name, &field_name); 1538 let actual_type_name = generate_nested_object_type( 1539 &nested_type_name, 1540 nested_fields, 1541 type_registry, 1542 &database, 1543 ); 1544 1545 if field.is_required { 1546 TypeRef::named_nn_list(actual_type_name) 1547 } else { 1548 TypeRef::named_list(actual_type_name) 1549 } 1550 } 1551 _ => graphql_type_to_typeref(&field.field_type, field.is_required), 1552 } 1553 } 1554 _ => graphql_type_to_typeref(&field.field_type, field.is_required), 1555 }; 1556 1557 object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| { 1558 let field_name = field_name.clone(); 1559 let field_type = field_type.clone(); 1560 let db = db_clone.clone(); 1561 1562 FieldFuture::new(async move { 1563 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1564 let value = container.record.value.get(&field_name); 1565 1566 if let Some(val) = value { 1567 // Check for explicit null value 1568 if val.is_null() { 1569 return Ok(None); 1570 } 1571 1572 // Check if this is an array of blobs 1573 if let GraphQLType::Array(inner) = &field_type { 1574 if matches!(inner.as_ref(), GraphQLType::Blob) { 1575 if let Some(arr) = val.as_array() { 1576 let blob_containers: Vec<FieldValue> = arr 1577 .iter() 1578 .filter_map(|blob_val| { 1579 let obj = blob_val.as_object()?; 1580 let blob_ref = obj 1581 .get("ref") 1582 .and_then(|r| r.as_object()) 1583 .and_then(|r| r.get("$link")) 1584 .and_then(|l| l.as_str()) 1585 .unwrap_or("") 1586 .to_string(); 1587 1588 let mime_type = obj 1589 .get("mimeType") 1590 .and_then(|m| m.as_str()) 1591 .unwrap_or("image/jpeg") 1592 .to_string(); 1593 1594 let size = 1595 obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0); 1596 1597 let blob_container = BlobContainer { 1598 blob_ref, 1599 mime_type, 1600 size, 1601 did: container.record.did.clone(), 1602 }; 1603 1604 Some(FieldValue::owned_any(blob_container)) 1605 }) 1606 .collect(); 1607 1608 return Ok(Some(FieldValue::list(blob_containers))); 1609 } 1610 1611 // If not a proper array, return empty list 1612 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))); 1613 } 1614 } 1615 1616 // Check if this is a blob field 1617 if matches!(field_type, GraphQLType::Blob) { 1618 // Extract blob fields from JSON object 1619 if let Some(obj) = val.as_object() { 1620 let blob_ref = obj 1621 .get("ref") 1622 .and_then(|r| r.as_object()) 1623 .and_then(|r| r.get("$link")) 1624 .and_then(|l| l.as_str()) 1625 .unwrap_or("") 1626 .to_string(); 1627 1628 let mime_type = obj 1629 .get("mimeType") 1630 .and_then(|m| m.as_str()) 1631 .unwrap_or("image/jpeg") 1632 .to_string(); 1633 1634 let size = obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0); 1635 1636 let blob_container = BlobContainer { 1637 blob_ref, 1638 mime_type, 1639 size, 1640 did: container.record.did.clone(), 1641 }; 1642 1643 return Ok(Some(FieldValue::owned_any(blob_container))); 1644 } 1645 1646 // If not a proper blob object, return None (field is null) 1647 return Ok(None); 1648 } 1649 1650 // Check if this is a reference field that needs joining 1651 if matches!(field_type, GraphQLType::Ref) { 1652 // Extract URI from strongRef and fetch the linked record 1653 if let Some(uri) = 1654 crate::graphql::dataloaders::extract_uri_from_strong_ref(val) 1655 { 1656 match db.get_record(&uri).await { 1657 Ok(Some(linked_record)) => { 1658 // Convert the linked record to a JSON value 1659 let record_json = 1660 serde_json::to_value(linked_record).map_err(|e| { 1661 Error::new(format!("Serialization error: {}", e)) 1662 })?; 1663 1664 // Convert serde_json::Value to async_graphql::Value 1665 let graphql_val = json_to_graphql_value(&record_json); 1666 return Ok(Some(FieldValue::value(graphql_val))); 1667 } 1668 Ok(None) => { 1669 return Ok(None); 1670 } 1671 Err(e) => { 1672 tracing::error!("Error fetching linked record: {}", e); 1673 return Ok(None); 1674 } 1675 } 1676 } 1677 } 1678 1679 // Check if this is a lexicon ref field 1680 if matches!(field_type, GraphQLType::LexiconRef(_)) { 1681 let nested_container = NestedObjectContainer { 1682 data: val.clone(), 1683 }; 1684 return Ok(Some(FieldValue::owned_any(nested_container))); 1685 } 1686 1687 // Check if this is a nested object field 1688 if matches!(field_type, GraphQLType::Object(_)) { 1689 let nested_container = NestedObjectContainer { 1690 data: val.clone(), 1691 }; 1692 return Ok(Some(FieldValue::owned_any(nested_container))); 1693 } 1694 1695 // Check if this is an array of nested objects or lexicon refs 1696 if let GraphQLType::Array(inner) = &field_type { 1697 if matches!(inner.as_ref(), GraphQLType::LexiconRef(_)) || matches!(inner.as_ref(), GraphQLType::Object(_)) { 1698 if let Some(arr) = val.as_array() { 1699 let containers: Vec<FieldValue> = arr 1700 .iter() 1701 .map(|item| { 1702 let nested_container = NestedObjectContainer { 1703 data: item.clone(), 1704 }; 1705 FieldValue::owned_any(nested_container) 1706 }) 1707 .collect(); 1708 return Ok(Some(FieldValue::list(containers))); 1709 } 1710 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))); 1711 } 1712 } 1713 1714 // For non-ref, non-object fields, return the raw JSON value 1715 let graphql_val = json_to_graphql_value(val); 1716 Ok(Some(FieldValue::value(graphql_val))) 1717 } else { 1718 Ok(None) 1719 } 1720 }) 1721 })); 1722 } 1723 1724 // Add join fields for cross-referencing other collections 1725 for collection in all_collections { 1726 let field_name = nsid_to_join_field_name(&collection.nsid); 1727 1728 // Skip if this would conflict with existing field 1729 if lexicon_field_names.contains(field_name.as_str()) { 1730 continue; 1731 } 1732 1733 // Skip self-referencing join fields (collection referencing itself) 1734 if collection.type_name == type_name { 1735 continue; 1736 } 1737 1738 // Collect all string fields with format "at-uri" that might reference this collection 1739 // We'll check each one at runtime to see if it contains a URI to this collection 1740 let uri_ref_fields: Vec<_> = fields 1741 .iter() 1742 .filter(|f| matches!(f.format.as_deref(), Some("at-uri"))) 1743 .collect(); 1744 1745 let collection_nsid = collection.nsid.clone(); 1746 let key_type = collection.key_type.clone(); 1747 let db_for_join = database.clone(); 1748 1749 // If we found at-uri fields, create a resolver that checks each one at runtime 1750 // But skip this for literal:self collections - they should use DID-based joins instead 1751 if !uri_ref_fields.is_empty() && key_type != "literal:self" { 1752 let ref_field_names: Vec<String> = 1753 uri_ref_fields.iter().map(|f| f.name.clone()).collect(); 1754 let db_for_uri_join = database.clone(); 1755 let target_collection = collection_nsid.clone(); 1756 1757 object = object.field(Field::new( 1758 &field_name, 1759 TypeRef::named(&collection.type_name), 1760 move |ctx| { 1761 let db = db_for_uri_join.clone(); 1762 let field_names = ref_field_names.clone(); 1763 let expected_collection = target_collection.clone(); 1764 FieldFuture::new(async move { 1765 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1766 1767 // Try each at-uri field to find one that references this collection 1768 for field_name in &field_names { 1769 if let Some(uri_value) = container.record.value.get(field_name) { 1770 if let Some(uri) = uri_value.as_str() { 1771 // Check if the URI is for the expected collection 1772 if uri.contains(&format!("/{}/", expected_collection)) { 1773 // Fetch the record at this URI 1774 match db.get_record(uri).await { 1775 Ok(Some(record)) => { 1776 let new_container = RecordContainer { record }; 1777 return Ok(Some(FieldValue::owned_any( 1778 new_container, 1779 ))); 1780 } 1781 Ok(None) => continue, // Try next field 1782 Err(_) => continue, // Try next field 1783 } 1784 } 1785 } 1786 } 1787 } 1788 // No matching URI found in any field 1789 Ok(None) 1790 }) 1791 }, 1792 )); 1793 // Skip DID-based join logic for non-literal:self collections 1794 continue; 1795 } 1796 1797 // Determine type and resolver based on key_type 1798 match key_type.as_str() { 1799 "literal:self" => { 1800 // Single record per DID - return nullable object of the collection's type 1801 let slice_for_self_join = slice_uri.clone(); 1802 object = object.field(Field::new( 1803 &field_name, 1804 TypeRef::named(&collection.type_name), 1805 move |ctx| { 1806 let db = db_for_join.clone(); 1807 let nsid = collection_nsid.clone(); 1808 let slice = slice_for_self_join.clone(); 1809 FieldFuture::new(async move { 1810 let container = 1811 ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1812 let did = container.record.did.clone(); 1813 let uri = format!("at://{}/{}/self", did, nsid); 1814 1815 // Query with slice_uri filter to ensure we get the right record 1816 let mut where_clause = crate::models::WhereClause { 1817 conditions: std::collections::HashMap::new(), 1818 or_conditions: None, 1819 and: None, 1820 or: None, 1821 }; 1822 where_clause.conditions.insert( 1823 "collection".to_string(), 1824 crate::models::WhereCondition { 1825 gt: None, 1826 gte: None, 1827 lt: None, 1828 lte: None, 1829 eq: Some(serde_json::Value::String(nsid.clone())), 1830 in_values: None, 1831 contains: None, 1832 fuzzy: None, 1833 }, 1834 ); 1835 where_clause.conditions.insert( 1836 "did".to_string(), 1837 crate::models::WhereCondition { 1838 gt: None, 1839 gte: None, 1840 lt: None, 1841 lte: None, 1842 eq: Some(serde_json::Value::String(did.clone())), 1843 in_values: None, 1844 contains: None, 1845 fuzzy: None, 1846 }, 1847 ); 1848 1849 match db.get_slice_collections_records( 1850 &slice, 1851 Some(1), 1852 None, 1853 None, 1854 Some(&where_clause), 1855 ).await { 1856 Ok((records, _cursor)) => { 1857 if let Some(record) = records.into_iter().next() { 1858 let indexed_record = crate::models::IndexedRecord { 1859 uri: record.uri, 1860 cid: record.cid, 1861 did: record.did, 1862 collection: record.collection, 1863 value: record.json, 1864 indexed_at: record.indexed_at.to_rfc3339(), 1865 }; 1866 let new_container = RecordContainer { record: indexed_record }; 1867 Ok(Some(FieldValue::owned_any(new_container))) 1868 } else { 1869 Ok(None) 1870 } 1871 } 1872 Err(e) => { 1873 tracing::debug!("Join query error for {}: {}", uri, e); 1874 Ok(None) 1875 } 1876 } 1877 }) 1878 }, 1879 )); 1880 } 1881 "tid" | "any" => { 1882 // Skip - these are handled as plural reverse joins below with URI filtering 1883 continue; 1884 1885 // Multiple records per DID - return array of the collection's type (DISABLED) 1886 /*object = object.field( 1887 Field::new( 1888 &field_name, 1889 TypeRef::named_nn_list_nn(&collection.type_name), 1890 move |ctx| { 1891 let nsid = collection_nsid.clone(); 1892 let slice = slice_for_join.clone(); 1893 let db_fallback = db_for_join.clone(); 1894 FieldFuture::new(async move { 1895 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 1896 let did = &container.record.did; 1897 1898 // Get limit from argument, default to 50 1899 let limit = ctx.args.get("limit") 1900 .and_then(|v| v.i64().ok()) 1901 .map(|i| i as usize) 1902 .unwrap_or(50) 1903 .min(100); // Cap at 100 to prevent abuse 1904 1905 // Try to get DataLoader from context 1906 if let Some(gql_ctx) = ctx.data_opt::<GraphQLContext>() { 1907 // Use DataLoader for batched loading 1908 let key = CollectionDidKey { 1909 slice_uri: slice.clone(), 1910 collection: nsid.clone(), 1911 did: did.clone(), 1912 }; 1913 1914 match gql_ctx.collection_did_loader.load_one(key).await { 1915 Ok(Some(mut records)) => { 1916 // Apply limit after loading 1917 records.truncate(limit); 1918 1919 let values: Vec<FieldValue> = records 1920 .into_iter() 1921 .map(|indexed_record| { 1922 let container = RecordContainer { 1923 record: indexed_record, 1924 }; 1925 FieldValue::owned_any(container) 1926 }) 1927 .collect(); 1928 Ok(Some(FieldValue::list(values))) 1929 } 1930 Ok(None) => { 1931 Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))) 1932 } 1933 Err(e) => { 1934 tracing::debug!("DataLoader error for {}: {:?}", nsid, e); 1935 Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))) 1936 } 1937 } 1938 } else { 1939 // Fallback to direct database query if DataLoader not available 1940 let db = db_fallback.clone(); 1941 let mut where_clause = crate::models::WhereClause { 1942 conditions: HashMap::new(), 1943 or_conditions: None, 1944 and: None, 1945 or: None, 1946 }; 1947 where_clause.conditions.insert( 1948 "collection".to_string(), 1949 crate::models::WhereCondition { 1950 gt: None, 1951 gte: None, 1952 lt: None, 1953 lte: None, 1954 eq: Some(serde_json::Value::String(nsid.clone())), 1955 in_values: None, 1956 contains: None, 1957 fuzzy: None, 1958 }, 1959 ); 1960 where_clause.conditions.insert( 1961 "did".to_string(), 1962 crate::models::WhereCondition { 1963 gt: None, 1964 gte: None, 1965 lt: None, 1966 lte: None, 1967 eq: Some(serde_json::Value::String(did.clone())), 1968 in_values: None, 1969 contains: None, 1970 fuzzy: None, 1971 }, 1972 ); 1973 1974 match db.get_slice_collections_records( 1975 &slice, 1976 Some(limit as i32), 1977 None, // cursor 1978 None, // sort 1979 Some(&where_clause), 1980 ).await { 1981 Ok((records, _cursor)) => { 1982 let values: Vec<FieldValue> = records 1983 .into_iter() 1984 .map(|record| { 1985 let indexed_record = crate::models::IndexedRecord { 1986 uri: record.uri, 1987 cid: record.cid, 1988 did: record.did, 1989 collection: record.collection, 1990 value: record.json, 1991 indexed_at: record.indexed_at.to_rfc3339(), 1992 }; 1993 let container = RecordContainer { 1994 record: indexed_record, 1995 }; 1996 FieldValue::owned_any(container) 1997 }) 1998 .collect(); 1999 Ok(Some(FieldValue::list(values))) 2000 } 2001 Err(e) => { 2002 tracing::debug!("Error querying {}: {}", nsid, e); 2003 Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))) 2004 } 2005 } 2006 } 2007 }) 2008 }, 2009 ) 2010 .argument(async_graphql::dynamic::InputValue::new( 2011 "limit", 2012 TypeRef::named(TypeRef::INT), 2013 )) 2014 );*/ 2015 } 2016 _ => { 2017 // Unknown key type, skip 2018 continue; 2019 } 2020 } 2021 } 2022 2023 // Add reverse joins: for every other collection, add a field to query records by DID 2024 // This enables bidirectional traversal (e.g., profile.plays and play.profile) 2025 for collection in all_collections { 2026 let reverse_field_name = format!("{}s", nsid_to_join_field_name(&collection.nsid)); 2027 let slice_for_reverse = slice_uri.clone(); 2028 let collection_nsid = collection.nsid.clone(); 2029 let collection_type = collection.type_name.clone(); 2030 let at_uri_fields = collection.at_uri_fields.clone(); 2031 2032 object = object.field( 2033 Field::new( 2034 &reverse_field_name, 2035 TypeRef::named_nn_list_nn(&collection_type), 2036 move |ctx| { 2037 let slice = slice_for_reverse.clone(); 2038 let nsid = collection_nsid.clone(); 2039 let ref_fields = at_uri_fields.clone(); 2040 FieldFuture::new(async move { 2041 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 2042 2043 // Get limit from argument, default to 50 2044 let limit = ctx.args.get("limit") 2045 .and_then(|v| v.i64().ok()) 2046 .map(|i| i as usize) 2047 .unwrap_or(50) 2048 .min(100); // Cap at 100 to prevent abuse 2049 2050 // Try to get DataLoader from context 2051 if let Some(gql_ctx) = ctx.data_opt::<GraphQLContext>() { 2052 let parent_uri = &container.record.uri; 2053 2054 // Try each at-uri field from the lexicon 2055 tracing::debug!( 2056 "Trying reverse join for {} with at-uri fields: {:?}", 2057 nsid, 2058 ref_fields 2059 ); 2060 2061 for ref_field in &ref_fields { 2062 let key = crate::graphql::dataloader::CollectionUriKey { 2063 slice_uri: slice.clone(), 2064 collection: nsid.clone(), 2065 parent_uri: parent_uri.clone(), 2066 reference_field: ref_field.clone(), 2067 }; 2068 2069 tracing::debug!( 2070 "Querying {} via field '{}' for URI: {}", 2071 nsid, 2072 ref_field, 2073 parent_uri 2074 ); 2075 2076 match gql_ctx.collection_uri_loader.load_one(key).await { 2077 Ok(Some(mut records)) => { 2078 if !records.is_empty() { 2079 tracing::debug!( 2080 "Found {} {} records via '{}' field for parent URI: {}", 2081 records.len(), 2082 nsid, 2083 ref_field, 2084 parent_uri 2085 ); 2086 2087 // Apply limit 2088 records.truncate(limit); 2089 2090 let values: Vec<FieldValue> = records 2091 .into_iter() 2092 .map(|indexed_record| { 2093 let container = RecordContainer { 2094 record: indexed_record, 2095 }; 2096 FieldValue::owned_any(container) 2097 }) 2098 .collect(); 2099 return Ok(Some(FieldValue::list(values))); 2100 } 2101 } 2102 Ok(None) => continue, 2103 Err(e) => { 2104 tracing::debug!("DataLoader error for {} field '{}': {:?}", nsid, ref_field, e); 2105 continue; 2106 } 2107 } 2108 } 2109 2110 // No records found via any at-uri field 2111 tracing::debug!("No {} records found for parent URI: {}", nsid, parent_uri); 2112 return Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))); 2113 } 2114 2115 // Fallback: DataLoader not available 2116 tracing::debug!("DataLoader not available for reverse join"); 2117 Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))) 2118 }) 2119 }, 2120 ) 2121 .argument(async_graphql::dynamic::InputValue::new( 2122 "limit", 2123 TypeRef::named(TypeRef::INT), 2124 )) 2125 ); 2126 2127 // Add count field for the reverse join 2128 let count_field_name = format!("{}Count", reverse_field_name); 2129 let db_for_count = database.clone(); 2130 let slice_for_count = slice_uri.clone(); 2131 let collection_for_count = collection.nsid.clone(); 2132 let at_uri_fields_for_count = collection.at_uri_fields.clone(); 2133 2134 object = object.field(Field::new( 2135 &count_field_name, 2136 TypeRef::named_nn(TypeRef::INT), 2137 move |ctx| { 2138 let slice = slice_for_count.clone(); 2139 let nsid = collection_for_count.clone(); 2140 let db = db_for_count.clone(); 2141 let ref_fields = at_uri_fields_for_count.clone(); 2142 FieldFuture::new(async move { 2143 let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 2144 let parent_uri = &container.record.uri; 2145 2146 // Build where clause to count records referencing this URI 2147 for ref_field in &ref_fields { 2148 let mut where_clause = crate::models::WhereClause { 2149 conditions: HashMap::new(), 2150 or_conditions: None, 2151 and: None, 2152 or: None, 2153 }; 2154 2155 where_clause.conditions.insert( 2156 "collection".to_string(), 2157 crate::models::WhereCondition { 2158 gt: None, 2159 gte: None, 2160 lt: None, 2161 lte: None, 2162 eq: Some(serde_json::Value::String(nsid.clone())), 2163 in_values: None, 2164 contains: None, 2165 fuzzy: None, 2166 }, 2167 ); 2168 2169 where_clause.conditions.insert( 2170 ref_field.clone(), 2171 crate::models::WhereCondition { 2172 gt: None, 2173 gte: None, 2174 lt: None, 2175 lte: None, 2176 eq: Some(serde_json::Value::String(parent_uri.clone())), 2177 in_values: None, 2178 contains: None, 2179 fuzzy: None, 2180 }, 2181 ); 2182 2183 match db 2184 .count_slice_collections_records(&slice, Some(&where_clause)) 2185 .await 2186 { 2187 Ok(count) if count > 0 => { 2188 return Ok(Some(FieldValue::value(count as i32))); 2189 } 2190 Ok(_) => continue, 2191 Err(e) => { 2192 tracing::debug!("Count error for {}: {}", nsid, e); 2193 continue; 2194 } 2195 } 2196 } 2197 2198 // No matching field found, return 0 2199 Ok(Some(FieldValue::value(0))) 2200 }) 2201 }, 2202 )); 2203 } 2204 2205 // Add sparklines, stats, and oauth clients fields for NetworkSlicesSlice type 2206 if type_name == "NetworkSlicesSlice" { 2207 object = add_sparklines_field_to_slice(object, database.clone()); 2208 object = add_stats_field_to_slice(object, database.clone()); 2209 object = add_oauth_clients_field_to_slice(object, auth_base_url); 2210 } 2211 2212 object 2213} 2214 2215/// Convert serde_json::Value to async_graphql::Value 2216fn json_to_graphql_value(val: &serde_json::Value) -> GraphQLValue { 2217 match val { 2218 serde_json::Value::Null => GraphQLValue::Null, 2219 serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b), 2220 serde_json::Value::Number(n) => { 2221 if let Some(i) = n.as_i64() { 2222 GraphQLValue::Number((i as i32).into()) 2223 } else if let Some(f) = n.as_f64() { 2224 GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into()) 2225 } else { 2226 GraphQLValue::Null 2227 } 2228 } 2229 serde_json::Value::String(s) => GraphQLValue::String(s.clone()), 2230 serde_json::Value::Array(arr) => { 2231 GraphQLValue::List(arr.iter().map(json_to_graphql_value).collect()) 2232 } 2233 serde_json::Value::Object(obj) => { 2234 let mut map = async_graphql::indexmap::IndexMap::new(); 2235 for (k, v) in obj { 2236 map.insert( 2237 async_graphql::Name::new(k.as_str()), 2238 json_to_graphql_value(v), 2239 ); 2240 } 2241 GraphQLValue::Object(map) 2242 } 2243 } 2244} 2245 2246/// Converts GraphQL type to TypeRef for async-graphql 2247fn graphql_type_to_typeref(gql_type: &GraphQLType, is_required: bool) -> TypeRef { 2248 match gql_type { 2249 GraphQLType::String => { 2250 if is_required { 2251 TypeRef::named_nn(TypeRef::STRING) 2252 } else { 2253 TypeRef::named(TypeRef::STRING) 2254 } 2255 } 2256 GraphQLType::Int => { 2257 if is_required { 2258 TypeRef::named_nn(TypeRef::INT) 2259 } else { 2260 TypeRef::named(TypeRef::INT) 2261 } 2262 } 2263 GraphQLType::Boolean => { 2264 if is_required { 2265 TypeRef::named_nn(TypeRef::BOOLEAN) 2266 } else { 2267 TypeRef::named(TypeRef::BOOLEAN) 2268 } 2269 } 2270 GraphQLType::Float => { 2271 if is_required { 2272 TypeRef::named_nn(TypeRef::FLOAT) 2273 } else { 2274 TypeRef::named(TypeRef::FLOAT) 2275 } 2276 } 2277 GraphQLType::Blob => { 2278 // Blob object type with url resolver 2279 // Always nullable since blob data might be missing or malformed 2280 TypeRef::named("Blob") 2281 } 2282 GraphQLType::Json | GraphQLType::Ref | GraphQLType::LexiconRef(_) | GraphQLType::Object(_) | GraphQLType::Union => { 2283 // JSON scalar type - linked records, lexicon refs, and complex objects return as JSON (fallback) 2284 if is_required { 2285 TypeRef::named_nn("JSON") 2286 } else { 2287 TypeRef::named("JSON") 2288 } 2289 } 2290 GraphQLType::Array(inner) => { 2291 // For arrays of primitives, use typed arrays 2292 // For arrays of complex types, use JSON scalar 2293 match inner.as_ref() { 2294 GraphQLType::String 2295 | GraphQLType::Int 2296 | GraphQLType::Boolean 2297 | GraphQLType::Float => { 2298 let inner_ref = match inner.as_ref() { 2299 GraphQLType::String => TypeRef::STRING, 2300 GraphQLType::Int => TypeRef::INT, 2301 GraphQLType::Boolean => TypeRef::BOOLEAN, 2302 GraphQLType::Float => TypeRef::FLOAT, 2303 _ => unreachable!(), 2304 }; 2305 2306 if is_required { 2307 TypeRef::named_nn_list_nn(inner_ref) 2308 } else { 2309 TypeRef::named_list(inner_ref) 2310 } 2311 } 2312 GraphQLType::Blob => { 2313 // Arrays of blobs - return list of Blob objects 2314 if is_required { 2315 TypeRef::named_nn_list("Blob") 2316 } else { 2317 TypeRef::named_list("Blob") 2318 } 2319 } 2320 _ => { 2321 // Arrays of complex types (objects, etc.) are just JSON 2322 if is_required { 2323 TypeRef::named_nn("JSON") 2324 } else { 2325 TypeRef::named("JSON") 2326 } 2327 } 2328 } 2329 } 2330 } 2331} 2332 2333/// Creates the Blob GraphQL type with url resolver 2334fn create_blob_type() -> Object { 2335 let mut blob = Object::new("Blob"); 2336 2337 // ref field - CID reference 2338 blob = blob.field(Field::new( 2339 "ref", 2340 TypeRef::named_nn(TypeRef::STRING), 2341 |ctx| { 2342 FieldFuture::new(async move { 2343 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?; 2344 Ok(Some(GraphQLValue::from(container.blob_ref.clone()))) 2345 }) 2346 }, 2347 )); 2348 2349 // mimeType field 2350 blob = blob.field(Field::new( 2351 "mimeType", 2352 TypeRef::named_nn(TypeRef::STRING), 2353 |ctx| { 2354 FieldFuture::new(async move { 2355 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?; 2356 Ok(Some(GraphQLValue::from(container.mime_type.clone()))) 2357 }) 2358 }, 2359 )); 2360 2361 // size field 2362 blob = blob.field(Field::new("size", TypeRef::named_nn(TypeRef::INT), |ctx| { 2363 FieldFuture::new(async move { 2364 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?; 2365 Ok(Some(GraphQLValue::from(container.size as i32))) 2366 }) 2367 })); 2368 2369 // url(preset) field with argument 2370 blob = blob.field( 2371 Field::new("url", TypeRef::named_nn(TypeRef::STRING), |ctx| { 2372 FieldFuture::new(async move { 2373 let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?; 2374 2375 // Get preset argument, default to "feed_fullsize" 2376 let preset: String = match ctx.args.get("preset") { 2377 Some(val) => val.string().unwrap_or("feed_fullsize").to_string(), 2378 None => "feed_fullsize".to_string(), 2379 }; 2380 2381 // Build CDN URL: https://cdn.bsky.app/img/{preset}/plain/{did}/{cid}@jpeg 2382 let cdn_base_url = "https://cdn.bsky.app/img"; 2383 let url = format!( 2384 "{}/{}/plain/{}/{}@jpeg", 2385 cdn_base_url, 2386 preset, 2387 container.did, 2388 container.blob_ref 2389 ); 2390 2391 Ok(Some(GraphQLValue::from(url))) 2392 }) 2393 }) 2394 .argument(async_graphql::dynamic::InputValue::new( 2395 "preset", 2396 TypeRef::named(TypeRef::STRING), 2397 )) 2398 .description("Generate CDN URL for the blob with the specified preset (avatar, banner, feed_thumbnail, feed_fullsize)"), 2399 ); 2400 2401 blob 2402} 2403 2404/// Creates the SyncResult GraphQL type for mutation responses 2405fn create_sync_result_type() -> Object { 2406 let mut sync_result = Object::new("SyncResult"); 2407 2408 sync_result = sync_result.field(Field::new( 2409 "success", 2410 TypeRef::named_nn(TypeRef::BOOLEAN), 2411 |ctx| { 2412 FieldFuture::new(async move { 2413 let value = ctx 2414 .parent_value 2415 .downcast_ref::<GraphQLValue>() 2416 .ok_or_else(|| Error::new("Failed to downcast sync result"))?; 2417 if let GraphQLValue::Object(obj) = value { 2418 if let Some(success) = obj.get("success") { 2419 return Ok(Some(success.clone())); 2420 } 2421 } 2422 Ok(None) 2423 }) 2424 }, 2425 )); 2426 2427 sync_result = sync_result.field(Field::new( 2428 "reposProcessed", 2429 TypeRef::named_nn(TypeRef::INT), 2430 |ctx| { 2431 FieldFuture::new(async move { 2432 let value = ctx 2433 .parent_value 2434 .downcast_ref::<GraphQLValue>() 2435 .ok_or_else(|| Error::new("Failed to downcast sync result"))?; 2436 if let GraphQLValue::Object(obj) = value { 2437 if let Some(repos) = obj.get("reposProcessed") { 2438 return Ok(Some(repos.clone())); 2439 } 2440 } 2441 Ok(None) 2442 }) 2443 }, 2444 )); 2445 2446 sync_result = sync_result.field(Field::new( 2447 "recordsSynced", 2448 TypeRef::named_nn(TypeRef::INT), 2449 |ctx| { 2450 FieldFuture::new(async move { 2451 let value = ctx 2452 .parent_value 2453 .downcast_ref::<GraphQLValue>() 2454 .ok_or_else(|| Error::new("Failed to downcast sync result"))?; 2455 if let GraphQLValue::Object(obj) = value { 2456 if let Some(records) = obj.get("recordsSynced") { 2457 return Ok(Some(records.clone())); 2458 } 2459 } 2460 Ok(None) 2461 }) 2462 }, 2463 )); 2464 2465 sync_result = sync_result.field(Field::new( 2466 "timedOut", 2467 TypeRef::named_nn(TypeRef::BOOLEAN), 2468 |ctx| { 2469 FieldFuture::new(async move { 2470 let value = ctx 2471 .parent_value 2472 .downcast_ref::<GraphQLValue>() 2473 .ok_or_else(|| Error::new("Failed to downcast sync result"))?; 2474 if let GraphQLValue::Object(obj) = value { 2475 if let Some(timed_out) = obj.get("timedOut") { 2476 return Ok(Some(timed_out.clone())); 2477 } 2478 } 2479 Ok(None) 2480 }) 2481 }, 2482 )); 2483 2484 sync_result = sync_result.field(Field::new( 2485 "message", 2486 TypeRef::named_nn(TypeRef::STRING), 2487 |ctx| { 2488 FieldFuture::new(async move { 2489 let value = ctx 2490 .parent_value 2491 .downcast_ref::<GraphQLValue>() 2492 .ok_or_else(|| Error::new("Failed to downcast sync result"))?; 2493 if let GraphQLValue::Object(obj) = value { 2494 if let Some(message) = obj.get("message") { 2495 return Ok(Some(message.clone())); 2496 } 2497 } 2498 Ok(None) 2499 }) 2500 }, 2501 )); 2502 2503 sync_result 2504} 2505 2506/// Creates the SortDirection enum type 2507fn create_sort_direction_enum() -> Enum { 2508 Enum::new("SortDirection") 2509 .item(EnumItem::new("asc")) 2510 .item(EnumItem::new("desc")) 2511} 2512 2513/// Creates the SortField input type 2514fn create_sort_field_input() -> InputObject { 2515 InputObject::new("SortField") 2516 .field(InputValue::new("field", TypeRef::named_nn(TypeRef::STRING))) 2517 .field(InputValue::new( 2518 "direction", 2519 TypeRef::named_nn("SortDirection"), 2520 )) 2521} 2522 2523/// Creates the StringCondition input type for string field filtering 2524fn create_string_condition_input() -> InputObject { 2525 InputObject::new("StringCondition") 2526 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))) 2527 .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING))) 2528 .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING))) 2529 .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING))) 2530} 2531 2532/// Creates the IntCondition input type for int field filtering 2533fn create_int_condition_input() -> InputObject { 2534 InputObject::new("IntCondition") 2535 .field(InputValue::new("eq", TypeRef::named(TypeRef::INT))) 2536 .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT))) 2537} 2538 2539/// Creates the PageInfo type for connection pagination 2540fn create_page_info_type() -> Object { 2541 let mut page_info = Object::new("PageInfo"); 2542 2543 page_info = page_info.field(Field::new( 2544 "hasNextPage", 2545 TypeRef::named_nn(TypeRef::BOOLEAN), 2546 |ctx| { 2547 FieldFuture::new(async move { 2548 let value = ctx 2549 .parent_value 2550 .downcast_ref::<GraphQLValue>() 2551 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; 2552 if let GraphQLValue::Object(obj) = value { 2553 if let Some(has_next) = obj.get("hasNextPage") { 2554 return Ok(Some(has_next.clone())); 2555 } 2556 } 2557 Ok(Some(GraphQLValue::from(false))) 2558 }) 2559 }, 2560 )); 2561 2562 page_info = page_info.field(Field::new( 2563 "hasPreviousPage", 2564 TypeRef::named_nn(TypeRef::BOOLEAN), 2565 |ctx| { 2566 FieldFuture::new(async move { 2567 let value = ctx 2568 .parent_value 2569 .downcast_ref::<GraphQLValue>() 2570 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; 2571 if let GraphQLValue::Object(obj) = value { 2572 if let Some(has_prev) = obj.get("hasPreviousPage") { 2573 return Ok(Some(has_prev.clone())); 2574 } 2575 } 2576 Ok(Some(GraphQLValue::from(false))) 2577 }) 2578 }, 2579 )); 2580 2581 page_info = page_info.field(Field::new( 2582 "startCursor", 2583 TypeRef::named(TypeRef::STRING), 2584 |ctx| { 2585 FieldFuture::new(async move { 2586 let value = ctx 2587 .parent_value 2588 .downcast_ref::<GraphQLValue>() 2589 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; 2590 if let GraphQLValue::Object(obj) = value { 2591 if let Some(cursor) = obj.get("startCursor") { 2592 return Ok(Some(cursor.clone())); 2593 } 2594 } 2595 Ok(None) 2596 }) 2597 }, 2598 )); 2599 2600 page_info = page_info.field(Field::new( 2601 "endCursor", 2602 TypeRef::named(TypeRef::STRING), 2603 |ctx| { 2604 FieldFuture::new(async move { 2605 let value = ctx 2606 .parent_value 2607 .downcast_ref::<GraphQLValue>() 2608 .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; 2609 if let GraphQLValue::Object(obj) = value { 2610 if let Some(cursor) = obj.get("endCursor") { 2611 return Ok(Some(cursor.clone())); 2612 } 2613 } 2614 Ok(None) 2615 }) 2616 }, 2617 )); 2618 2619 page_info 2620} 2621 2622/// Connection data structure that holds all connection fields 2623#[derive(Clone)] 2624struct ConnectionData { 2625 total_count: i32, 2626 has_next_page: bool, 2627 end_cursor: Option<String>, 2628 nodes: Vec<RecordContainer>, 2629} 2630 2631/// Edge data structure for Relay connections 2632#[derive(Clone)] 2633struct EdgeData { 2634 node: RecordContainer, 2635 cursor: String, 2636} 2637 2638/// Creates an Edge type for a given record type 2639/// Example: For "Post" creates "PostEdge" with node and cursor 2640fn create_edge_type(record_type_name: &str) -> Object { 2641 let edge_name = format!("{}Edge", record_type_name); 2642 let mut edge = Object::new(&edge_name); 2643 2644 // Add node field 2645 let record_type = record_type_name.to_string(); 2646 edge = edge.field(Field::new("node", TypeRef::named_nn(&record_type), |ctx| { 2647 FieldFuture::new(async move { 2648 let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?; 2649 Ok(Some(FieldValue::owned_any(edge_data.node.clone()))) 2650 }) 2651 })); 2652 2653 // Add cursor field 2654 edge = edge.field(Field::new( 2655 "cursor", 2656 TypeRef::named_nn(TypeRef::STRING), 2657 |ctx| { 2658 FieldFuture::new(async move { 2659 let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?; 2660 Ok(Some(GraphQLValue::from(edge_data.cursor.clone()))) 2661 }) 2662 }, 2663 )); 2664 2665 edge 2666} 2667 2668/// Creates a Connection type for a given record type 2669/// Example: For "Post" creates "PostConnection" with edges, pageInfo, and totalCount 2670fn create_connection_type(record_type_name: &str) -> Object { 2671 let connection_name = format!("{}Connection", record_type_name); 2672 let mut connection = Object::new(&connection_name); 2673 2674 // Add totalCount field 2675 connection = connection.field(Field::new( 2676 "totalCount", 2677 TypeRef::named_nn(TypeRef::INT), 2678 |ctx| { 2679 FieldFuture::new(async move { 2680 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; 2681 Ok(Some(GraphQLValue::from(data.total_count))) 2682 }) 2683 }, 2684 )); 2685 2686 // Add pageInfo field 2687 connection = connection.field(Field::new( 2688 "pageInfo", 2689 TypeRef::named_nn("PageInfo"), 2690 |ctx| { 2691 FieldFuture::new(async move { 2692 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; 2693 2694 let mut page_info = async_graphql::indexmap::IndexMap::new(); 2695 page_info.insert( 2696 async_graphql::Name::new("hasNextPage"), 2697 GraphQLValue::from(data.has_next_page), 2698 ); 2699 // For forward pagination only, hasPreviousPage is always false 2700 page_info.insert( 2701 async_graphql::Name::new("hasPreviousPage"), 2702 GraphQLValue::from(false), 2703 ); 2704 2705 // Add startCursor (first node's cid if available) 2706 if !data.nodes.is_empty() { 2707 if let Some(first_record) = data.nodes.first() { 2708 let start_cursor = general_purpose::URL_SAFE_NO_PAD 2709 .encode(first_record.record.cid.clone()); 2710 page_info.insert( 2711 async_graphql::Name::new("startCursor"), 2712 GraphQLValue::from(start_cursor), 2713 ); 2714 } 2715 } 2716 2717 // Add endCursor 2718 if let Some(ref cursor) = data.end_cursor { 2719 page_info.insert( 2720 async_graphql::Name::new("endCursor"), 2721 GraphQLValue::from(cursor.clone()), 2722 ); 2723 } 2724 2725 Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info)))) 2726 }) 2727 }, 2728 )); 2729 2730 // Add edges field (Relay standard) 2731 let edge_type = format!("{}Edge", record_type_name); 2732 connection = connection.field(Field::new( 2733 "edges", 2734 TypeRef::named_nn_list_nn(&edge_type), 2735 |ctx| { 2736 FieldFuture::new(async move { 2737 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; 2738 2739 let field_values: Vec<FieldValue<'_>> = data 2740 .nodes 2741 .iter() 2742 .map(|node| { 2743 // Use base64-encoded CID as cursor 2744 let cursor = 2745 general_purpose::URL_SAFE_NO_PAD.encode(node.record.cid.clone()); 2746 let edge = EdgeData { 2747 node: node.clone(), 2748 cursor, 2749 }; 2750 FieldValue::owned_any(edge) 2751 }) 2752 .collect(); 2753 2754 Ok(Some(FieldValue::list(field_values))) 2755 }) 2756 }, 2757 )); 2758 2759 // Add nodes field (convenience, direct access to records without edges wrapper) 2760 connection = connection.field(Field::new( 2761 "nodes", 2762 TypeRef::named_nn_list_nn(record_type_name), 2763 |ctx| { 2764 FieldFuture::new(async move { 2765 let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; 2766 2767 let field_values: Vec<FieldValue<'_>> = data 2768 .nodes 2769 .iter() 2770 .map(|node| FieldValue::owned_any(node.clone())) 2771 .collect(); 2772 2773 Ok(Some(FieldValue::list(field_values))) 2774 }) 2775 }, 2776 )); 2777 2778 connection 2779} 2780 2781/// Creates the Mutation root type with sync operations and dynamic collection mutations 2782fn create_mutation_type( 2783 database: Database, 2784 slice_uri: String, 2785 lexicons: &[serde_json::Value], 2786) -> Object { 2787 let mut mutation = Object::new("Mutation"); 2788 2789 // Add syncUserCollections mutation 2790 let db_clone = database.clone(); 2791 let slice_clone = slice_uri.clone(); 2792 2793 mutation = mutation.field( 2794 Field::new( 2795 "syncUserCollections", 2796 TypeRef::named_nn("SyncResult"), 2797 move |ctx| { 2798 let db = db_clone.clone(); 2799 let slice = slice_clone.clone(); 2800 2801 FieldFuture::new(async move { 2802 let did = ctx 2803 .args 2804 .get("did") 2805 .and_then(|v| v.string().ok()) 2806 .ok_or_else(|| Error::new("did argument is required"))?; 2807 2808 // Create sync service and call sync_user_collections 2809 let cache_backend = crate::cache::CacheFactory::create_cache( 2810 crate::cache::CacheBackend::InMemory { ttl_seconds: None }, 2811 ) 2812 .await 2813 .map_err(|e| Error::new(format!("Failed to create cache: {}", e)))?; 2814 let cache = Arc::new(Mutex::new(crate::cache::SliceCache::new(cache_backend))); 2815 let sync_service = crate::sync::SyncService::with_cache( 2816 db.clone(), 2817 std::env::var("RELAY_ENDPOINT") 2818 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()), 2819 cache, 2820 ); 2821 2822 let result = sync_service 2823 .sync_user_collections(did, &slice, 30) // 30 second timeout 2824 .await 2825 .map_err(|e| Error::new(format!("Sync failed: {}", e)))?; 2826 2827 // Convert result to GraphQL object 2828 let mut obj = async_graphql::indexmap::IndexMap::new(); 2829 obj.insert( 2830 async_graphql::Name::new("success"), 2831 GraphQLValue::from(result.success), 2832 ); 2833 obj.insert( 2834 async_graphql::Name::new("reposProcessed"), 2835 GraphQLValue::from(result.repos_processed), 2836 ); 2837 obj.insert( 2838 async_graphql::Name::new("recordsSynced"), 2839 GraphQLValue::from(result.records_synced), 2840 ); 2841 obj.insert( 2842 async_graphql::Name::new("timedOut"), 2843 GraphQLValue::from(result.timed_out), 2844 ); 2845 obj.insert( 2846 async_graphql::Name::new("message"), 2847 GraphQLValue::from(result.message), 2848 ); 2849 2850 Ok(Some(FieldValue::owned_any(GraphQLValue::Object(obj)))) 2851 }) 2852 }, 2853 ) 2854 .argument(async_graphql::dynamic::InputValue::new( 2855 "did", 2856 TypeRef::named_nn(TypeRef::STRING), 2857 )) 2858 .description("Sync user collections for a given DID"), 2859 ); 2860 2861 // Add dynamic mutations for each collection 2862 for lexicon in lexicons { 2863 if let (Some(nsid), Some(defs)) = ( 2864 lexicon.get("id").and_then(|n| n.as_str()), 2865 lexicon.get("defs"), 2866 ) { 2867 let fields = extract_collection_fields(defs); 2868 if !fields.is_empty() { 2869 let type_name = nsid_to_type_name(nsid); 2870 2871 // Add create mutation 2872 mutation = add_create_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone()); 2873 2874 // Add update mutation 2875 mutation = add_update_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone()); 2876 2877 // Add delete mutation 2878 mutation = add_delete_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone()); 2879 } 2880 } 2881 } 2882 2883 mutation 2884} 2885 2886/// Converts NSID to GraphQL type name 2887/// Example: app.bsky.feed.post -> AppBskyFeedPost 2888fn nsid_to_type_name(nsid: &str) -> String { 2889 nsid.split('.') 2890 .map(|part| { 2891 let mut chars = part.chars(); 2892 match chars.next() { 2893 None => String::new(), 2894 Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), 2895 } 2896 }) 2897 .collect::<Vec<_>>() 2898 .join("") 2899} 2900 2901/// Converts NSID to GraphQL query name in camelCase and pluralized 2902/// Example: app.bsky.feed.post -> appBskyFeedPosts 2903/// Example: fm.teal.alpha.feed.play -> fmTealAlphaFeedPlays 2904fn nsid_to_query_name(nsid: &str) -> String { 2905 // First convert to camelCase like join fields 2906 let camel_case = nsid_to_join_field_name(nsid); 2907 2908 // Then pluralize the end 2909 if camel_case.ends_with("s") 2910 || camel_case.ends_with("x") 2911 || camel_case.ends_with("ch") 2912 || camel_case.ends_with("sh") 2913 { 2914 format!("{}es", camel_case) // status -> statuses, box -> boxes 2915 } else if camel_case.ends_with("y") && camel_case.len() > 1 { 2916 let chars: Vec<char> = camel_case.chars().collect(); 2917 if chars.len() > 1 && !['a', 'e', 'i', 'o', 'u'].contains(&chars[chars.len() - 2]) { 2918 format!("{}ies", &camel_case[..camel_case.len() - 1]) // party -> parties 2919 } else { 2920 format!("{}s", camel_case) // day -> days 2921 } 2922 } else { 2923 format!("{}s", camel_case) // post -> posts 2924 } 2925} 2926 2927/// Converts NSID to GraphQL join field name in camelCase 2928/// Example: app.bsky.actor.profile -> appBskyActorProfile 2929fn nsid_to_join_field_name(nsid: &str) -> String { 2930 let parts: Vec<&str> = nsid.split('.').collect(); 2931 if parts.is_empty() { 2932 return nsid.to_string(); 2933 } 2934 2935 // First part is lowercase, rest are capitalized 2936 let mut result = parts[0].to_string(); 2937 for part in &parts[1..] { 2938 let mut chars = part.chars(); 2939 if let Some(first) = chars.next() { 2940 result.push_str(&first.to_uppercase().collect::<String>()); 2941 result.push_str(chars.as_str()); 2942 } 2943 } 2944 2945 result 2946} 2947 2948/// Creates an aggregated type for GROUP BY queries 2949/// Returns a dynamic object with the grouped fields plus a count field 2950fn create_aggregated_type(type_name: &str, fields: &[GraphQLField]) -> Object { 2951 let mut aggregated = Object::new(type_name); 2952 2953 // Add fields from the lexicon that can be grouped 2954 // Use JSON type for all fields to support both strings and complex types 2955 for field in fields { 2956 let field_name = field.name.clone(); 2957 let field_name_clone = field_name.clone(); 2958 aggregated = aggregated.field(Field::new( 2959 &field_name, 2960 TypeRef::named("JSON"), 2961 move |ctx| { 2962 let field_name = field_name_clone.clone(); 2963 FieldFuture::new(async move { 2964 let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?; 2965 if let Some(obj) = json_value.as_object() { 2966 if let Some(value) = obj.get(&field_name) { 2967 // Convert serde_json::Value to async_graphql::Value 2968 let graphql_value = serde_json_to_graphql_value(value); 2969 return Ok(Some(graphql_value)); 2970 } 2971 } 2972 Ok(None) 2973 }) 2974 }, 2975 )); 2976 } 2977 2978 // Add count field 2979 aggregated = aggregated.field(Field::new( 2980 "count", 2981 TypeRef::named_nn(TypeRef::INT), 2982 |ctx| { 2983 FieldFuture::new(async move { 2984 let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?; 2985 if let Some(obj) = json_value.as_object() { 2986 if let Some(count) = obj.get("count") { 2987 if let Some(count_i64) = count.as_i64() { 2988 return Ok(Some(GraphQLValue::from(count_i64 as i32))); 2989 } 2990 } 2991 } 2992 Ok(Some(GraphQLValue::from(0))) 2993 }) 2994 }, 2995 )); 2996 2997 aggregated 2998} 2999 3000/// Creates the AggregationOrderBy input type for ordering by count 3001fn create_aggregation_order_by_input() -> InputObject { 3002 InputObject::new("AggregationOrderBy") 3003 .field(InputValue::new("count", TypeRef::named("SortDirection"))) 3004} 3005 3006/// Creates the DateInterval enum for date truncation 3007fn create_date_interval_enum() -> Enum { 3008 Enum::new("DateInterval") 3009 .item(EnumItem::new("second")) 3010 .item(EnumItem::new("minute")) 3011 .item(EnumItem::new("hour")) 3012 .item(EnumItem::new("day")) 3013 .item(EnumItem::new("week")) 3014 .item(EnumItem::new("month")) 3015 .item(EnumItem::new("quarter")) 3016 .item(EnumItem::new("year")) 3017} 3018 3019/// Converts a serde_json::Value to an async_graphql::Value 3020fn serde_json_to_graphql_value(value: &serde_json::Value) -> GraphQLValue { 3021 match value { 3022 serde_json::Value::Null => GraphQLValue::Null, 3023 serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b), 3024 serde_json::Value::Number(n) => { 3025 if let Some(i) = n.as_i64() { 3026 GraphQLValue::Number(i.into()) 3027 } else if let Some(f) = n.as_f64() { 3028 GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into()) 3029 } else { 3030 GraphQLValue::Null 3031 } 3032 } 3033 serde_json::Value::String(s) => GraphQLValue::String(s.clone()), 3034 serde_json::Value::Array(arr) => { 3035 let values: Vec<GraphQLValue> = arr.iter().map(serde_json_to_graphql_value).collect(); 3036 GraphQLValue::List(values) 3037 } 3038 serde_json::Value::Object(obj) => { 3039 let mut map = async_graphql::indexmap::IndexMap::new(); 3040 for (k, v) in obj { 3041 map.insert(async_graphql::Name::new(k), serde_json_to_graphql_value(v)); 3042 } 3043 GraphQLValue::Object(map) 3044 } 3045 } 3046} 3047 3048/// Creates the RecordUpdate type for subscription events 3049fn create_record_update_type() -> Object { 3050 let mut record_update = Object::new("RecordUpdate"); 3051 3052 record_update = record_update.field(Field::new( 3053 "uri", 3054 TypeRef::named_nn(TypeRef::STRING), 3055 |ctx| { 3056 FieldFuture::new(async move { 3057 let value = ctx 3058 .parent_value 3059 .downcast_ref::<GraphQLValue>() 3060 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3061 if let GraphQLValue::Object(obj) = value { 3062 if let Some(uri) = obj.get("uri") { 3063 return Ok(Some(uri.clone())); 3064 } 3065 } 3066 Ok(None) 3067 }) 3068 }, 3069 )); 3070 3071 record_update = record_update.field(Field::new( 3072 "cid", 3073 TypeRef::named_nn(TypeRef::STRING), 3074 |ctx| { 3075 FieldFuture::new(async move { 3076 let value = ctx 3077 .parent_value 3078 .downcast_ref::<GraphQLValue>() 3079 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3080 if let GraphQLValue::Object(obj) = value { 3081 if let Some(cid) = obj.get("cid") { 3082 return Ok(Some(cid.clone())); 3083 } 3084 } 3085 Ok(None) 3086 }) 3087 }, 3088 )); 3089 3090 record_update = record_update.field(Field::new( 3091 "did", 3092 TypeRef::named_nn(TypeRef::STRING), 3093 |ctx| { 3094 FieldFuture::new(async move { 3095 let value = ctx 3096 .parent_value 3097 .downcast_ref::<GraphQLValue>() 3098 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3099 if let GraphQLValue::Object(obj) = value { 3100 if let Some(did) = obj.get("did") { 3101 return Ok(Some(did.clone())); 3102 } 3103 } 3104 Ok(None) 3105 }) 3106 }, 3107 )); 3108 3109 record_update = record_update.field(Field::new( 3110 "collection", 3111 TypeRef::named_nn(TypeRef::STRING), 3112 |ctx| { 3113 FieldFuture::new(async move { 3114 let value = ctx 3115 .parent_value 3116 .downcast_ref::<GraphQLValue>() 3117 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3118 if let GraphQLValue::Object(obj) = value { 3119 if let Some(collection) = obj.get("collection") { 3120 return Ok(Some(collection.clone())); 3121 } 3122 } 3123 Ok(None) 3124 }) 3125 }, 3126 )); 3127 3128 record_update = record_update.field(Field::new( 3129 "indexedAt", 3130 TypeRef::named_nn(TypeRef::STRING), 3131 |ctx| { 3132 FieldFuture::new(async move { 3133 let value = ctx 3134 .parent_value 3135 .downcast_ref::<GraphQLValue>() 3136 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3137 if let GraphQLValue::Object(obj) = value { 3138 if let Some(indexed_at) = obj.get("indexedAt") { 3139 return Ok(Some(indexed_at.clone())); 3140 } 3141 } 3142 Ok(None) 3143 }) 3144 }, 3145 )); 3146 3147 record_update = record_update.field(Field::new( 3148 "operation", 3149 TypeRef::named_nn(TypeRef::STRING), 3150 |ctx| { 3151 FieldFuture::new(async move { 3152 let value = ctx 3153 .parent_value 3154 .downcast_ref::<GraphQLValue>() 3155 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3156 if let GraphQLValue::Object(obj) = value { 3157 if let Some(operation) = obj.get("operation") { 3158 return Ok(Some(operation.clone())); 3159 } 3160 } 3161 Ok(None) 3162 }) 3163 }, 3164 )); 3165 3166 record_update = record_update.field(Field::new("value", TypeRef::named_nn("JSON"), |ctx| { 3167 FieldFuture::new(async move { 3168 let value = ctx 3169 .parent_value 3170 .downcast_ref::<GraphQLValue>() 3171 .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; 3172 if let GraphQLValue::Object(obj) = value { 3173 if let Some(val) = obj.get("value") { 3174 return Ok(Some(val.clone())); 3175 } 3176 } 3177 Ok(None) 3178 }) 3179 })); 3180 3181 record_update 3182} 3183 3184/// Creates the Subscription root type with collection-specific subscriptions 3185fn create_subscription_type(slice_uri: String, lexicons: &[serde_json::Value]) -> Subscription { 3186 let mut subscription = Subscription::new("Subscription"); 3187 3188 // For each record collection, create {collection}Created, {collection}Updated, {collection}Deleted subscriptions 3189 for lexicon in lexicons { 3190 let nsid = match lexicon.get("id").and_then(|n| n.as_str()) { 3191 Some(n) => n, 3192 None => continue, 3193 }; 3194 3195 let defs = match lexicon.get("defs") { 3196 Some(d) => d, 3197 None => continue, 3198 }; 3199 3200 // Only process record types (skip queries, procedures, etc.) 3201 let is_record = defs 3202 .get("main") 3203 .and_then(|m| m.get("type")) 3204 .and_then(|t| t.as_str()) 3205 == Some("record"); 3206 3207 if !is_record { 3208 continue; 3209 } 3210 3211 let fields = extract_collection_fields(defs); 3212 if fields.is_empty() { 3213 continue; 3214 } 3215 3216 let type_name = nsid_to_type_name(nsid); 3217 let field_base_name = nsid_to_join_field_name(nsid); 3218 3219 // {collection}Created subscription 3220 let created_field_name = format!("{}Created", field_base_name); 3221 let slice_for_created = slice_uri.clone(); 3222 let nsid_for_created = nsid.to_string(); 3223 let type_name_for_created = type_name.clone(); 3224 3225 subscription = subscription.field(SubscriptionField::new( 3226 &created_field_name, 3227 TypeRef::named_nn(&type_name_for_created), 3228 move |_ctx| { 3229 let slice_uri = slice_for_created.clone(); 3230 let collection = nsid_for_created.clone(); 3231 3232 SubscriptionFieldFuture::new(async move { 3233 let mut receiver = PUBSUB.subscribe(&slice_uri).await; 3234 3235 let stream = async_stream::stream! { 3236 while let Ok(event) = receiver.recv().await { 3237 // Filter by collection and operation 3238 if event.collection != collection || event.operation != crate::graphql::RecordOperation::Create { 3239 continue; 3240 } 3241 3242 // Convert to RecordContainer and yield 3243 let indexed_record = crate::models::IndexedRecord { 3244 uri: event.uri, 3245 cid: event.cid, 3246 did: event.did, 3247 collection: event.collection, 3248 value: event.value, 3249 indexed_at: event.indexed_at, 3250 }; 3251 let container = RecordContainer { 3252 record: indexed_record, 3253 }; 3254 yield Ok(FieldValue::owned_any(container)); 3255 } 3256 }; 3257 3258 Ok(stream) 3259 }) 3260 }, 3261 ) 3262 .description(format!("Subscribe to {} record creation events", nsid))); 3263 3264 // {collection}Updated subscription 3265 let updated_field_name = format!("{}Updated", field_base_name); 3266 let slice_for_updated = slice_uri.clone(); 3267 let nsid_for_updated = nsid.to_string(); 3268 let type_name_for_updated = type_name.clone(); 3269 3270 subscription = subscription.field(SubscriptionField::new( 3271 &updated_field_name, 3272 TypeRef::named_nn(&type_name_for_updated), 3273 move |_ctx| { 3274 let slice_uri = slice_for_updated.clone(); 3275 let collection = nsid_for_updated.clone(); 3276 3277 SubscriptionFieldFuture::new(async move { 3278 let mut receiver = PUBSUB.subscribe(&slice_uri).await; 3279 3280 let stream = async_stream::stream! { 3281 while let Ok(event) = receiver.recv().await { 3282 // Filter by collection and operation 3283 if event.collection != collection || event.operation != crate::graphql::RecordOperation::Update { 3284 continue; 3285 } 3286 3287 // Convert to RecordContainer and yield 3288 let indexed_record = crate::models::IndexedRecord { 3289 uri: event.uri, 3290 cid: event.cid, 3291 did: event.did, 3292 collection: event.collection, 3293 value: event.value, 3294 indexed_at: event.indexed_at, 3295 }; 3296 let container = RecordContainer { 3297 record: indexed_record, 3298 }; 3299 yield Ok(FieldValue::owned_any(container)); 3300 } 3301 }; 3302 3303 Ok(stream) 3304 }) 3305 }, 3306 ) 3307 .description(format!("Subscribe to {} record update events", nsid))); 3308 3309 // {collection}Deleted subscription - returns just the URI string 3310 let deleted_field_name = format!("{}Deleted", field_base_name); 3311 let slice_for_deleted = slice_uri.clone(); 3312 let nsid_for_deleted = nsid.to_string(); 3313 3314 subscription = subscription.field(SubscriptionField::new( 3315 &deleted_field_name, 3316 TypeRef::named_nn(TypeRef::STRING), 3317 move |_ctx| { 3318 let slice_uri = slice_for_deleted.clone(); 3319 let collection = nsid_for_deleted.clone(); 3320 3321 SubscriptionFieldFuture::new(async move { 3322 let mut receiver = PUBSUB.subscribe(&slice_uri).await; 3323 3324 let stream = async_stream::stream! { 3325 while let Ok(event) = receiver.recv().await { 3326 // Filter by collection and operation 3327 if event.collection != collection || event.operation != crate::graphql::RecordOperation::Delete { 3328 continue; 3329 } 3330 3331 // For deletes, just return the URI 3332 yield Ok(FieldValue::value(GraphQLValue::String(event.uri))); 3333 } 3334 }; 3335 3336 Ok(stream) 3337 }) 3338 }, 3339 ) 3340 .description(format!("Subscribe to {} record deletion events. Returns the URI of deleted records.", nsid))); 3341 } 3342 3343 subscription 3344} 3345 3346/// Helper function to parse GraphQL where clause recursively 3347fn parse_where_clause( 3348 where_obj: async_graphql::dynamic::ObjectAccessor, 3349) -> crate::models::WhereClause { 3350 let mut where_clause = crate::models::WhereClause { 3351 conditions: HashMap::new(), 3352 or_conditions: None, 3353 and: None, 3354 or: None, 3355 }; 3356 3357 for (field_name, condition_val) in where_obj.iter() { 3358 let field_str = field_name.as_str(); 3359 3360 // Handle nested AND array 3361 if field_str == "and" { 3362 if let Ok(and_list) = condition_val.list() { 3363 let mut and_clauses = Vec::new(); 3364 for item in and_list.iter() { 3365 if let Ok(obj) = item.object() { 3366 and_clauses.push(parse_where_clause(obj)); 3367 } 3368 } 3369 if !and_clauses.is_empty() { 3370 where_clause.and = Some(and_clauses); 3371 } 3372 } 3373 continue; 3374 } 3375 3376 // Handle nested OR array 3377 if field_str == "or" { 3378 if let Ok(or_list) = condition_val.list() { 3379 let mut or_clauses = Vec::new(); 3380 for item in or_list.iter() { 3381 if let Ok(obj) = item.object() { 3382 or_clauses.push(parse_where_clause(obj)); 3383 } 3384 } 3385 if !or_clauses.is_empty() { 3386 where_clause.or = Some(or_clauses); 3387 } 3388 } 3389 continue; 3390 } 3391 3392 // Handle regular field conditions 3393 if let Ok(condition_obj) = condition_val.object() { 3394 let mut where_condition = crate::models::WhereCondition { 3395 eq: None, 3396 in_values: None, 3397 contains: None, 3398 fuzzy: None, 3399 gt: None, 3400 gte: None, 3401 lt: None, 3402 lte: None, 3403 }; 3404 3405 // Parse eq condition 3406 if let Some(eq_val) = condition_obj.get("eq") { 3407 if let Ok(eq_str) = eq_val.string() { 3408 where_condition.eq = Some(serde_json::Value::String(eq_str.to_string())); 3409 } else if let Ok(eq_i64) = eq_val.i64() { 3410 where_condition.eq = Some(serde_json::Value::Number(eq_i64.into())); 3411 } 3412 } 3413 3414 // Parse in condition 3415 if let Some(in_val) = condition_obj.get("in") { 3416 if let Ok(in_list) = in_val.list() { 3417 let mut values = Vec::new(); 3418 for item in in_list.iter() { 3419 if let Ok(s) = item.string() { 3420 values.push(serde_json::Value::String(s.to_string())); 3421 } else if let Ok(i) = item.i64() { 3422 values.push(serde_json::Value::Number(i.into())); 3423 } 3424 } 3425 where_condition.in_values = Some(values); 3426 } 3427 } 3428 3429 // Parse contains condition 3430 if let Some(contains_val) = condition_obj.get("contains") { 3431 if let Ok(contains_str) = contains_val.string() { 3432 where_condition.contains = Some(contains_str.to_string()); 3433 } 3434 } 3435 3436 // Parse fuzzy condition 3437 if let Some(fuzzy_val) = condition_obj.get("fuzzy") { 3438 if let Ok(fuzzy_str) = fuzzy_val.string() { 3439 where_condition.fuzzy = Some(fuzzy_str.to_string()); 3440 } 3441 } 3442 3443 // Parse gt condition 3444 if let Some(gt_val) = condition_obj.get("gt") { 3445 if let Ok(gt_str) = gt_val.string() { 3446 where_condition.gt = Some(serde_json::Value::String(gt_str.to_string())); 3447 } else if let Ok(gt_i64) = gt_val.i64() { 3448 where_condition.gt = Some(serde_json::Value::Number(gt_i64.into())); 3449 } 3450 } 3451 3452 // Parse gte condition 3453 if let Some(gte_val) = condition_obj.get("gte") { 3454 if let Ok(gte_str) = gte_val.string() { 3455 where_condition.gte = Some(serde_json::Value::String(gte_str.to_string())); 3456 } else if let Ok(gte_i64) = gte_val.i64() { 3457 where_condition.gte = Some(serde_json::Value::Number(gte_i64.into())); 3458 } 3459 } 3460 3461 // Parse lt condition 3462 if let Some(lt_val) = condition_obj.get("lt") { 3463 if let Ok(lt_str) = lt_val.string() { 3464 where_condition.lt = Some(serde_json::Value::String(lt_str.to_string())); 3465 } else if let Ok(lt_i64) = lt_val.i64() { 3466 where_condition.lt = Some(serde_json::Value::Number(lt_i64.into())); 3467 } 3468 } 3469 3470 // Parse lte condition 3471 if let Some(lte_val) = condition_obj.get("lte") { 3472 if let Ok(lte_str) = lte_val.string() { 3473 where_condition.lte = Some(serde_json::Value::String(lte_str.to_string())); 3474 } else if let Ok(lte_i64) = lte_val.i64() { 3475 where_condition.lte = Some(serde_json::Value::Number(lte_i64.into())); 3476 } 3477 } 3478 3479 // Convert indexedAt to indexed_at for database column 3480 let db_field_name = if field_str == "indexedAt" { 3481 "indexed_at".to_string() 3482 } else { 3483 field_str.to_string() 3484 }; 3485 3486 where_clause 3487 .conditions 3488 .insert(db_field_name, where_condition); 3489 } 3490 } 3491 3492 where_clause 3493} 3494 3495/// Creates an input type for mutations from lexicon fields 3496fn create_mutation_input_type(type_name: &str, fields: &[GraphQLField]) -> InputObject { 3497 let mut input = InputObject::new(format!("{}Input", type_name)); 3498 3499 for field in fields { 3500 let field_type_ref = match &field.field_type { 3501 GraphQLType::String => TypeRef::named(TypeRef::STRING), 3502 GraphQLType::Int => TypeRef::named(TypeRef::INT), 3503 GraphQLType::Boolean => TypeRef::named(TypeRef::BOOLEAN), 3504 GraphQLType::Float => TypeRef::named(TypeRef::FLOAT), 3505 GraphQLType::Array(inner) => match **inner { 3506 GraphQLType::String => TypeRef::named_list(TypeRef::STRING), 3507 GraphQLType::Int => TypeRef::named_list(TypeRef::INT), 3508 _ => TypeRef::named("JSON"), 3509 }, 3510 _ => TypeRef::named("JSON"), 3511 }; 3512 3513 let field_type_ref = if field.is_required { 3514 TypeRef::NonNull(Box::new(field_type_ref)) 3515 } else { 3516 field_type_ref 3517 }; 3518 3519 input = input.field(InputValue::new(&field.name, field_type_ref)); 3520 } 3521 3522 input 3523} 3524 3525/// Transforms fields in record data from GraphQL format to AT Protocol format 3526/// 3527/// Blob fields: 3528/// - GraphQL format: `{ref: "bafyrei...", mimeType: "...", size: 123}` 3529/// - AT Protocol format: `{$type: "blob", ref: {$link: "bafyrei..."}, mimeType: "...", size: 123}` 3530/// 3531/// Lexicon ref fields: 3532/// - Adds `$type: "{ref_nsid}"` to objects (e.g., `{$type: "community.lexicon.location.hthree#main", ...}`) 3533/// 3534/// Nested objects: 3535/// - Recursively processes nested objects and arrays 3536fn transform_fields_for_atproto( 3537 mut data: serde_json::Value, 3538 fields: &[GraphQLField], 3539) -> serde_json::Value { 3540 if let serde_json::Value::Object(ref mut map) = data { 3541 for field in fields { 3542 if let Some(field_value) = map.get_mut(&field.name) { 3543 match &field.field_type { 3544 GraphQLType::Blob => { 3545 // Transform single blob field 3546 if let Some(blob_obj) = field_value.as_object_mut() { 3547 // Add $type: "blob" 3548 blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string())); 3549 3550 // Check if ref is a string (GraphQL format) 3551 if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") { 3552 // Transform to {$link: "cid"} (AT Protocol format) 3553 let link_obj = serde_json::json!({ 3554 "$link": cid 3555 }); 3556 blob_obj.insert("ref".to_string(), link_obj); 3557 } 3558 } 3559 } 3560 GraphQLType::LexiconRef(ref_nsid) => { 3561 // Transform lexicon ref field by adding $type 3562 if let Some(ref_obj) = field_value.as_object_mut() { 3563 ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone())); 3564 } 3565 } 3566 GraphQLType::Object(nested_fields) => { 3567 // Recursively transform nested objects 3568 *field_value = transform_fields_for_atproto(field_value.clone(), nested_fields); 3569 } 3570 GraphQLType::Array(inner) => { 3571 match inner.as_ref() { 3572 GraphQLType::Blob => { 3573 // Transform array of blobs 3574 if let Some(arr) = field_value.as_array_mut() { 3575 for blob_value in arr { 3576 if let Some(blob_obj) = blob_value.as_object_mut() { 3577 // Add $type: "blob" 3578 blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string())); 3579 3580 if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") { 3581 let link_obj = serde_json::json!({ 3582 "$link": cid 3583 }); 3584 blob_obj.insert("ref".to_string(), link_obj); 3585 } 3586 } 3587 } 3588 } 3589 } 3590 GraphQLType::LexiconRef(ref_nsid) => { 3591 // Transform array of lexicon refs 3592 if let Some(arr) = field_value.as_array_mut() { 3593 for ref_value in arr { 3594 if let Some(ref_obj) = ref_value.as_object_mut() { 3595 ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone())); 3596 } 3597 } 3598 } 3599 } 3600 GraphQLType::Object(nested_fields) => { 3601 // Transform array of objects recursively 3602 if let Some(arr) = field_value.as_array_mut() { 3603 for item in arr { 3604 *item = transform_fields_for_atproto(item.clone(), nested_fields); 3605 } 3606 } 3607 } 3608 _ => {} // Other array types don't need transformation 3609 } 3610 } 3611 _ => {} // Other field types don't need transformation 3612 } 3613 } 3614 } 3615 } 3616 3617 data 3618} 3619 3620/// Adds a create mutation for a collection 3621fn add_create_mutation( 3622 mutation: Object, 3623 type_name: &str, 3624 nsid: &str, 3625 fields: &[GraphQLField], 3626 database: Database, 3627 slice_uri: String, 3628) -> Object { 3629 let mutation_name = format!("create{}", type_name); 3630 let nsid = nsid.to_string(); 3631 let nsid_clone = nsid.clone(); 3632 let fields = fields.to_vec(); 3633 3634 mutation.field( 3635 Field::new( 3636 mutation_name, 3637 TypeRef::named_nn(type_name), 3638 move |ctx| { 3639 let db = database.clone(); 3640 let slice = slice_uri.clone(); 3641 let collection = nsid.clone(); 3642 let fields = fields.clone(); 3643 3644 FieldFuture::new(async move { 3645 // Get GraphQL context which contains auth info 3646 let gql_ctx = ctx.data::<crate::graphql::GraphQLContext>() 3647 .map_err(|_| Error::new("Missing GraphQL context"))?; 3648 3649 // Check if user is authenticated 3650 let token = gql_ctx.auth_token.as_ref() 3651 .ok_or_else(|| Error::new("Authentication required"))?; 3652 3653 // Extract input data 3654 let input = ctx.args.get("input") 3655 .ok_or_else(|| Error::new("Missing input argument"))?; 3656 3657 // Convert GraphQL value to JSON using deserialize 3658 let mut record_data: serde_json::Value = input.deserialize() 3659 .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?; 3660 3661 // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs) 3662 record_data = transform_fields_for_atproto(record_data, &fields); 3663 3664 // Optional rkey argument 3665 let rkey = ctx.args.get("rkey") 3666 .and_then(|v| v.string().ok()) 3667 .filter(|s| !s.is_empty()) 3668 .map(|s| s.to_string()); 3669 3670 // Verify OAuth token and get user info 3671 let user_info = crate::auth::verify_oauth_token_cached( 3672 token, 3673 &gql_ctx.auth_base_url, 3674 gql_ctx.auth_cache.clone(), 3675 ) 3676 .await 3677 .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; 3678 3679 // Get AT Protocol DPoP auth and PDS URL 3680 let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached( 3681 token, 3682 &gql_ctx.auth_base_url, 3683 gql_ctx.auth_cache.clone(), 3684 ) 3685 .await 3686 .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?; 3687 3688 let repo = user_info.did.unwrap_or(user_info.sub); 3689 3690 // Validate record against lexicon if not network.slices.lexicon 3691 let validation_slice_uri = if collection == "network.slices.lexicon" { 3692 std::env::var("SYSTEM_SLICE_URI").unwrap_or_default() 3693 } else { 3694 slice.clone() 3695 }; 3696 3697 if !validation_slice_uri.is_empty() { 3698 if let Ok(lexicons) = db.get_lexicons_by_slice(&validation_slice_uri).await { 3699 if !lexicons.is_empty() { 3700 if let Err(e) = slices_lexicon::validate_record( 3701 lexicons, 3702 &collection, 3703 record_data.clone() 3704 ) { 3705 return Err(Error::new(format!("Validation error: {}", e))); 3706 } 3707 } 3708 } 3709 } 3710 3711 // Create record using AT Protocol 3712 let http_client = reqwest::Client::new(); 3713 let create_request = atproto_client::com::atproto::repo::CreateRecordRequest { 3714 repo: repo.clone(), 3715 collection: collection.clone(), 3716 record_key: rkey, 3717 record: record_data.clone(), 3718 swap_commit: None, 3719 validate: false, 3720 }; 3721 3722 let result = atproto_client::com::atproto::repo::create_record( 3723 &http_client, 3724 &atproto_client::client::Auth::DPoP(dpop_auth), 3725 &pds_url, 3726 create_request, 3727 ) 3728 .await 3729 .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?; 3730 3731 // Extract URI and CID from the response 3732 let (uri, cid) = match result { 3733 atproto_client::com::atproto::repo::CreateRecordResponse::StrongRef { uri, cid, .. } => (uri, cid), 3734 atproto_client::com::atproto::repo::CreateRecordResponse::Error(e) => { 3735 return Err(Error::new(format!("AT Protocol error: {} - {}", e.error.unwrap_or_default(), e.message.unwrap_or_default()))); 3736 } 3737 }; 3738 3739 // Extract the target slice URI for lexicon records before moving record_data 3740 let target_slice_for_invalidation = if collection == "network.slices.lexicon" { 3741 record_data.get("slice") 3742 .and_then(|v| v.as_str()) 3743 .map(|s| s.to_string()) 3744 } else { 3745 None 3746 }; 3747 3748 // Store in local database for indexing 3749 let record = crate::models::Record { 3750 uri: uri.clone(), 3751 cid: cid.clone(), 3752 did: repo, 3753 collection: collection.clone(), 3754 json: record_data, 3755 indexed_at: chrono::Utc::now(), 3756 slice_uri: Some(slice.clone()), 3757 }; 3758 3759 let _ = db.insert_record(&record).await; 3760 3761 // Invalidate GraphQL schema cache if this is a lexicon record 3762 if let Some(target_slice) = target_slice_for_invalidation { 3763 crate::graphql::invalidate_schema_cache(&target_slice).await; 3764 } 3765 3766 // Fetch the created record and return it so Relay can update its cache 3767 let created_record = db.get_record(&uri).await 3768 .map_err(|e| Error::new(format!("Failed to fetch created record: {}", e)))? 3769 .ok_or_else(|| Error::new("Created record not found"))?; 3770 3771 // Wrap the record in a RecordContainer so the type fields can access it 3772 let container = RecordContainer { 3773 record: created_record, 3774 }; 3775 3776 Ok(Some(FieldValue::owned_any(container))) 3777 }) 3778 }, 3779 ) 3780 .argument(InputValue::new("input", TypeRef::named_nn(format!("{}Input", type_name)))) 3781 .argument(InputValue::new("rkey", TypeRef::named(TypeRef::STRING))) 3782 .description(format!("Create a new {} record", nsid_clone)) 3783 ) 3784} 3785 3786/// Adds an update mutation for a collection 3787fn add_update_mutation( 3788 mutation: Object, 3789 type_name: &str, 3790 nsid: &str, 3791 fields: &[GraphQLField], 3792 database: Database, 3793 slice_uri: String, 3794) -> Object { 3795 let mutation_name = format!("update{}", type_name); 3796 let nsid = nsid.to_string(); 3797 let nsid_clone = nsid.clone(); 3798 let fields = fields.to_vec(); 3799 3800 mutation.field( 3801 Field::new( 3802 mutation_name, 3803 TypeRef::named_nn(type_name), 3804 move |ctx| { 3805 let db = database.clone(); 3806 let slice = slice_uri.clone(); 3807 let collection = nsid.clone(); 3808 let fields = fields.clone(); 3809 3810 FieldFuture::new(async move { 3811 // Get GraphQL context which contains auth info 3812 let gql_ctx = ctx.data::<crate::graphql::GraphQLContext>() 3813 .map_err(|_| Error::new("Missing GraphQL context"))?; 3814 3815 // Check if user is authenticated 3816 let token = gql_ctx.auth_token.as_ref() 3817 .ok_or_else(|| Error::new("Authentication required"))?; 3818 3819 // Get rkey 3820 let rkey = ctx.args.get("rkey") 3821 .and_then(|v| v.string().ok()) 3822 .ok_or_else(|| Error::new("Missing rkey argument"))? 3823 .to_string(); 3824 3825 // Extract input data 3826 let input = ctx.args.get("input") 3827 .ok_or_else(|| Error::new("Missing input argument"))?; 3828 3829 // Convert GraphQL value to JSON using deserialize 3830 let mut record_data: serde_json::Value = input.deserialize() 3831 .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?; 3832 3833 // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs) 3834 record_data = transform_fields_for_atproto(record_data, &fields); 3835 3836 // Verify OAuth token and get user info 3837 let user_info = crate::auth::verify_oauth_token_cached( 3838 token, 3839 &gql_ctx.auth_base_url, 3840 gql_ctx.auth_cache.clone(), 3841 ) 3842 .await 3843 .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; 3844 3845 // Get AT Protocol DPoP auth and PDS URL 3846 let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached( 3847 token, 3848 &gql_ctx.auth_base_url, 3849 gql_ctx.auth_cache.clone(), 3850 ) 3851 .await 3852 .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?; 3853 3854 let repo = user_info.did.unwrap_or(user_info.sub); 3855 3856 // Validate record against lexicon 3857 let validation_slice_uri = if collection == "network.slices.lexicon" { 3858 std::env::var("SYSTEM_SLICE_URI").unwrap_or_default() 3859 } else { 3860 slice.clone() 3861 }; 3862 3863 if !validation_slice_uri.is_empty() { 3864 if let Ok(lexicons) = db.get_lexicons_by_slice(&validation_slice_uri).await { 3865 if !lexicons.is_empty() { 3866 if let Err(e) = slices_lexicon::validate_record( 3867 lexicons, 3868 &collection, 3869 record_data.clone() 3870 ) { 3871 return Err(Error::new(format!("Validation error: {}", e))); 3872 } 3873 } 3874 } 3875 } 3876 3877 // Update record using AT Protocol 3878 let http_client = reqwest::Client::new(); 3879 let put_request = atproto_client::com::atproto::repo::PutRecordRequest { 3880 repo: repo.clone(), 3881 collection: collection.clone(), 3882 record_key: rkey.clone(), 3883 record: record_data.clone(), 3884 swap_record: None, 3885 swap_commit: None, 3886 validate: false, 3887 }; 3888 3889 let result = atproto_client::com::atproto::repo::put_record( 3890 &http_client, 3891 &atproto_client::client::Auth::DPoP(dpop_auth), 3892 &pds_url, 3893 put_request, 3894 ) 3895 .await 3896 .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?; 3897 3898 // Extract URI and CID from the response 3899 let (uri, cid) = match result { 3900 atproto_client::com::atproto::repo::PutRecordResponse::StrongRef { uri, cid, .. } => (uri, cid), 3901 atproto_client::com::atproto::repo::PutRecordResponse::Error(e) => { 3902 return Err(Error::new(format!("AT Protocol error: {} - {}", e.error.unwrap_or_default(), e.message.unwrap_or_default()))); 3903 } 3904 }; 3905 3906 // Extract the target slice URI for lexicon records before moving record_data 3907 let target_slice_for_invalidation = if collection == "network.slices.lexicon" { 3908 record_data.get("slice") 3909 .and_then(|v| v.as_str()) 3910 .map(|s| s.to_string()) 3911 } else { 3912 None 3913 }; 3914 3915 // Update in local database 3916 let record = crate::models::Record { 3917 uri: uri.clone(), 3918 cid: cid.clone(), 3919 did: repo, 3920 collection: collection.clone(), 3921 json: record_data, 3922 indexed_at: chrono::Utc::now(), 3923 slice_uri: Some(slice.clone()), 3924 }; 3925 3926 let _ = db.update_record(&record).await; 3927 3928 // Invalidate GraphQL schema cache if this is a lexicon record 3929 if let Some(target_slice) = target_slice_for_invalidation { 3930 crate::graphql::invalidate_schema_cache(&target_slice).await; 3931 } 3932 3933 // Fetch the updated record and return it so Relay can update its cache 3934 let updated_record = db.get_record(&uri).await 3935 .map_err(|e| Error::new(format!("Failed to fetch updated record: {}", e)))? 3936 .ok_or_else(|| Error::new("Updated record not found"))?; 3937 3938 // Wrap the record in a RecordContainer so the type fields can access it 3939 let container = RecordContainer { 3940 record: updated_record, 3941 }; 3942 3943 Ok(Some(FieldValue::owned_any(container))) 3944 }) 3945 }, 3946 ) 3947 .argument(InputValue::new("rkey", TypeRef::named_nn(TypeRef::STRING))) 3948 .argument(InputValue::new("input", TypeRef::named_nn(format!("{}Input", type_name)))) 3949 .description(format!("Update a {} record", nsid_clone)) 3950 ) 3951} 3952 3953/// Adds a delete mutation for a collection 3954fn add_delete_mutation( 3955 mutation: Object, 3956 type_name: &str, 3957 nsid: &str, 3958 database: Database, 3959 slice_uri: String, 3960) -> Object { 3961 let mutation_name = format!("delete{}", type_name); 3962 let nsid = nsid.to_string(); 3963 let nsid_clone = nsid.clone(); 3964 3965 mutation.field( 3966 Field::new( 3967 mutation_name, 3968 TypeRef::named_nn(type_name), 3969 move |ctx| { 3970 let _db = database.clone(); 3971 let _slice = slice_uri.clone(); 3972 let _collection = nsid.clone(); 3973 3974 FieldFuture::new(async move { 3975 // Get GraphQL context which contains auth info 3976 let gql_ctx = ctx.data::<crate::graphql::GraphQLContext>() 3977 .map_err(|_| Error::new("Missing GraphQL context"))?; 3978 3979 // Check if user is authenticated 3980 let token = gql_ctx.auth_token.as_ref() 3981 .ok_or_else(|| Error::new("Authentication required"))?; 3982 3983 // Get rkey 3984 let rkey = ctx.args.get("rkey") 3985 .and_then(|v| v.string().ok()) 3986 .ok_or_else(|| Error::new("Missing rkey argument"))? 3987 .to_string(); 3988 3989 // Verify OAuth token and get user info 3990 let user_info = crate::auth::verify_oauth_token_cached( 3991 token, 3992 &gql_ctx.auth_base_url, 3993 gql_ctx.auth_cache.clone(), 3994 ) 3995 .await 3996 .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; 3997 3998 // Get AT Protocol DPoP auth and PDS URL 3999 let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached( 4000 token, 4001 &gql_ctx.auth_base_url, 4002 gql_ctx.auth_cache.clone(), 4003 ) 4004 .await 4005 .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?; 4006 4007 let repo = user_info.did.unwrap_or(user_info.sub); 4008 let uri = format!("at://{}/{}/{}", repo, _collection, rkey); 4009 4010 // Fetch the record before deleting it 4011 let deleted_record = _db.get_record(&uri).await 4012 .map_err(|e| Error::new(format!("Failed to fetch record: {}", e)))? 4013 .ok_or_else(|| Error::new("Record not found"))?; 4014 4015 // Extract the target slice URI for lexicon records before deletion 4016 let target_slice_for_invalidation = if _collection == "network.slices.lexicon" { 4017 deleted_record.value.get("slice") 4018 .and_then(|v| v.as_str()) 4019 .map(|s| s.to_string()) 4020 } else { 4021 None 4022 }; 4023 4024 // Delete record using AT Protocol 4025 let http_client = reqwest::Client::new(); 4026 let delete_request = atproto_client::com::atproto::repo::DeleteRecordRequest { 4027 repo: repo.clone(), 4028 collection: _collection.clone(), 4029 record_key: rkey.clone(), 4030 swap_record: None, 4031 swap_commit: None, 4032 }; 4033 4034 atproto_client::com::atproto::repo::delete_record( 4035 &http_client, 4036 &atproto_client::client::Auth::DPoP(dpop_auth), 4037 &pds_url, 4038 delete_request, 4039 ) 4040 .await 4041 .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?; 4042 4043 // Handle cascade deletion 4044 if let Err(e) = _db.handle_cascade_deletion(&uri, &_collection).await { 4045 tracing::warn!("Cascade deletion failed for {}: {}", uri, e); 4046 } 4047 4048 // Delete from local database 4049 let _ = _db.delete_record_by_uri(&uri, None).await; 4050 4051 // Invalidate GraphQL schema cache if this is a lexicon record 4052 if let Some(target_slice) = target_slice_for_invalidation { 4053 crate::graphql::invalidate_schema_cache(&target_slice).await; 4054 } 4055 4056 // Wrap the record in a RecordContainer so the type fields can access it 4057 let container = RecordContainer { 4058 record: deleted_record, 4059 }; 4060 4061 Ok(Some(FieldValue::owned_any(container))) 4062 }) 4063 }, 4064 ) 4065 .argument(InputValue::new("rkey", TypeRef::named_nn(TypeRef::STRING))) 4066 .description(format!("Delete a {} record", nsid_clone)) 4067 ) 4068} 4069 4070/// Creates the MutationResponse type for create/update mutations 4071fn create_mutation_response_type() -> Object { 4072 Object::new("MutationResponse") 4073 .field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| { 4074 FieldFuture::new(async move { 4075 // Parent value is a GraphQLValue, extract the Object variant 4076 if let Ok(value) = ctx.parent_value.try_downcast_ref::<GraphQLValue>() { 4077 if let GraphQLValue::Object(obj) = value { 4078 if let Some(uri) = obj.get(&async_graphql::Name::new("uri")) { 4079 return Ok(Some(uri.clone())); 4080 } 4081 } 4082 } 4083 Ok(None) 4084 }) 4085 })) 4086 .field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| { 4087 FieldFuture::new(async move { 4088 // Parent value is a GraphQLValue, extract the Object variant 4089 if let Ok(value) = ctx.parent_value.try_downcast_ref::<GraphQLValue>() { 4090 if let GraphQLValue::Object(obj) = value { 4091 if let Some(cid) = obj.get(&async_graphql::Name::new("cid")) { 4092 return Ok(Some(cid.clone())); 4093 } 4094 } 4095 } 4096 Ok(None) 4097 }) 4098 })) 4099}