//! Dynamic GraphQL schema builder from AT Protocol lexicons
//!
//! This module generates GraphQL schemas at runtime based on lexicon definitions
//! stored in the database, enabling flexible querying of slice records.

use async_graphql::dynamic::{
    Enum, EnumItem, Field, FieldFuture, FieldValue, InputObject, InputValue, Object, Scalar,
    Schema, Subscription, SubscriptionField, SubscriptionFieldFuture, TypeRef,
};
use async_graphql::{Error, Value as GraphQLValue};
use base64::Engine;
use base64::engine::general_purpose;
use serde_json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::database::Database;
use crate::graphql::dataloader::GraphQLContext;
use crate::graphql::schema_ext::{
    add_cancel_job_mutation, add_create_oauth_client_mutation, add_delete_job_mutation,
    add_delete_oauth_client_mutation, add_delete_slice_records_mutation,
    add_get_sync_summary_query, add_jetstream_logs_query, add_jetstream_logs_subscription,
    add_oauth_clients_query, add_oauth_clients_field_to_slice, add_slice_records_query,
    add_sparklines_query, add_sparklines_field_to_slice, add_start_sync_mutation,
    add_stats_field_to_slice, add_sync_job_logs_query, add_sync_job_query,
    add_sync_job_subscription, add_sync_jobs_query, add_update_oauth_client_mutation,
    add_upload_blob_mutation, create_blob_upload_response_type, create_collection_stats_type,
    create_collection_summary_type, create_delete_slice_records_output_type,
    create_jetstream_log_entry_type, create_oauth_client_type, create_slice_record_type,
    create_slice_record_edge_type, create_slice_records_connection_type,
    create_slice_records_where_input, create_slice_sparkline_type, create_slice_stats_type,
    create_sparkline_point_type, create_start_sync_output_type, create_sync_job_result_type,
    create_sync_job_type, create_sync_summary_type,
};
use crate::graphql::types::{
    extract_collection_fields, extract_record_key, GraphQLField, GraphQLType,
};
use crate::graphql::PUBSUB;

/// Metadata about a collection for cross-referencing
#[derive(Clone)]
struct CollectionMeta {
    nsid: String,
    key_type: String,           // "tid", "literal:self", or "any"
    type_name: String,          // GraphQL type name for this collection
    at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins
}

/// Type registry for tracking generated nested object types
type TypeRegistry = HashMap<String, Object>;

/// Container for nested object field values
#[derive(Clone)]
struct NestedObjectContainer {
    data: serde_json::Value,
}

/// Generates a unique type name for a nested object field
fn generate_nested_type_name(parent_type: &str, field_name: &str) -> String {
    let mut chars = field_name.chars();
    let capitalized_field = match chars.next() {
        None => String::new(),
        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
    };
    format!("{}{}", parent_type, capitalized_field)
}

/// Resolves a lexicon ref and generates a GraphQL type for it.
/// Returns the generated type name.
fn resolve_lexicon_ref_type(
    ref_nsid: &str,
    current_lexicon_nsid: &str,
    all_lexicons: &[serde_json::Value],
    type_registry: &mut TypeRegistry,
    database: &Database,
) -> String {
    // Handle different ref formats:
    // 1. Local ref: #image
    // 2. External ref with specific def: app.bsky.embed.defs#aspectRatio
    // 3. External ref to main: community.lexicon.location.hthree
    let (target_nsid, def_name) = if ref_nsid.starts_with('#') {
        // Local ref - use current lexicon NSID and the def name without #
        (current_lexicon_nsid, &ref_nsid[1..])
    } else if let Some(hash_pos) = ref_nsid.find('#') {
        // External ref with specific def - split on #
        (&ref_nsid[..hash_pos], &ref_nsid[hash_pos + 1..])
    } else {
        // External ref to main def
        (ref_nsid, "main")
    };

    // Generate type name from NSID and def name
    let type_name = if def_name == "main" {
        // For refs to main: CommunityLexiconLocationHthree
        nsid_to_type_name(target_nsid)
    } else {
        // For refs to specific def: AppBskyEmbedDefsAspectRatio
        format!(
            "{}{}",
            nsid_to_type_name(target_nsid),
            capitalize_first(def_name)
        )
    };

    // Check if already generated
    if type_registry.contains_key(&type_name) {
        return type_name;
    }

    // Find the lexicon definition
    let lexicon = all_lexicons
        .iter()
        .find(|lex| lex.get("id").and_then(|id| id.as_str()) == Some(target_nsid));

    if let Some(lex) = lexicon {
        // Extract the definition (either "main" or specific def like "image")
        if let Some(defs) = lex.get("defs") {
            if let Some(def) = defs.get(def_name) {
                // Extract fields from this specific definition
                if let Some(properties) = def.get("properties") {
                    let fields = extract_fields_from_properties(properties);
                    if !fields.is_empty() {
                        // Generate the type using existing nested object generator
                        generate_nested_object_type(&type_name, &fields, type_registry, database);
                        return type_name;
                    }
                }
            }
        }
    }

    // Fallback: couldn't resolve the ref, will use JSON
    tracing::warn!(
        "Could not resolve lexicon ref: {} (target: {}, def: {})",
        ref_nsid,
        target_nsid,
        def_name
    );
    type_name
}

/// Capitalizes the first character of a string
fn capitalize_first(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
    }
}

/// Extracts fields from a lexicon properties object
fn extract_fields_from_properties(properties: &serde_json::Value) -> Vec<GraphQLField> {
    let mut fields = Vec::new();

    if let Some(props) = properties.as_object() {
        for (field_name, field_def) in props {
            let field_type_str = field_def
                .get("type")
                .and_then(|t| t.as_str())
                .unwrap_or("unknown");
            let field_type =
                crate::graphql::types::map_lexicon_type_to_graphql(field_type_str, field_def);

            // Check if field is required
            let is_required = false; // We'd need the parent's "required" array to know this

            // Extract format if present
            let format = field_def
                .get("format")
                .and_then(|f| f.as_str())
                .map(|s| s.to_string());

            fields.push(GraphQLField {
                name: field_name.clone(),
                field_type,
                is_required,
                format,
            });
        }
    }

    fields
}

/// Recursively generates GraphQL object types for nested objects.
/// Returns the type name of the generated object type.
fn generate_nested_object_type(
    type_name: &str,
    fields: &[GraphQLField],
    type_registry: &mut TypeRegistry,
    database: &Database,
) -> String {
    // Check if type already exists in registry
    if type_registry.contains_key(type_name) {
        return type_name.to_string();
    }

    let mut object = Object::new(type_name);

    // Add fields to the object
    for field in fields {
        let field_name = field.name.clone();
        let field_name_for_field = field_name.clone(); // Clone for Field::new
        let field_type = field.field_type.clone();

        // Determine the TypeRef for this field
        let type_ref = match &field.field_type {
            GraphQLType::Object(nested_fields) => {
                // Generate nested object type recursively
                let nested_type_name = generate_nested_type_name(type_name, &field_name);
                let actual_type_name =
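                // Illustrative naming sketch (added note, not from the original source):
                // with parent type "AppBskyFeedPost" and a nested object field "embed",
                // generate_nested_type_name is expected to produce "AppBskyFeedPostEmbed".
                // resolve_lexicon_ref_type above follows the same convention, e.g.
                // "app.bsky.embed.defs#aspectRatio" -> "AppBskyEmbedDefsAspectRatio" and
                // "community.lexicon.location.hthree" -> "CommunityLexiconLocationHthree",
                // assuming nsid_to_type_name (defined elsewhere in this module) PascalCases
                // each NSID segment.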
generate_nested_object_type( &nested_type_name, nested_fields, type_registry, database, ); if field.is_required { TypeRef::named_nn(actual_type_name) } else { TypeRef::named(actual_type_name) } } GraphQLType::Array(inner) => { if let GraphQLType::Object(nested_fields) = inner.as_ref() { // Generate nested object type for array items let nested_type_name = generate_nested_type_name(type_name, &field_name); let actual_type_name = generate_nested_object_type( &nested_type_name, nested_fields, type_registry, database, ); if field.is_required { TypeRef::named_nn_list(actual_type_name) } else { TypeRef::named_list(actual_type_name) } } else { // Use standard type ref for arrays of primitives graphql_type_to_typeref(&field.field_type, field.is_required) } } _ => { // Use standard type ref for other types graphql_type_to_typeref(&field.field_type, field.is_required) } }; // Add field with resolver object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| { let field_name = field_name.clone(); let field_type = field_type.clone(); FieldFuture::new(async move { // Get parent container let container = ctx.parent_value.try_downcast_ref::()?; let value = container.data.get(&field_name); if let Some(val) = value { if val.is_null() { return Ok(None); } // For nested objects, wrap in container if matches!(field_type, GraphQLType::Object(_)) { let nested_container = NestedObjectContainer { data: val.clone(), }; return Ok(Some(FieldValue::owned_any(nested_container))); } // For arrays of objects, wrap each item if let GraphQLType::Array(inner) = &field_type { if matches!(inner.as_ref(), GraphQLType::Object(_)) { if let Some(arr) = val.as_array() { let containers: Vec = arr .iter() .map(|item| { let nested_container = NestedObjectContainer { data: item.clone(), }; FieldValue::owned_any(nested_container) }) .collect(); return Ok(Some(FieldValue::list(containers))); } return Ok(Some(FieldValue::list(Vec::::new()))); } } // For other types, return the GraphQL value let graphql_val = json_to_graphql_value(val); Ok(Some(FieldValue::value(graphql_val))) } else { Ok(None) } }) })); } // Store the generated type in registry type_registry.insert(type_name.to_string(), object); type_name.to_string() } /// Builds a dynamic GraphQL schema from lexicons for a given slice pub async fn build_graphql_schema(database: Database, slice_uri: String, auth_base_url: String) -> Result { // Fetch all lexicons for this slice let all_lexicons = database .get_lexicons_by_slice(&slice_uri) .await .map_err(|e| format!("Failed to load lexicons: {}", e))?; // Deduplicate by NSID for schema building (keep most recent due to ORDER BY indexed_at DESC) // This prevents duplicate type registration errors without hiding duplicates from users let mut seen_nsids = std::collections::HashSet::new(); let lexicons: Vec = all_lexicons .into_iter() .filter(|lexicon| { if let Some(nsid) = lexicon.get("id").and_then(|n| n.as_str()) { seen_nsids.insert(nsid.to_string()) } else { true // Keep lexicons without an ID (will fail validation later) } }) .collect(); // Build Query root type and collect all object types let mut query = Object::new("Query"); let mut objects_to_register = Vec::new(); let mut where_inputs_to_register = Vec::new(); let mut group_by_enums_to_register = Vec::new(); let mut mutation_inputs_to_register = Vec::new(); // First pass: collect metadata about all collections for cross-referencing let mut all_collections: Vec = Vec::new(); for lexicon in &lexicons { let nsid = lexicon .get("id") .and_then(|n| n.as_str()) 
.ok_or_else(|| "Lexicon missing id".to_string())?; let defs = lexicon .get("defs") .ok_or_else(|| format!("Lexicon {} missing defs", nsid))?; let fields = extract_collection_fields(defs); if !fields.is_empty() { if let Some(key_type) = extract_record_key(defs) { // Extract at-uri field names for reverse joins let at_uri_fields: Vec = fields .iter() .filter(|f| f.format.as_deref() == Some("at-uri")) .map(|f| f.name.clone()) .collect(); if !at_uri_fields.is_empty() { tracing::debug!("Collection {} has at-uri fields: {:?}", nsid, at_uri_fields); } all_collections.push(CollectionMeta { nsid: nsid.to_string(), key_type, type_name: nsid_to_type_name(nsid), at_uri_fields, }); } } } // Initialize type registry for nested object types let mut type_registry: TypeRegistry = HashMap::new(); // Second pass: create types and queries for lexicon in &lexicons { // get_lexicons_by_slice returns {lexicon: 1, id: "nsid", defs: {...}} let nsid = lexicon .get("id") .and_then(|n| n.as_str()) .ok_or_else(|| "Lexicon missing id".to_string())?; let defs = lexicon .get("defs") .ok_or_else(|| format!("Lexicon {} missing defs", nsid))? .clone(); // Extract fields from lexicon let fields = extract_collection_fields(&defs); if !fields.is_empty() { // Create a GraphQL type for this collection let type_name = nsid_to_type_name(nsid); let record_type = create_record_type( &type_name, &fields, database.clone(), slice_uri.clone(), &all_collections, auth_base_url.clone(), &mut type_registry, &lexicons, nsid, ); // Create edge and connection types for this collection (Relay standard) let edge_type = create_edge_type(&type_name); let connection_type = create_connection_type(&type_name); // Create WhereInput type for this collection let mut where_input = InputObject::new(format!("{}WhereInput", type_name)); // Collect lexicon field names to avoid duplicates let lexicon_field_names: std::collections::HashSet<&str> = fields.iter().map(|f| f.name.as_str()).collect(); // Add system fields available on all records (skip if already in lexicon) let system_fields = [ ("indexedAt", "DateTimeFilter"), ("uri", "StringFilter"), ("cid", "StringFilter"), ("did", "StringFilter"), ("collection", "StringFilter"), ("actorHandle", "StringFilter"), ]; for (field_name, filter_type) in system_fields { if !lexicon_field_names.contains(field_name) { where_input = where_input.field(InputValue::new(field_name, TypeRef::named(filter_type))); } } // Add fields from the lexicon for field in &fields { let filter_type = match field.field_type { GraphQLType::Int => "IntFilter", _ => "StringFilter", // Default to StringFilter for strings and other types }; where_input = where_input.field(InputValue::new(&field.name, TypeRef::named(filter_type))); } // Add generic json field for querying record content where_input = where_input.field(InputValue::new("json", TypeRef::named("StringFilter"))); // Add nested and/or support where_input = where_input .field(InputValue::new( "and", TypeRef::named_list(format!("{}WhereInput", type_name)), )) .field(InputValue::new( "or", TypeRef::named_list(format!("{}WhereInput", type_name)), )); // Create GroupByField enum for this collection let mut group_by_enum = Enum::new(format!("{}GroupByField", type_name)); group_by_enum = group_by_enum.item(EnumItem::new("indexedAt")); for field in &fields { group_by_enum = group_by_enum.item(EnumItem::new(&field.name)); } // Create collection-specific GroupByFieldInput let group_by_input = InputObject::new(format!("{}GroupByFieldInput", type_name)) .field(InputValue::new( "field", 
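                // Sketch of the generated filter inputs, in SDL terms (hypothetical example
                // NSID "app.bsky.feed.post"; the exact shape follows the builder code above):
                //   input AppBskyFeedPostWhereInput {
                //     indexedAt: DateTimeFilter
                //     uri: StringFilter
                //     cid: StringFilter
                //     did: StringFilter
                //     collection: StringFilter
                //     actorHandle: StringFilter
                //     # ...one filter per lexicon field (IntFilter for ints, else StringFilter)
                //     json: StringFilter
                //     and: [AppBskyFeedPostWhereInput]
                //     or: [AppBskyFeedPostWhereInput]
                //   }
                // plus an AppBskyFeedPostGroupByField enum listing indexedAt and each lexicon field.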
TypeRef::named_nn(format!("{}GroupByField", type_name)), )) .field(InputValue::new("interval", TypeRef::named("DateInterval"))); // Create collection-specific SortFieldInput let sort_field_input = InputObject::new(format!("{}SortFieldInput", type_name)) .field(InputValue::new( "field", TypeRef::named_nn(format!("{}GroupByField", type_name)), )) .field(InputValue::new( "direction", TypeRef::named("SortDirection"), )); // Create mutation input type for this collection let mutation_input = create_mutation_input_type(&type_name, &fields); // Collect the types to register with schema later objects_to_register.push(record_type); objects_to_register.push(edge_type); objects_to_register.push(connection_type); where_inputs_to_register.push(where_input); where_inputs_to_register.push(group_by_input); where_inputs_to_register.push(sort_field_input); group_by_enums_to_register.push(group_by_enum); mutation_inputs_to_register.push(mutation_input); // Add query field for this collection let collection_query_name = nsid_to_query_name(nsid); let db_clone = database.clone(); let slice_clone = slice_uri.clone(); let nsid_clone = nsid.to_string(); let connection_type_name = format!("{}Connection", &type_name); query = query.field( Field::new( &collection_query_name, TypeRef::named_nn(&connection_type_name), move |ctx| { let db = db_clone.clone(); let slice = slice_clone.clone(); let collection = nsid_clone.clone(); FieldFuture::new(async move { // Get Relay-standard pagination arguments let first: i32 = match ctx.args.get("first") { Some(val) => val.i64().unwrap_or(50) as i32, None => 50, }; let after: Option<&str> = match ctx.args.get("after") { Some(val) => val.string().ok(), None => None, }; // Parse sortBy argument let sort_by: Option> = match ctx .args .get("sortBy") { Some(val) => { if let Ok(list) = val.list() { let mut sort_fields = Vec::new(); for item in list.iter() { if let Ok(obj) = item.object() { let field = obj .get("field") .and_then(|v| { v.enum_name().ok().map(|s| s.to_string()) }) .unwrap_or_else(|| "indexedAt".to_string()); let direction = obj .get("direction") .and_then(|v| { v.enum_name().ok().map(|s| s.to_string()) }) .unwrap_or_else(|| "desc".to_string()); sort_fields.push(crate::models::SortField { field, direction, }); } } Some(sort_fields) } else { None } } None => None, }; // Build where clause for this collection let mut where_clause = crate::models::WhereClause { conditions: HashMap::new(), or_conditions: None, and: None, or: None, }; // Always filter by collection where_clause.conditions.insert( "collection".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(collection.clone())), in_values: None, contains: None, fuzzy: None, }, ); // Parse where argument if provided if let Some(where_val) = ctx.args.get("where") { if let Ok(where_obj) = where_val.object() { let parsed_where = parse_where_clause(where_obj); // Merge parsed conditions with existing collection filter where_clause.conditions.extend(parsed_where.conditions); where_clause.or_conditions = parsed_where.or_conditions; where_clause.and = parsed_where.and; where_clause.or = parsed_where.or; } } // Resolve actorHandle to did if present if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") { // Handle different filter types let dids = if let Some(pattern) = &actor_handle_condition.contains { // Pattern matching (contains) db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() } else if let Some(pattern) = 
&actor_handle_condition.fuzzy { // Fuzzy matching db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() } else { // Exact matching (eq / in_values) let mut handles = Vec::new(); if let Some(eq_value) = &actor_handle_condition.eq { if let Some(handle_str) = eq_value.as_str() { handles.push(handle_str.to_string()); } } if let Some(in_values) = &actor_handle_condition.in_values { for value in in_values { if let Some(handle_str) = value.as_str() { handles.push(handle_str.to_string()); } } } if !handles.is_empty() { db.resolve_handles_to_dids(&handles, &slice).await.ok() } else { None } }; // Replace actorHandle condition with did condition if we found matches if let Some(dids) = dids { if !dids.is_empty() { let did_condition = if dids.len() == 1 { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String( dids[0].clone(), )), in_values: None, contains: None, fuzzy: None, } } else { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: None, in_values: Some( dids.into_iter() .map(|d| { serde_json::Value::String(d) }) .collect(), ), contains: None, fuzzy: None, } }; where_clause .conditions .insert("did".to_string(), did_condition); } // If no DIDs found, the query will return 0 results naturally } } // Resolve actorHandle to did in OR clauses if let Some(or_clauses) = &mut where_clause.or { for or_clause in or_clauses.iter_mut() { if let Some(actor_handle_condition) = or_clause.conditions.remove("actorHandle") { // Handle different filter types let dids = if let Some(pattern) = &actor_handle_condition.contains { db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() } else if let Some(pattern) = &actor_handle_condition.fuzzy { db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() } else { let mut handles = Vec::new(); if let Some(eq_value) = &actor_handle_condition.eq { if let Some(handle_str) = eq_value.as_str() { handles.push(handle_str.to_string()); } } if let Some(in_values) = &actor_handle_condition.in_values { for value in in_values { if let Some(handle_str) = value.as_str() { handles.push(handle_str.to_string()); } } } if !handles.is_empty() { db.resolve_handles_to_dids(&handles, &slice).await.ok() } else { None } }; // Replace actorHandle with did condition if let Some(dids) = dids { if !dids.is_empty() { let did_condition = if dids.len() == 1 { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(dids[0].clone())), in_values: None, contains: None, fuzzy: None, } } else { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: None, in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()), contains: None, fuzzy: None, } }; or_clause.conditions.insert("did".to_string(), did_condition); } } } } } // Query database for records let (records, next_cursor) = db .get_slice_collections_records( &slice, Some(first), after, sort_by.as_ref(), Some(&where_clause), ) .await .map_err(|e| Error::new(format!("Database query failed: {}", e)))?; // Query database for total count let total_count = db .count_slice_collections_records(&slice, Some(&where_clause)) .await .map_err(|e| Error::new(format!("Count query failed: {}", e)))? 
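                            // Example client query this resolver is meant to serve (illustrative;
                            // the query and field names depend on the slice's lexicons):
                            //   {
                            //     appBskyFeedPost(
                            //       first: 20
                            //       after: "opaque-cursor"
                            //       where: { actorHandle: { eq: "alice.example.com" } }
                            //       sortBy: [{ field: indexedAt, direction: desc }]
                            //     ) {
                            //       totalCount
                            //       pageInfo { hasNextPage endCursor }
                            //       edges { node { uri did indexedAt } }
                            //     }
                            //   }
                            // Note that the actorHandle filter has already been rewritten into a
                            // did eq/in condition before the records and count queries run.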
as i32; // Convert records to RecordContainers let record_containers: Vec = records .into_iter() .map(|record| { // Convert Record to IndexedRecord let indexed_record = crate::models::IndexedRecord { uri: record.uri, cid: record.cid, did: record.did, collection: record.collection, value: record.json, indexed_at: record.indexed_at.to_rfc3339(), }; RecordContainer { record: indexed_record, } }) .collect(); // Build Connection data let connection_data = ConnectionData { total_count, has_next_page: next_cursor.is_some(), end_cursor: next_cursor, nodes: record_containers, }; Ok(Some(FieldValue::owned_any(connection_data))) }) }, ) .argument(async_graphql::dynamic::InputValue::new( "first", TypeRef::named(TypeRef::INT), )) .argument(async_graphql::dynamic::InputValue::new( "after", TypeRef::named(TypeRef::STRING), )) .argument(async_graphql::dynamic::InputValue::new( "last", TypeRef::named(TypeRef::INT), )) .argument(async_graphql::dynamic::InputValue::new( "before", TypeRef::named(TypeRef::STRING), )) .argument(async_graphql::dynamic::InputValue::new( "sortBy", TypeRef::named_list(format!("{}SortFieldInput", type_name)), )) .argument(async_graphql::dynamic::InputValue::new( "where", TypeRef::named(format!("{}WhereInput", type_name)), )) .description(format!("Query {} records", nsid)), ); // Add aggregated query field for this collection let aggregated_query_name = format!("{}Aggregated", collection_query_name); let aggregated_type_name = format!("{}Aggregated", &type_name); // Create aggregated type let aggregated_type = create_aggregated_type(&aggregated_type_name, &fields); objects_to_register.push(aggregated_type); let db_clone_agg = database.clone(); let slice_clone_agg = slice_uri.clone(); let nsid_clone_agg = nsid.to_string(); query = query.field( Field::new( &aggregated_query_name, TypeRef::named_nn_list_nn(&aggregated_type_name), move |ctx| { let db = db_clone_agg.clone(); let slice = slice_clone_agg.clone(); let collection = nsid_clone_agg.clone(); FieldFuture::new(async move { // Parse groupBy argument let group_by_fields: Vec = match ctx.args.get("groupBy") { Some(val) => { if let Ok(list) = val.list() { let mut fields = Vec::new(); for item in list.iter() { if let Ok(obj) = item.object() { // Get field name from enum let field_name = obj.get("field") .and_then(|v| v.enum_name().ok().map(|s| s.to_string())) .ok_or_else(|| Error::new("Missing field name in groupBy"))?; // Get optional interval if let Some(interval_val) = obj.get("interval") { if let Ok(interval_str) = interval_val.enum_name() { // Parse interval string to DateInterval let interval = match interval_str { "second" => crate::models::DateInterval::Second, "minute" => crate::models::DateInterval::Minute, "hour" => crate::models::DateInterval::Hour, "day" => crate::models::DateInterval::Day, "week" => crate::models::DateInterval::Week, "month" => crate::models::DateInterval::Month, "quarter" => crate::models::DateInterval::Quarter, "year" => crate::models::DateInterval::Year, _ => return Err(Error::new(format!("Invalid interval: {}", interval_str))), }; fields.push(crate::models::GroupByField::Truncated { field: field_name, interval, }); } else { return Err(Error::new("Invalid interval value")); } } else { // No interval, simple field fields.push(crate::models::GroupByField::Simple(field_name)); } } else { return Err(Error::new("Invalid groupBy item")); } } fields } else { Vec::new() } } None => Vec::new(), }; if group_by_fields.is_empty() { return Err(Error::new("groupBy is required for aggregated queries")); } // Parse 
limit argument let limit: i32 = match ctx.args.get("limit") { Some(val) => val.i64().unwrap_or(50) as i32, None => 50, }; // Parse orderBy argument let order_by_count: Option = match ctx.args.get("orderBy") { Some(val) => { if let Ok(obj) = val.object() { obj.get("count") .and_then(|v| v.string().ok()) .map(|s| s.to_string()) } else { None } } None => None, }; // Build where clause for this collection let mut where_clause = crate::models::WhereClause { conditions: HashMap::new(), or_conditions: None, and: None, or: None, }; // Always filter by collection where_clause.conditions.insert( "collection".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(collection.clone())), in_values: None, contains: None, fuzzy: None, }, ); // Parse where argument if provided if let Some(where_val) = ctx.args.get("where") { if let Ok(where_obj) = where_val.object() { let parsed_where = parse_where_clause(where_obj); // Merge parsed conditions with existing collection filter where_clause.conditions.extend(parsed_where.conditions); where_clause.or_conditions = parsed_where.or_conditions; where_clause.and = parsed_where.and; where_clause.or = parsed_where.or; } } // Resolve actorHandle to did if present if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") { // Handle different filter types let dids = if let Some(pattern) = &actor_handle_condition.contains { // Pattern matching (contains) db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() } else if let Some(pattern) = &actor_handle_condition.fuzzy { // Fuzzy matching db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() } else { // Exact matching (eq / in_values) let mut handles = Vec::new(); if let Some(eq_value) = &actor_handle_condition.eq { if let Some(handle_str) = eq_value.as_str() { handles.push(handle_str.to_string()); } } if let Some(in_values) = &actor_handle_condition.in_values { for value in in_values { if let Some(handle_str) = value.as_str() { handles.push(handle_str.to_string()); } } } if !handles.is_empty() { db.resolve_handles_to_dids(&handles, &slice).await.ok() } else { None } }; // Replace actorHandle condition with did condition if we found matches if let Some(dids) = dids { if !dids.is_empty() { let did_condition = if dids.len() == 1 { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(dids[0].clone())), in_values: None, contains: None, fuzzy: None, } } else { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: None, in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()), contains: None, fuzzy: None, } }; where_clause.conditions.insert("did".to_string(), did_condition); } // If no DIDs found, the query will return 0 results naturally } } // Resolve actorHandle to did in OR clauses if let Some(or_clauses) = &mut where_clause.or { for or_clause in or_clauses.iter_mut() { if let Some(actor_handle_condition) = or_clause.conditions.remove("actorHandle") { // Handle different filter types let dids = if let Some(pattern) = &actor_handle_condition.contains { db.resolve_handle_pattern_to_dids(pattern, &slice).await.ok() } else if let Some(pattern) = &actor_handle_condition.fuzzy { db.resolve_handle_fuzzy_to_dids(pattern, &slice).await.ok() } else { let mut handles = Vec::new(); if let Some(eq_value) = &actor_handle_condition.eq { if let Some(handle_str) = eq_value.as_str() { handles.push(handle_str.to_string()); } } if let 
Some(in_values) = &actor_handle_condition.in_values { for value in in_values { if let Some(handle_str) = value.as_str() { handles.push(handle_str.to_string()); } } } if !handles.is_empty() { db.resolve_handles_to_dids(&handles, &slice).await.ok() } else { None } }; // Replace actorHandle with did condition if let Some(dids) = dids { if !dids.is_empty() { let did_condition = if dids.len() == 1 { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(dids[0].clone())), in_values: None, contains: None, fuzzy: None, } } else { crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: None, in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()), contains: None, fuzzy: None, } }; or_clause.conditions.insert("did".to_string(), did_condition); } } } } } // Query database for aggregated records let results = db .get_aggregated_records( &slice, &group_by_fields, Some(&where_clause), order_by_count.as_deref(), Some(limit), ) .await .map_err(|e| { Error::new(format!("Aggregation query failed: {}", e)) })?; // Convert JSON values to GraphQL values let field_values: Vec> = results .into_iter() .map(|json_val| FieldValue::owned_any(json_val)) .collect(); Ok(Some(FieldValue::list(field_values))) }) }, ) .argument(async_graphql::dynamic::InputValue::new( "groupBy", TypeRef::named_nn_list(format!("{}GroupByFieldInput", type_name)), )) .argument(async_graphql::dynamic::InputValue::new( "where", TypeRef::named(format!("{}WhereInput", type_name)), )) .argument(async_graphql::dynamic::InputValue::new( "orderBy", TypeRef::named("AggregationOrderBy"), )) .argument(async_graphql::dynamic::InputValue::new( "limit", TypeRef::named(TypeRef::INT), )) .description(format!("Aggregated query for {} records with GROUP BY support", nsid)), ); } } // Add jetstreamLogs query to root query type query = add_jetstream_logs_query(query, database.clone(), slice_uri.clone()); // Add sync job queries query = add_sync_job_query(query); query = add_sync_jobs_query(query, slice_uri.clone()); query = add_sync_job_logs_query(query); query = add_get_sync_summary_query(query); // Add sparklines query query = add_sparklines_query(query); // Add slice records query query = add_slice_records_query(query, database.clone()); // Add OAuth clients query query = add_oauth_clients_query(query, slice_uri.clone(), auth_base_url.clone()); // Build Mutation type let mut mutation = create_mutation_type(database.clone(), slice_uri.clone(), &lexicons); // Add startSync mutation mutation = add_start_sync_mutation(mutation, slice_uri.clone()); // Add cancelJob mutation mutation = add_cancel_job_mutation(mutation); // Add deleteJob mutation mutation = add_delete_job_mutation(mutation); // Add uploadBlob mutation mutation = add_upload_blob_mutation(mutation, auth_base_url.clone()); // Add OAuth client mutations mutation = add_create_oauth_client_mutation(mutation, auth_base_url.clone()); mutation = add_update_oauth_client_mutation(mutation, auth_base_url.clone()); mutation = add_delete_oauth_client_mutation(mutation, auth_base_url.clone()); // Add deleteSliceRecords mutation mutation = add_delete_slice_records_mutation(mutation, slice_uri.clone()); // Build Subscription type with collection-specific subscriptions let mut subscription = create_subscription_type(slice_uri.clone(), &lexicons); // Add jetstreamLogsCreated subscription subscription = add_jetstream_logs_subscription(subscription, slice_uri.clone()); // Add syncJobUpdated subscription subscription = 
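    // Example aggregated query served by the per-collection "...Aggregated" fields built
    // above (illustrative; collection and field names depend on the slice's lexicons):
    //   {
    //     appBskyFeedPostAggregated(
    //       groupBy: [{ field: indexedAt, interval: day }]
    //       where: { actorHandle: { eq: "alice.example.com" } }
    //       limit: 30
    //     ) { ... }
    //   }
    // Each row comes back as raw JSON from get_aggregated_records, so the selectable
    // fields are whatever create_aggregated_type exposes for the collection.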
add_sync_job_subscription(subscription); // Build and return the schema with complexity limits let mut schema_builder = Schema::build( query.type_name(), Some(mutation.type_name()), Some(subscription.type_name()), ) .register(query) .register(mutation) .register(subscription) .limit_depth(50) // Higher limit to support GraphiQL introspection with reverse joins .limit_complexity(5000); // Prevent expensive deeply nested queries // Register JSON scalar type for complex fields let json_scalar = Scalar::new("JSON"); schema_builder = schema_builder.register(json_scalar); // Register filter input types for WHERE clauses let string_filter = InputObject::new("StringFilter") .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING))) .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING))); schema_builder = schema_builder.register(string_filter); let int_filter = InputObject::new("IntFilter") .field(InputValue::new("eq", TypeRef::named(TypeRef::INT))) .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT))) .field(InputValue::new("gt", TypeRef::named(TypeRef::INT))) .field(InputValue::new("gte", TypeRef::named(TypeRef::INT))) .field(InputValue::new("lt", TypeRef::named(TypeRef::INT))) .field(InputValue::new("lte", TypeRef::named(TypeRef::INT))); schema_builder = schema_builder.register(int_filter); let datetime_filter = InputObject::new("DateTimeFilter") .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("lte", TypeRef::named(TypeRef::STRING))); schema_builder = schema_builder.register(datetime_filter); // Register Blob type let blob_type = create_blob_type(); schema_builder = schema_builder.register(blob_type); // Register SyncResult type for mutations let sync_result_type = create_sync_result_type(); schema_builder = schema_builder.register(sync_result_type); // Register BlobUploadResponse type for blob upload mutation let blob_upload_response_type = create_blob_upload_response_type(); schema_builder = schema_builder.register(blob_upload_response_type); // Register OAuthClient type for OAuth operations let oauth_client_type = create_oauth_client_type(); schema_builder = schema_builder.register(oauth_client_type); // Register MutationResponse type for dynamic mutations let mutation_response_type = create_mutation_response_type(); schema_builder = schema_builder.register(mutation_response_type); // Register SortDirection enum let sort_direction_enum = create_sort_direction_enum(); schema_builder = schema_builder.register(sort_direction_enum); // Register SortField input type let sort_field_input = create_sort_field_input(); schema_builder = schema_builder.register(sort_field_input); // Register condition input types for where clauses let string_condition_input = create_string_condition_input(); schema_builder = schema_builder.register(string_condition_input); let int_condition_input = create_int_condition_input(); schema_builder = 
schema_builder.register(int_condition_input); // Register AggregationOrderBy input type let aggregation_order_by_input = create_aggregation_order_by_input(); schema_builder = schema_builder.register(aggregation_order_by_input); // Register DateInterval enum for date truncation let date_interval_enum = create_date_interval_enum(); schema_builder = schema_builder.register(date_interval_enum); // Register PageInfo type let page_info_type = create_page_info_type(); schema_builder = schema_builder.register(page_info_type); // Register RecordUpdate type for subscriptions let record_update_type = create_record_update_type(); schema_builder = schema_builder.register(record_update_type); // Register JetstreamLogEntry type let jetstream_log_entry_type = create_jetstream_log_entry_type(); schema_builder = schema_builder.register(jetstream_log_entry_type); // Register SyncJob and SyncJobResult types let sync_job_type = create_sync_job_type(); schema_builder = schema_builder.register(sync_job_type); let sync_job_result_type = create_sync_job_result_type(); schema_builder = schema_builder.register(sync_job_result_type); // Register StartSyncOutput type (created via schema_ext) let start_sync_output = create_start_sync_output_type(); schema_builder = schema_builder.register(start_sync_output); // Register SyncSummary and CollectionSummary types let sync_summary_type = create_sync_summary_type(); schema_builder = schema_builder.register(sync_summary_type); let collection_summary_type = create_collection_summary_type(); schema_builder = schema_builder.register(collection_summary_type); // Register Sparklines types let sparkline_point_type = create_sparkline_point_type(); schema_builder = schema_builder.register(sparkline_point_type); let slice_sparkline_type = create_slice_sparkline_type(); schema_builder = schema_builder.register(slice_sparkline_type); // Register Stats types let collection_stats_type = create_collection_stats_type(); schema_builder = schema_builder.register(collection_stats_type); let slice_stats_type = create_slice_stats_type(); schema_builder = schema_builder.register(slice_stats_type); // Register Records types let slice_record_type = create_slice_record_type(); schema_builder = schema_builder.register(slice_record_type); let slice_record_edge_type = create_slice_record_edge_type(); schema_builder = schema_builder.register(slice_record_edge_type); let slice_records_connection_type = create_slice_records_connection_type(); schema_builder = schema_builder.register(slice_records_connection_type); let slice_records_where_input = create_slice_records_where_input(); schema_builder = schema_builder.register(slice_records_where_input); // Register DeleteSliceRecordsOutput type let delete_slice_records_output_type = create_delete_slice_records_output_type(); schema_builder = schema_builder.register(delete_slice_records_output_type); // Register all object types for obj in objects_to_register { schema_builder = schema_builder.register(obj); } // Register all WhereInput types for where_input in where_inputs_to_register { schema_builder = schema_builder.register(where_input); } // Register all GroupByField enums for group_by_enum in group_by_enums_to_register { schema_builder = schema_builder.register(group_by_enum); } // Register all mutation input types for mutation_input in mutation_inputs_to_register { schema_builder = schema_builder.register(mutation_input); } // Register all nested object types from the type registry for (_, nested_type) in type_registry { schema_builder = 
schema_builder.register(nested_type); } schema_builder .finish() .map_err(|e| format!("Schema build error: {:?}", e)) } /// Container to hold record data for resolvers #[derive(Clone)] pub struct RecordContainer { pub record: crate::models::IndexedRecord, } /// Container to hold blob data and DID for URL generation #[derive(Clone)] pub struct BlobContainer { pub blob_ref: String, // CID reference pub mime_type: String, // MIME type pub size: i64, // Size in bytes pub did: String, // DID for CDN URL generation } /// Creates a GraphQL Object type for a record collection fn create_record_type( type_name: &str, fields: &[GraphQLField], database: Database, slice_uri: String, all_collections: &[CollectionMeta], auth_base_url: String, type_registry: &mut TypeRegistry, all_lexicons: &[serde_json::Value], lexicon_nsid: &str, ) -> Object { let mut object = Object::new(type_name); // Check which field names exist in lexicon to avoid conflicts let lexicon_field_names: std::collections::HashSet<&str> = fields.iter().map(|f| f.name.as_str()).collect(); // Add global ID field for Relay (base64 encoded typename:uri) // Only add if lexicon doesn't already have an "id" field if !lexicon_field_names.contains("id") { let type_name_for_id = type_name.to_string(); object = object.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), move |ctx| { let typename = type_name_for_id.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let global_id = format!("{}:{}", typename, container.record.uri); let encoded_id = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, global_id.as_bytes()); Ok(Some(GraphQLValue::from(encoded_id))) }) })); } // Add standard AT Protocol fields only if they don't conflict with lexicon fields if !lexicon_field_names.contains("uri") { object = object.field(Field::new( "uri", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.record.uri.clone()))) }) }, )); } if !lexicon_field_names.contains("cid") { object = object.field(Field::new( "cid", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.record.cid.clone()))) }) }, )); } if !lexicon_field_names.contains("did") { object = object.field(Field::new( "did", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.record.did.clone()))) }) }, )); } if !lexicon_field_names.contains("indexedAt") { object = object.field(Field::new( "indexedAt", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from( container.record.indexed_at.clone(), ))) }) }, )); } // Add actor metadata field (handle from actors table) // Always add as "actorHandle" to avoid conflicts with lexicon fields let db_for_actor = database.clone(); let slice_for_actor = slice_uri.clone(); object = object.field(Field::new( "actorHandle", TypeRef::named(TypeRef::STRING), move |ctx| { let db = db_for_actor.clone(); let slice = slice_for_actor.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let did = &container.record.did; // Build where clause to find actor by DID let mut where_clause = crate::models::WhereClause { conditions: 
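                    // Example of the Relay-style global id added near the top of this type
                    // (illustrative values): for type "AppBskyFeedPost" and record uri
                    // "at://did:plc:abc123/app.bsky.feed.post/3kxyz", the id field returns
                    // base64("AppBskyFeedPost:at://did:plc:abc123/app.bsky.feed.post/3kxyz")
                    // using the STANDARD base64 alphabet.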
std::collections::HashMap::new(), or_conditions: None, and: None, or: None, }; where_clause.conditions.insert( "did".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(did.clone())), in_values: None, contains: None, fuzzy: None, }, ); match db .get_slice_actors(&slice, Some(1), None, Some(&where_clause)) .await { Ok((actors, _cursor)) => { if let Some(actor) = actors.first() { if let Some(handle) = &actor.handle { Ok(Some(GraphQLValue::from(handle.clone()))) } else { Ok(None) } } else { Ok(None) } } Err(e) => { tracing::debug!("Actor not found for {}: {}", did, e); Ok(None) } } }) }, )); // Add fields from lexicon for field in fields { let field_name = field.name.clone(); let field_name_for_field = field_name.clone(); // Need separate clone for Field::new let field_type = field.field_type.clone(); let db_clone = database.clone(); // Determine type ref - handle nested objects and lexicon refs specially let type_ref = match &field.field_type { GraphQLType::LexiconRef(ref_nsid) => { // Resolve lexicon ref and generate type for it let resolved_type_name = resolve_lexicon_ref_type( ref_nsid, lexicon_nsid, all_lexicons, type_registry, &database, ); if field.is_required { TypeRef::named_nn(resolved_type_name) } else { TypeRef::named(resolved_type_name) } } GraphQLType::Object(nested_fields) => { // Generate nested object type let nested_type_name = generate_nested_type_name(type_name, &field_name); let actual_type_name = generate_nested_object_type( &nested_type_name, nested_fields, type_registry, &database, ); if field.is_required { TypeRef::named_nn(actual_type_name) } else { TypeRef::named(actual_type_name) } } GraphQLType::Array(inner) => { match inner.as_ref() { GraphQLType::LexiconRef(ref_nsid) => { // Resolve lexicon ref for array items let resolved_type_name = resolve_lexicon_ref_type( ref_nsid, lexicon_nsid, all_lexicons, type_registry, &database, ); if field.is_required { TypeRef::named_nn_list(resolved_type_name) } else { TypeRef::named_list(resolved_type_name) } } GraphQLType::Object(nested_fields) => { // Generate nested object type for array items let nested_type_name = generate_nested_type_name(type_name, &field_name); let actual_type_name = generate_nested_object_type( &nested_type_name, nested_fields, type_registry, &database, ); if field.is_required { TypeRef::named_nn_list(actual_type_name) } else { TypeRef::named_list(actual_type_name) } } _ => graphql_type_to_typeref(&field.field_type, field.is_required), } } _ => graphql_type_to_typeref(&field.field_type, field.is_required), }; object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| { let field_name = field_name.clone(); let field_type = field_type.clone(); let db = db_clone.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let value = container.record.value.get(&field_name); if let Some(val) = value { // Check for explicit null value if val.is_null() { return Ok(None); } // Check if this is an array of blobs if let GraphQLType::Array(inner) = &field_type { if matches!(inner.as_ref(), GraphQLType::Blob) { if let Some(arr) = val.as_array() { let blob_containers: Vec = arr .iter() .filter_map(|blob_val| { let obj = blob_val.as_object()?; let blob_ref = obj .get("ref") .and_then(|r| r.as_object()) .and_then(|r| r.get("$link")) .and_then(|l| l.as_str()) .unwrap_or("") .to_string(); let mime_type = obj .get("mimeType") .and_then(|m| m.as_str()) .unwrap_or("image/jpeg") .to_string(); let size = 
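                                    // Shape of the AT Protocol blob value being unpacked here
                                    // (illustrative CID and size):
                                    //   {
                                    //     "$type": "blob",
                                    //     "ref": { "$link": "bafkreib..." },
                                    //     "mimeType": "image/jpeg",
                                    //     "size": 123456
                                    //   }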
obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0); let blob_container = BlobContainer { blob_ref, mime_type, size, did: container.record.did.clone(), }; Some(FieldValue::owned_any(blob_container)) }) .collect(); return Ok(Some(FieldValue::list(blob_containers))); } // If not a proper array, return empty list return Ok(Some(FieldValue::list(Vec::::new()))); } } // Check if this is a blob field if matches!(field_type, GraphQLType::Blob) { // Extract blob fields from JSON object if let Some(obj) = val.as_object() { let blob_ref = obj .get("ref") .and_then(|r| r.as_object()) .and_then(|r| r.get("$link")) .and_then(|l| l.as_str()) .unwrap_or("") .to_string(); let mime_type = obj .get("mimeType") .and_then(|m| m.as_str()) .unwrap_or("image/jpeg") .to_string(); let size = obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0); let blob_container = BlobContainer { blob_ref, mime_type, size, did: container.record.did.clone(), }; return Ok(Some(FieldValue::owned_any(blob_container))); } // If not a proper blob object, return None (field is null) return Ok(None); } // Check if this is a reference field that needs joining if matches!(field_type, GraphQLType::Ref) { // Extract URI from strongRef and fetch the linked record if let Some(uri) = crate::graphql::dataloaders::extract_uri_from_strong_ref(val) { match db.get_record(&uri).await { Ok(Some(linked_record)) => { // Convert the linked record to a JSON value let record_json = serde_json::to_value(linked_record).map_err(|e| { Error::new(format!("Serialization error: {}", e)) })?; // Convert serde_json::Value to async_graphql::Value let graphql_val = json_to_graphql_value(&record_json); return Ok(Some(FieldValue::value(graphql_val))); } Ok(None) => { return Ok(None); } Err(e) => { tracing::error!("Error fetching linked record: {}", e); return Ok(None); } } } } // Check if this is a lexicon ref field if matches!(field_type, GraphQLType::LexiconRef(_)) { let nested_container = NestedObjectContainer { data: val.clone(), }; return Ok(Some(FieldValue::owned_any(nested_container))); } // Check if this is a nested object field if matches!(field_type, GraphQLType::Object(_)) { let nested_container = NestedObjectContainer { data: val.clone(), }; return Ok(Some(FieldValue::owned_any(nested_container))); } // Check if this is an array of nested objects or lexicon refs if let GraphQLType::Array(inner) = &field_type { if matches!(inner.as_ref(), GraphQLType::LexiconRef(_)) || matches!(inner.as_ref(), GraphQLType::Object(_)) { if let Some(arr) = val.as_array() { let containers: Vec = arr .iter() .map(|item| { let nested_container = NestedObjectContainer { data: item.clone(), }; FieldValue::owned_any(nested_container) }) .collect(); return Ok(Some(FieldValue::list(containers))); } return Ok(Some(FieldValue::list(Vec::::new()))); } } // For non-ref, non-object fields, return the raw JSON value let graphql_val = json_to_graphql_value(val); Ok(Some(FieldValue::value(graphql_val))) } else { Ok(None) } }) })); } // Add join fields for cross-referencing other collections for collection in all_collections { let field_name = nsid_to_join_field_name(&collection.nsid); // Skip if this would conflict with existing field if lexicon_field_names.contains(field_name.as_str()) { continue; } // Skip self-referencing join fields (collection referencing itself) if collection.type_name == type_name { continue; } // Collect all string fields with format "at-uri" that might reference this collection // We'll check each one at runtime to see if it contains a URI to this collection let 
uri_ref_fields: Vec<_> = fields .iter() .filter(|f| matches!(f.format.as_deref(), Some("at-uri"))) .collect(); let collection_nsid = collection.nsid.clone(); let key_type = collection.key_type.clone(); let db_for_join = database.clone(); // If we found at-uri fields, create a resolver that checks each one at runtime // But skip this for literal:self collections - they should use DID-based joins instead if !uri_ref_fields.is_empty() && key_type != "literal:self" { let ref_field_names: Vec = uri_ref_fields.iter().map(|f| f.name.clone()).collect(); let db_for_uri_join = database.clone(); let target_collection = collection_nsid.clone(); object = object.field(Field::new( &field_name, TypeRef::named(&collection.type_name), move |ctx| { let db = db_for_uri_join.clone(); let field_names = ref_field_names.clone(); let expected_collection = target_collection.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; // Try each at-uri field to find one that references this collection for field_name in &field_names { if let Some(uri_value) = container.record.value.get(field_name) { if let Some(uri) = uri_value.as_str() { // Check if the URI is for the expected collection if uri.contains(&format!("/{}/", expected_collection)) { // Fetch the record at this URI match db.get_record(uri).await { Ok(Some(record)) => { let new_container = RecordContainer { record }; return Ok(Some(FieldValue::owned_any( new_container, ))); } Ok(None) => continue, // Try next field Err(_) => continue, // Try next field } } } } } // No matching URI found in any field Ok(None) }) }, )); // Skip DID-based join logic for non-literal:self collections continue; } // Determine type and resolver based on key_type match key_type.as_str() { "literal:self" => { // Single record per DID - return nullable object of the collection's type let slice_for_self_join = slice_uri.clone(); object = object.field(Field::new( &field_name, TypeRef::named(&collection.type_name), move |ctx| { let db = db_for_join.clone(); let nsid = collection_nsid.clone(); let slice = slice_for_self_join.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let did = container.record.did.clone(); let uri = format!("at://{}/{}/self", did, nsid); // Query with slice_uri filter to ensure we get the right record let mut where_clause = crate::models::WhereClause { conditions: std::collections::HashMap::new(), or_conditions: None, and: None, or: None, }; where_clause.conditions.insert( "collection".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(nsid.clone())), in_values: None, contains: None, fuzzy: None, }, ); where_clause.conditions.insert( "did".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(did.clone())), in_values: None, contains: None, fuzzy: None, }, ); match db.get_slice_collections_records( &slice, Some(1), None, None, Some(&where_clause), ).await { Ok((records, _cursor)) => { if let Some(record) = records.into_iter().next() { let indexed_record = crate::models::IndexedRecord { uri: record.uri, cid: record.cid, did: record.did, collection: record.collection, value: record.json, indexed_at: record.indexed_at.to_rfc3339(), }; let new_container = RecordContainer { record: indexed_record }; Ok(Some(FieldValue::owned_any(new_container))) } else { Ok(None) } } Err(e) => { tracing::debug!("Join query error for {}: {}", uri, e); Ok(None) } } }) }, 
)); } "tid" | "any" => { // Skip - these are handled as plural reverse joins below with URI filtering continue; // Multiple records per DID - return array of the collection's type (DISABLED) /*object = object.field( Field::new( &field_name, TypeRef::named_nn_list_nn(&collection.type_name), move |ctx| { let nsid = collection_nsid.clone(); let slice = slice_for_join.clone(); let db_fallback = db_for_join.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let did = &container.record.did; // Get limit from argument, default to 50 let limit = ctx.args.get("limit") .and_then(|v| v.i64().ok()) .map(|i| i as usize) .unwrap_or(50) .min(100); // Cap at 100 to prevent abuse // Try to get DataLoader from context if let Some(gql_ctx) = ctx.data_opt::() { // Use DataLoader for batched loading let key = CollectionDidKey { slice_uri: slice.clone(), collection: nsid.clone(), did: did.clone(), }; match gql_ctx.collection_did_loader.load_one(key).await { Ok(Some(mut records)) => { // Apply limit after loading records.truncate(limit); let values: Vec = records .into_iter() .map(|indexed_record| { let container = RecordContainer { record: indexed_record, }; FieldValue::owned_any(container) }) .collect(); Ok(Some(FieldValue::list(values))) } Ok(None) => { Ok(Some(FieldValue::list(Vec::::new()))) } Err(e) => { tracing::debug!("DataLoader error for {}: {:?}", nsid, e); Ok(Some(FieldValue::list(Vec::::new()))) } } } else { // Fallback to direct database query if DataLoader not available let db = db_fallback.clone(); let mut where_clause = crate::models::WhereClause { conditions: HashMap::new(), or_conditions: None, and: None, or: None, }; where_clause.conditions.insert( "collection".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(nsid.clone())), in_values: None, contains: None, fuzzy: None, }, ); where_clause.conditions.insert( "did".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(did.clone())), in_values: None, contains: None, fuzzy: None, }, ); match db.get_slice_collections_records( &slice, Some(limit as i32), None, // cursor None, // sort Some(&where_clause), ).await { Ok((records, _cursor)) => { let values: Vec = records .into_iter() .map(|record| { let indexed_record = crate::models::IndexedRecord { uri: record.uri, cid: record.cid, did: record.did, collection: record.collection, value: record.json, indexed_at: record.indexed_at.to_rfc3339(), }; let container = RecordContainer { record: indexed_record, }; FieldValue::owned_any(container) }) .collect(); Ok(Some(FieldValue::list(values))) } Err(e) => { tracing::debug!("Error querying {}: {}", nsid, e); Ok(Some(FieldValue::list(Vec::::new()))) } } } }) }, ) .argument(async_graphql::dynamic::InputValue::new( "limit", TypeRef::named(TypeRef::INT), )) );*/ } _ => { // Unknown key type, skip continue; } } } // Add reverse joins: for every other collection, add a field to query records by DID // This enables bidirectional traversal (e.g., profile.plays and play.profile) for collection in all_collections { let reverse_field_name = format!("{}s", nsid_to_join_field_name(&collection.nsid)); let slice_for_reverse = slice_uri.clone(); let collection_nsid = collection.nsid.clone(); let collection_type = collection.type_name.clone(); let at_uri_fields = collection.at_uri_fields.clone(); object = object.field( Field::new( &reverse_field_name, 
TypeRef::named_nn_list_nn(&collection_type), move |ctx| { let slice = slice_for_reverse.clone(); let nsid = collection_nsid.clone(); let ref_fields = at_uri_fields.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; // Get limit from argument, default to 50 let limit = ctx.args.get("limit") .and_then(|v| v.i64().ok()) .map(|i| i as usize) .unwrap_or(50) .min(100); // Cap at 100 to prevent abuse // Try to get DataLoader from context if let Some(gql_ctx) = ctx.data_opt::() { let parent_uri = &container.record.uri; // Try each at-uri field from the lexicon tracing::debug!( "Trying reverse join for {} with at-uri fields: {:?}", nsid, ref_fields ); for ref_field in &ref_fields { let key = crate::graphql::dataloader::CollectionUriKey { slice_uri: slice.clone(), collection: nsid.clone(), parent_uri: parent_uri.clone(), reference_field: ref_field.clone(), }; tracing::debug!( "Querying {} via field '{}' for URI: {}", nsid, ref_field, parent_uri ); match gql_ctx.collection_uri_loader.load_one(key).await { Ok(Some(mut records)) => { if !records.is_empty() { tracing::debug!( "Found {} {} records via '{}' field for parent URI: {}", records.len(), nsid, ref_field, parent_uri ); // Apply limit records.truncate(limit); let values: Vec = records .into_iter() .map(|indexed_record| { let container = RecordContainer { record: indexed_record, }; FieldValue::owned_any(container) }) .collect(); return Ok(Some(FieldValue::list(values))); } } Ok(None) => continue, Err(e) => { tracing::debug!("DataLoader error for {} field '{}': {:?}", nsid, ref_field, e); continue; } } } // No records found via any at-uri field tracing::debug!("No {} records found for parent URI: {}", nsid, parent_uri); return Ok(Some(FieldValue::list(Vec::::new()))); } // Fallback: DataLoader not available tracing::debug!("DataLoader not available for reverse join"); Ok(Some(FieldValue::list(Vec::::new()))) }) }, ) .argument(async_graphql::dynamic::InputValue::new( "limit", TypeRef::named(TypeRef::INT), )) ); // Add count field for the reverse join let count_field_name = format!("{}Count", reverse_field_name); let db_for_count = database.clone(); let slice_for_count = slice_uri.clone(); let collection_for_count = collection.nsid.clone(); let at_uri_fields_for_count = collection.at_uri_fields.clone(); object = object.field(Field::new( &count_field_name, TypeRef::named_nn(TypeRef::INT), move |ctx| { let slice = slice_for_count.clone(); let nsid = collection_for_count.clone(); let db = db_for_count.clone(); let ref_fields = at_uri_fields_for_count.clone(); FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let parent_uri = &container.record.uri; // Build where clause to count records referencing this URI for ref_field in &ref_fields { let mut where_clause = crate::models::WhereClause { conditions: HashMap::new(), or_conditions: None, and: None, or: None, }; where_clause.conditions.insert( "collection".to_string(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(nsid.clone())), in_values: None, contains: None, fuzzy: None, }, ); where_clause.conditions.insert( ref_field.clone(), crate::models::WhereCondition { gt: None, gte: None, lt: None, lte: None, eq: Some(serde_json::Value::String(parent_uri.clone())), in_values: None, contains: None, fuzzy: None, }, ); match db .count_slice_collections_records(&slice, Some(&where_clause)) .await { Ok(count) if count > 0 => { return Ok(Some(FieldValue::value(count as i32))); 
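                                // Illustrative use of the reverse-join fields added above
                                // (actual names come from nsid_to_join_field_name, pluralized
                                // with "s" and suffixed with "Count" for the counter):
                                //   node {
                                //     uri
                                //     appBskyFeedLikes(limit: 5) { uri did }
                                //     appBskyFeedLikesCount
                                //   }
                                // Both fields match child records whose "at-uri" formatted
                                // fields point back at this record's URI.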
} Ok(_) => continue, Err(e) => { tracing::debug!("Count error for {}: {}", nsid, e); continue; } } } // No matching field found, return 0 Ok(Some(FieldValue::value(0))) }) }, )); } // Add sparklines, stats, and oauth clients fields for NetworkSlicesSlice type if type_name == "NetworkSlicesSlice" { object = add_sparklines_field_to_slice(object, database.clone()); object = add_stats_field_to_slice(object, database.clone()); object = add_oauth_clients_field_to_slice(object, auth_base_url); } object } /// Convert serde_json::Value to async_graphql::Value fn json_to_graphql_value(val: &serde_json::Value) -> GraphQLValue { match val { serde_json::Value::Null => GraphQLValue::Null, serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b), serde_json::Value::Number(n) => { if let Some(i) = n.as_i64() { GraphQLValue::Number((i as i32).into()) } else if let Some(f) = n.as_f64() { GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into()) } else { GraphQLValue::Null } } serde_json::Value::String(s) => GraphQLValue::String(s.clone()), serde_json::Value::Array(arr) => { GraphQLValue::List(arr.iter().map(json_to_graphql_value).collect()) } serde_json::Value::Object(obj) => { let mut map = async_graphql::indexmap::IndexMap::new(); for (k, v) in obj { map.insert( async_graphql::Name::new(k.as_str()), json_to_graphql_value(v), ); } GraphQLValue::Object(map) } } } /// Converts GraphQL type to TypeRef for async-graphql fn graphql_type_to_typeref(gql_type: &GraphQLType, is_required: bool) -> TypeRef { match gql_type { GraphQLType::String => { if is_required { TypeRef::named_nn(TypeRef::STRING) } else { TypeRef::named(TypeRef::STRING) } } GraphQLType::Int => { if is_required { TypeRef::named_nn(TypeRef::INT) } else { TypeRef::named(TypeRef::INT) } } GraphQLType::Boolean => { if is_required { TypeRef::named_nn(TypeRef::BOOLEAN) } else { TypeRef::named(TypeRef::BOOLEAN) } } GraphQLType::Float => { if is_required { TypeRef::named_nn(TypeRef::FLOAT) } else { TypeRef::named(TypeRef::FLOAT) } } GraphQLType::Blob => { // Blob object type with url resolver // Always nullable since blob data might be missing or malformed TypeRef::named("Blob") } GraphQLType::Json | GraphQLType::Ref | GraphQLType::LexiconRef(_) | GraphQLType::Object(_) | GraphQLType::Union => { // JSON scalar type - linked records, lexicon refs, and complex objects return as JSON (fallback) if is_required { TypeRef::named_nn("JSON") } else { TypeRef::named("JSON") } } GraphQLType::Array(inner) => { // For arrays of primitives, use typed arrays // For arrays of complex types, use JSON scalar match inner.as_ref() { GraphQLType::String | GraphQLType::Int | GraphQLType::Boolean | GraphQLType::Float => { let inner_ref = match inner.as_ref() { GraphQLType::String => TypeRef::STRING, GraphQLType::Int => TypeRef::INT, GraphQLType::Boolean => TypeRef::BOOLEAN, GraphQLType::Float => TypeRef::FLOAT, _ => unreachable!(), }; if is_required { TypeRef::named_nn_list_nn(inner_ref) } else { TypeRef::named_list(inner_ref) } } GraphQLType::Blob => { // Arrays of blobs - return list of Blob objects if is_required { TypeRef::named_nn_list("Blob") } else { TypeRef::named_list("Blob") } } _ => { // Arrays of complex types (objects, etc.) 
are just JSON if is_required { TypeRef::named_nn("JSON") } else { TypeRef::named("JSON") } } } } } } /// Creates the Blob GraphQL type with url resolver fn create_blob_type() -> Object { let mut blob = Object::new("Blob"); // ref field - CID reference blob = blob.field(Field::new( "ref", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.blob_ref.clone()))) }) }, )); // mimeType field blob = blob.field(Field::new( "mimeType", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.mime_type.clone()))) }) }, )); // size field blob = blob.field(Field::new("size", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.size as i32))) }) })); // url(preset) field with argument blob = blob.field( Field::new("url", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; // Get preset argument, default to "feed_fullsize" let preset: String = match ctx.args.get("preset") { Some(val) => val.string().unwrap_or("feed_fullsize").to_string(), None => "feed_fullsize".to_string(), }; // Build CDN URL: https://cdn.bsky.app/img/{preset}/plain/{did}/{cid}@jpeg let cdn_base_url = "https://cdn.bsky.app/img"; let url = format!( "{}/{}/plain/{}/{}@jpeg", cdn_base_url, preset, container.did, container.blob_ref ); Ok(Some(GraphQLValue::from(url))) }) }) .argument(async_graphql::dynamic::InputValue::new( "preset", TypeRef::named(TypeRef::STRING), )) .description("Generate CDN URL for the blob with the specified preset (avatar, banner, feed_thumbnail, feed_fullsize)"), ); blob } /// Creates the SyncResult GraphQL type for mutation responses fn create_sync_result_type() -> Object { let mut sync_result = Object::new("SyncResult"); sync_result = sync_result.field(Field::new( "success", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast sync result"))?; if let GraphQLValue::Object(obj) = value { if let Some(success) = obj.get("success") { return Ok(Some(success.clone())); } } Ok(None) }) }, )); sync_result = sync_result.field(Field::new( "reposProcessed", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast sync result"))?; if let GraphQLValue::Object(obj) = value { if let Some(repos) = obj.get("reposProcessed") { return Ok(Some(repos.clone())); } } Ok(None) }) }, )); sync_result = sync_result.field(Field::new( "recordsSynced", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast sync result"))?; if let GraphQLValue::Object(obj) = value { if let Some(records) = obj.get("recordsSynced") { return Ok(Some(records.clone())); } } Ok(None) }) }, )); sync_result = sync_result.field(Field::new( "timedOut", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast sync result"))?; if let GraphQLValue::Object(obj) = value { if let Some(timed_out) = obj.get("timedOut") { return
Ok(Some(timed_out.clone())); } } Ok(None) }) }, )); sync_result = sync_result.field(Field::new( "message", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast sync result"))?; if let GraphQLValue::Object(obj) = value { if let Some(message) = obj.get("message") { return Ok(Some(message.clone())); } } Ok(None) }) }, )); sync_result } /// Creates the SortDirection enum type fn create_sort_direction_enum() -> Enum { Enum::new("SortDirection") .item(EnumItem::new("asc")) .item(EnumItem::new("desc")) } /// Creates the SortField input type fn create_sort_field_input() -> InputObject { InputObject::new("SortField") .field(InputValue::new("field", TypeRef::named_nn(TypeRef::STRING))) .field(InputValue::new( "direction", TypeRef::named_nn("SortDirection"), )) } /// Creates the StringCondition input type for string field filtering fn create_string_condition_input() -> InputObject { InputObject::new("StringCondition") .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING))) .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING))) .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING))) } /// Creates the IntCondition input type for int field filtering fn create_int_condition_input() -> InputObject { InputObject::new("IntCondition") .field(InputValue::new("eq", TypeRef::named(TypeRef::INT))) .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT))) } /// Creates the PageInfo type for connection pagination fn create_page_info_type() -> Object { let mut page_info = Object::new("PageInfo"); page_info = page_info.field(Field::new( "hasNextPage", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; if let GraphQLValue::Object(obj) = value { if let Some(has_next) = obj.get("hasNextPage") { return Ok(Some(has_next.clone())); } } Ok(Some(GraphQLValue::from(false))) }) }, )); page_info = page_info.field(Field::new( "hasPreviousPage", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; if let GraphQLValue::Object(obj) = value { if let Some(has_prev) = obj.get("hasPreviousPage") { return Ok(Some(has_prev.clone())); } } Ok(Some(GraphQLValue::from(false))) }) }, )); page_info = page_info.field(Field::new( "startCursor", TypeRef::named(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; if let GraphQLValue::Object(obj) = value { if let Some(cursor) = obj.get("startCursor") { return Ok(Some(cursor.clone())); } } Ok(None) }) }, )); page_info = page_info.field(Field::new( "endCursor", TypeRef::named(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?; if let GraphQLValue::Object(obj) = value { if let Some(cursor) = obj.get("endCursor") { return Ok(Some(cursor.clone())); } } Ok(None) }) }, )); page_info } /// Connection data structure that holds all connection fields #[derive(Clone)] struct ConnectionData { total_count: i32, has_next_page: bool, end_cursor: Option<String>, nodes: Vec<RecordContainer>, } /// Edge data structure for Relay connections
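/// Each edge pairs a record with an opaque cursor; cursors are currently the record CID
/// encoded with URL-safe, unpadded base64 (see create_connection_type below).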
#[derive(Clone)] struct EdgeData { node: RecordContainer, cursor: String, } /// Creates an Edge type for a given record type /// Example: For "Post" creates "PostEdge" with node and cursor fn create_edge_type(record_type_name: &str) -> Object { let edge_name = format!("{}Edge", record_type_name); let mut edge = Object::new(&edge_name); // Add node field let record_type = record_type_name.to_string(); edge = edge.field(Field::new("node", TypeRef::named_nn(&record_type), |ctx| { FieldFuture::new(async move { let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?; Ok(Some(FieldValue::owned_any(edge_data.node.clone()))) }) })); // Add cursor field edge = edge.field(Field::new( "cursor", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?; Ok(Some(GraphQLValue::from(edge_data.cursor.clone()))) }) }, )); edge } /// Creates a Connection type for a given record type /// Example: For "Post" creates "PostConnection" with edges, pageInfo, and totalCount fn create_connection_type(record_type_name: &str) -> Object { let connection_name = format!("{}Connection", record_type_name); let mut connection = Object::new(&connection_name); // Add totalCount field connection = connection.field(Field::new( "totalCount", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; Ok(Some(GraphQLValue::from(data.total_count))) }) }, )); // Add pageInfo field connection = connection.field(Field::new( "pageInfo", TypeRef::named_nn("PageInfo"), |ctx| { FieldFuture::new(async move { let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; let mut page_info = async_graphql::indexmap::IndexMap::new(); page_info.insert( async_graphql::Name::new("hasNextPage"), GraphQLValue::from(data.has_next_page), ); // For forward pagination only, hasPreviousPage is always false page_info.insert( async_graphql::Name::new("hasPreviousPage"), GraphQLValue::from(false), ); // Add startCursor (first node's cid if available) if !data.nodes.is_empty() { if let Some(first_record) = data.nodes.first() { let start_cursor = general_purpose::URL_SAFE_NO_PAD .encode(first_record.record.cid.clone()); page_info.insert( async_graphql::Name::new("startCursor"), GraphQLValue::from(start_cursor), ); } } // Add endCursor if let Some(ref cursor) = data.end_cursor { page_info.insert( async_graphql::Name::new("endCursor"), GraphQLValue::from(cursor.clone()), ); } Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info)))) }) }, )); // Add edges field (Relay standard) let edge_type = format!("{}Edge", record_type_name); connection = connection.field(Field::new( "edges", TypeRef::named_nn_list_nn(&edge_type), |ctx| { FieldFuture::new(async move { let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; let field_values: Vec<FieldValue<'_>> = data .nodes .iter() .map(|node| { // Use base64-encoded CID as cursor let cursor = general_purpose::URL_SAFE_NO_PAD.encode(node.record.cid.clone()); let edge = EdgeData { node: node.clone(), cursor, }; FieldValue::owned_any(edge) }) .collect(); Ok(Some(FieldValue::list(field_values))) }) }, )); // Add nodes field (convenience, direct access to records without edges wrapper) connection = connection.field(Field::new( "nodes", TypeRef::named_nn_list_nn(record_type_name), |ctx| { FieldFuture::new(async move { let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?; let field_values: Vec<FieldValue<'_>> = data .nodes .iter() .map(|node| FieldValue::owned_any(node.clone())) .collect(); Ok(Some(FieldValue::list(field_values))) }) },
)); connection } /// Creates the Mutation root type with sync operations and dynamic collection mutations fn create_mutation_type( database: Database, slice_uri: String, lexicons: &[serde_json::Value], ) -> Object { let mut mutation = Object::new("Mutation"); // Add syncUserCollections mutation let db_clone = database.clone(); let slice_clone = slice_uri.clone(); mutation = mutation.field( Field::new( "syncUserCollections", TypeRef::named_nn("SyncResult"), move |ctx| { let db = db_clone.clone(); let slice = slice_clone.clone(); FieldFuture::new(async move { let did = ctx .args .get("did") .and_then(|v| v.string().ok()) .ok_or_else(|| Error::new("did argument is required"))?; // Create sync service and call sync_user_collections let cache_backend = crate::cache::CacheFactory::create_cache( crate::cache::CacheBackend::InMemory { ttl_seconds: None }, ) .await .map_err(|e| Error::new(format!("Failed to create cache: {}", e)))?; let cache = Arc::new(Mutex::new(crate::cache::SliceCache::new(cache_backend))); let sync_service = crate::sync::SyncService::with_cache( db.clone(), std::env::var("RELAY_ENDPOINT") .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()), cache, ); let result = sync_service .sync_user_collections(did, &slice, 30) // 30 second timeout .await .map_err(|e| Error::new(format!("Sync failed: {}", e)))?; // Convert result to GraphQL object let mut obj = async_graphql::indexmap::IndexMap::new(); obj.insert( async_graphql::Name::new("success"), GraphQLValue::from(result.success), ); obj.insert( async_graphql::Name::new("reposProcessed"), GraphQLValue::from(result.repos_processed), ); obj.insert( async_graphql::Name::new("recordsSynced"), GraphQLValue::from(result.records_synced), ); obj.insert( async_graphql::Name::new("timedOut"), GraphQLValue::from(result.timed_out), ); obj.insert( async_graphql::Name::new("message"), GraphQLValue::from(result.message), ); Ok(Some(FieldValue::owned_any(GraphQLValue::Object(obj)))) }) }, ) .argument(async_graphql::dynamic::InputValue::new( "did", TypeRef::named_nn(TypeRef::STRING), )) .description("Sync user collections for a given DID"), ); // Add dynamic mutations for each collection for lexicon in lexicons { if let (Some(nsid), Some(defs)) = ( lexicon.get("id").and_then(|n| n.as_str()), lexicon.get("defs"), ) { let fields = extract_collection_fields(defs); if !fields.is_empty() { let type_name = nsid_to_type_name(nsid); // Add create mutation mutation = add_create_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone()); // Add update mutation mutation = add_update_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone()); // Add delete mutation mutation = add_delete_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone()); } } } mutation } /// Converts NSID to GraphQL type name /// Example: app.bsky.feed.post -> AppBskyFeedPost fn nsid_to_type_name(nsid: &str) -> String { nsid.split('.') .map(|part| { let mut chars = part.chars(); match chars.next() { None => String::new(), Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), } }) .collect::<Vec<_>>() .join("") } /// Converts NSID to GraphQL query name in camelCase and pluralized /// Example: app.bsky.feed.post -> appBskyFeedPosts /// Example: fm.teal.alpha.feed.play -> fmTealAlphaFeedPlays fn nsid_to_query_name(nsid: &str) -> String { // First convert to camelCase like join fields let camel_case = nsid_to_join_field_name(nsid); // Then pluralize the end if camel_case.ends_with("s") ||
camel_case.ends_with("x") || camel_case.ends_with("ch") || camel_case.ends_with("sh") { format!("{}es", camel_case) // status -> statuses, box -> boxes } else if camel_case.ends_with("y") && camel_case.len() > 1 { let chars: Vec = camel_case.chars().collect(); if chars.len() > 1 && !['a', 'e', 'i', 'o', 'u'].contains(&chars[chars.len() - 2]) { format!("{}ies", &camel_case[..camel_case.len() - 1]) // party -> parties } else { format!("{}s", camel_case) // day -> days } } else { format!("{}s", camel_case) // post -> posts } } /// Converts NSID to GraphQL join field name in camelCase /// Example: app.bsky.actor.profile -> appBskyActorProfile fn nsid_to_join_field_name(nsid: &str) -> String { let parts: Vec<&str> = nsid.split('.').collect(); if parts.is_empty() { return nsid.to_string(); } // First part is lowercase, rest are capitalized let mut result = parts[0].to_string(); for part in &parts[1..] { let mut chars = part.chars(); if let Some(first) = chars.next() { result.push_str(&first.to_uppercase().collect::()); result.push_str(chars.as_str()); } } result } /// Creates an aggregated type for GROUP BY queries /// Returns a dynamic object with the grouped fields plus a count field fn create_aggregated_type(type_name: &str, fields: &[GraphQLField]) -> Object { let mut aggregated = Object::new(type_name); // Add fields from the lexicon that can be grouped // Use JSON type for all fields to support both strings and complex types for field in fields { let field_name = field.name.clone(); let field_name_clone = field_name.clone(); aggregated = aggregated.field(Field::new( &field_name, TypeRef::named("JSON"), move |ctx| { let field_name = field_name_clone.clone(); FieldFuture::new(async move { let json_value = ctx.parent_value.try_downcast_ref::()?; if let Some(obj) = json_value.as_object() { if let Some(value) = obj.get(&field_name) { // Convert serde_json::Value to async_graphql::Value let graphql_value = serde_json_to_graphql_value(value); return Ok(Some(graphql_value)); } } Ok(None) }) }, )); } // Add count field aggregated = aggregated.field(Field::new( "count", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let json_value = ctx.parent_value.try_downcast_ref::()?; if let Some(obj) = json_value.as_object() { if let Some(count) = obj.get("count") { if let Some(count_i64) = count.as_i64() { return Ok(Some(GraphQLValue::from(count_i64 as i32))); } } } Ok(Some(GraphQLValue::from(0))) }) }, )); aggregated } /// Creates the AggregationOrderBy input type for ordering by count fn create_aggregation_order_by_input() -> InputObject { InputObject::new("AggregationOrderBy") .field(InputValue::new("count", TypeRef::named("SortDirection"))) } /// Creates the DateInterval enum for date truncation fn create_date_interval_enum() -> Enum { Enum::new("DateInterval") .item(EnumItem::new("second")) .item(EnumItem::new("minute")) .item(EnumItem::new("hour")) .item(EnumItem::new("day")) .item(EnumItem::new("week")) .item(EnumItem::new("month")) .item(EnumItem::new("quarter")) .item(EnumItem::new("year")) } /// Converts a serde_json::Value to an async_graphql::Value fn serde_json_to_graphql_value(value: &serde_json::Value) -> GraphQLValue { match value { serde_json::Value::Null => GraphQLValue::Null, serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b), serde_json::Value::Number(n) => { if let Some(i) = n.as_i64() { GraphQLValue::Number(i.into()) } else if let Some(f) = n.as_f64() { GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into()) } else { GraphQLValue::Null } } 
serde_json::Value::String(s) => GraphQLValue::String(s.clone()), serde_json::Value::Array(arr) => { let values: Vec<GraphQLValue> = arr.iter().map(serde_json_to_graphql_value).collect(); GraphQLValue::List(values) } serde_json::Value::Object(obj) => { let mut map = async_graphql::indexmap::IndexMap::new(); for (k, v) in obj { map.insert(async_graphql::Name::new(k), serde_json_to_graphql_value(v)); } GraphQLValue::Object(map) } } } /// Creates the RecordUpdate type for subscription events fn create_record_update_type() -> Object { let mut record_update = Object::new("RecordUpdate"); record_update = record_update.field(Field::new( "uri", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(uri) = obj.get("uri") { return Ok(Some(uri.clone())); } } Ok(None) }) }, )); record_update = record_update.field(Field::new( "cid", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(cid) = obj.get("cid") { return Ok(Some(cid.clone())); } } Ok(None) }) }, )); record_update = record_update.field(Field::new( "did", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(did) = obj.get("did") { return Ok(Some(did.clone())); } } Ok(None) }) }, )); record_update = record_update.field(Field::new( "collection", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(collection) = obj.get("collection") { return Ok(Some(collection.clone())); } } Ok(None) }) }, )); record_update = record_update.field(Field::new( "indexedAt", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(indexed_at) = obj.get("indexedAt") { return Ok(Some(indexed_at.clone())); } } Ok(None) }) }, )); record_update = record_update.field(Field::new( "operation", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(operation) = obj.get("operation") { return Ok(Some(operation.clone())); } } Ok(None) }) }, )); record_update = record_update.field(Field::new("value", TypeRef::named_nn("JSON"), |ctx| { FieldFuture::new(async move { let value = ctx .parent_value .downcast_ref::<GraphQLValue>() .ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?; if let GraphQLValue::Object(obj) = value { if let Some(val) = obj.get("value") { return Ok(Some(val.clone())); } } Ok(None) }) })); record_update } /// Creates the Subscription root type with collection-specific subscriptions fn create_subscription_type(slice_uri: String, lexicons: &[serde_json::Value]) -> Subscription { let mut subscription = Subscription::new("Subscription"); // For each record collection, create
{collection}Created, {collection}Updated, {collection}Deleted subscriptions for lexicon in lexicons { let nsid = match lexicon.get("id").and_then(|n| n.as_str()) { Some(n) => n, None => continue, }; let defs = match lexicon.get("defs") { Some(d) => d, None => continue, }; // Only process record types (skip queries, procedures, etc.) let is_record = defs .get("main") .and_then(|m| m.get("type")) .and_then(|t| t.as_str()) == Some("record"); if !is_record { continue; } let fields = extract_collection_fields(defs); if fields.is_empty() { continue; } let type_name = nsid_to_type_name(nsid); let field_base_name = nsid_to_join_field_name(nsid); // {collection}Created subscription let created_field_name = format!("{}Created", field_base_name); let slice_for_created = slice_uri.clone(); let nsid_for_created = nsid.to_string(); let type_name_for_created = type_name.clone(); subscription = subscription.field(SubscriptionField::new( &created_field_name, TypeRef::named_nn(&type_name_for_created), move |_ctx| { let slice_uri = slice_for_created.clone(); let collection = nsid_for_created.clone(); SubscriptionFieldFuture::new(async move { let mut receiver = PUBSUB.subscribe(&slice_uri).await; let stream = async_stream::stream! { while let Ok(event) = receiver.recv().await { // Filter by collection and operation if event.collection != collection || event.operation != crate::graphql::RecordOperation::Create { continue; } // Convert to RecordContainer and yield let indexed_record = crate::models::IndexedRecord { uri: event.uri, cid: event.cid, did: event.did, collection: event.collection, value: event.value, indexed_at: event.indexed_at, }; let container = RecordContainer { record: indexed_record, }; yield Ok(FieldValue::owned_any(container)); } }; Ok(stream) }) }, ) .description(format!("Subscribe to {} record creation events", nsid))); // {collection}Updated subscription let updated_field_name = format!("{}Updated", field_base_name); let slice_for_updated = slice_uri.clone(); let nsid_for_updated = nsid.to_string(); let type_name_for_updated = type_name.clone(); subscription = subscription.field(SubscriptionField::new( &updated_field_name, TypeRef::named_nn(&type_name_for_updated), move |_ctx| { let slice_uri = slice_for_updated.clone(); let collection = nsid_for_updated.clone(); SubscriptionFieldFuture::new(async move { let mut receiver = PUBSUB.subscribe(&slice_uri).await; let stream = async_stream::stream! 
{ while let Ok(event) = receiver.recv().await { // Filter by collection and operation if event.collection != collection || event.operation != crate::graphql::RecordOperation::Update { continue; } // Convert to RecordContainer and yield let indexed_record = crate::models::IndexedRecord { uri: event.uri, cid: event.cid, did: event.did, collection: event.collection, value: event.value, indexed_at: event.indexed_at, }; let container = RecordContainer { record: indexed_record, }; yield Ok(FieldValue::owned_any(container)); } }; Ok(stream) }) }, ) .description(format!("Subscribe to {} record update events", nsid))); // {collection}Deleted subscription - returns just the URI string let deleted_field_name = format!("{}Deleted", field_base_name); let slice_for_deleted = slice_uri.clone(); let nsid_for_deleted = nsid.to_string(); subscription = subscription.field(SubscriptionField::new( &deleted_field_name, TypeRef::named_nn(TypeRef::STRING), move |_ctx| { let slice_uri = slice_for_deleted.clone(); let collection = nsid_for_deleted.clone(); SubscriptionFieldFuture::new(async move { let mut receiver = PUBSUB.subscribe(&slice_uri).await; let stream = async_stream::stream! { while let Ok(event) = receiver.recv().await { // Filter by collection and operation if event.collection != collection || event.operation != crate::graphql::RecordOperation::Delete { continue; } // For deletes, just return the URI yield Ok(FieldValue::value(GraphQLValue::String(event.uri))); } }; Ok(stream) }) }, ) .description(format!("Subscribe to {} record deletion events. Returns the URI of deleted records.", nsid))); } subscription } /// Helper function to parse GraphQL where clause recursively fn parse_where_clause( where_obj: async_graphql::dynamic::ObjectAccessor, ) -> crate::models::WhereClause { let mut where_clause = crate::models::WhereClause { conditions: HashMap::new(), or_conditions: None, and: None, or: None, }; for (field_name, condition_val) in where_obj.iter() { let field_str = field_name.as_str(); // Handle nested AND array if field_str == "and" { if let Ok(and_list) = condition_val.list() { let mut and_clauses = Vec::new(); for item in and_list.iter() { if let Ok(obj) = item.object() { and_clauses.push(parse_where_clause(obj)); } } if !and_clauses.is_empty() { where_clause.and = Some(and_clauses); } } continue; } // Handle nested OR array if field_str == "or" { if let Ok(or_list) = condition_val.list() { let mut or_clauses = Vec::new(); for item in or_list.iter() { if let Ok(obj) = item.object() { or_clauses.push(parse_where_clause(obj)); } } if !or_clauses.is_empty() { where_clause.or = Some(or_clauses); } } continue; } // Handle regular field conditions if let Ok(condition_obj) = condition_val.object() { let mut where_condition = crate::models::WhereCondition { eq: None, in_values: None, contains: None, fuzzy: None, gt: None, gte: None, lt: None, lte: None, }; // Parse eq condition if let Some(eq_val) = condition_obj.get("eq") { if let Ok(eq_str) = eq_val.string() { where_condition.eq = Some(serde_json::Value::String(eq_str.to_string())); } else if let Ok(eq_i64) = eq_val.i64() { where_condition.eq = Some(serde_json::Value::Number(eq_i64.into())); } } // Parse in condition if let Some(in_val) = condition_obj.get("in") { if let Ok(in_list) = in_val.list() { let mut values = Vec::new(); for item in in_list.iter() { if let Ok(s) = item.string() { values.push(serde_json::Value::String(s.to_string())); } else if let Ok(i) = item.i64() { values.push(serde_json::Value::Number(i.into())); } } where_condition.in_values 
= Some(values); } } // Parse contains condition if let Some(contains_val) = condition_obj.get("contains") { if let Ok(contains_str) = contains_val.string() { where_condition.contains = Some(contains_str.to_string()); } } // Parse fuzzy condition if let Some(fuzzy_val) = condition_obj.get("fuzzy") { if let Ok(fuzzy_str) = fuzzy_val.string() { where_condition.fuzzy = Some(fuzzy_str.to_string()); } } // Parse gt condition if let Some(gt_val) = condition_obj.get("gt") { if let Ok(gt_str) = gt_val.string() { where_condition.gt = Some(serde_json::Value::String(gt_str.to_string())); } else if let Ok(gt_i64) = gt_val.i64() { where_condition.gt = Some(serde_json::Value::Number(gt_i64.into())); } } // Parse gte condition if let Some(gte_val) = condition_obj.get("gte") { if let Ok(gte_str) = gte_val.string() { where_condition.gte = Some(serde_json::Value::String(gte_str.to_string())); } else if let Ok(gte_i64) = gte_val.i64() { where_condition.gte = Some(serde_json::Value::Number(gte_i64.into())); } } // Parse lt condition if let Some(lt_val) = condition_obj.get("lt") { if let Ok(lt_str) = lt_val.string() { where_condition.lt = Some(serde_json::Value::String(lt_str.to_string())); } else if let Ok(lt_i64) = lt_val.i64() { where_condition.lt = Some(serde_json::Value::Number(lt_i64.into())); } } // Parse lte condition if let Some(lte_val) = condition_obj.get("lte") { if let Ok(lte_str) = lte_val.string() { where_condition.lte = Some(serde_json::Value::String(lte_str.to_string())); } else if let Ok(lte_i64) = lte_val.i64() { where_condition.lte = Some(serde_json::Value::Number(lte_i64.into())); } } // Convert indexedAt to indexed_at for database column let db_field_name = if field_str == "indexedAt" { "indexed_at".to_string() } else { field_str.to_string() }; where_clause .conditions .insert(db_field_name, where_condition); } } where_clause } /// Creates an input type for mutations from lexicon fields fn create_mutation_input_type(type_name: &str, fields: &[GraphQLField]) -> InputObject { let mut input = InputObject::new(format!("{}Input", type_name)); for field in fields { let field_type_ref = match &field.field_type { GraphQLType::String => TypeRef::named(TypeRef::STRING), GraphQLType::Int => TypeRef::named(TypeRef::INT), GraphQLType::Boolean => TypeRef::named(TypeRef::BOOLEAN), GraphQLType::Float => TypeRef::named(TypeRef::FLOAT), GraphQLType::Array(inner) => match **inner { GraphQLType::String => TypeRef::named_list(TypeRef::STRING), GraphQLType::Int => TypeRef::named_list(TypeRef::INT), _ => TypeRef::named("JSON"), }, _ => TypeRef::named("JSON"), }; let field_type_ref = if field.is_required { TypeRef::NonNull(Box::new(field_type_ref)) } else { field_type_ref }; input = input.field(InputValue::new(&field.name, field_type_ref)); } input } /// Transforms fields in record data from GraphQL format to AT Protocol format /// /// Blob fields: /// - GraphQL format: `{ref: "bafyrei...", mimeType: "...", size: 123}` /// - AT Protocol format: `{$type: "blob", ref: {$link: "bafyrei..."}, mimeType: "...", size: 123}` /// /// Lexicon ref fields: /// - Adds `$type: "{ref_nsid}"` to objects (e.g., `{$type: "community.lexicon.location.hthree#main", ...}`) /// /// Nested objects: /// - Recursively processes nested objects and arrays fn transform_fields_for_atproto( mut data: serde_json::Value, fields: &[GraphQLField], ) -> serde_json::Value { if let serde_json::Value::Object(ref mut map) = data { for field in fields { if let Some(field_value) = map.get_mut(&field.name) { match &field.field_type { GraphQLType::Blob => 
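// e.g. {"ref": "bafyrei...", "mimeType": "image/jpeg", "size": 123} becomes
//      {"$type": "blob", "ref": {"$link": "bafyrei..."}, "mimeType": "image/jpeg", "size": 123}
// ("image/jpeg" is illustrative; only $type and ref are rewritten here, other keys pass through)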
{ // Transform single blob field if let Some(blob_obj) = field_value.as_object_mut() { // Add $type: "blob" blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string())); // Check if ref is a string (GraphQL format) if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") { // Transform to {$link: "cid"} (AT Protocol format) let link_obj = serde_json::json!({ "$link": cid }); blob_obj.insert("ref".to_string(), link_obj); } } } GraphQLType::LexiconRef(ref_nsid) => { // Transform lexicon ref field by adding $type if let Some(ref_obj) = field_value.as_object_mut() { ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone())); } } GraphQLType::Object(nested_fields) => { // Recursively transform nested objects *field_value = transform_fields_for_atproto(field_value.clone(), nested_fields); } GraphQLType::Array(inner) => { match inner.as_ref() { GraphQLType::Blob => { // Transform array of blobs if let Some(arr) = field_value.as_array_mut() { for blob_value in arr { if let Some(blob_obj) = blob_value.as_object_mut() { // Add $type: "blob" blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string())); if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") { let link_obj = serde_json::json!({ "$link": cid }); blob_obj.insert("ref".to_string(), link_obj); } } } } } GraphQLType::LexiconRef(ref_nsid) => { // Transform array of lexicon refs if let Some(arr) = field_value.as_array_mut() { for ref_value in arr { if let Some(ref_obj) = ref_value.as_object_mut() { ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone())); } } } } GraphQLType::Object(nested_fields) => { // Transform array of objects recursively if let Some(arr) = field_value.as_array_mut() { for item in arr { *item = transform_fields_for_atproto(item.clone(), nested_fields); } } } _ => {} // Other array types don't need transformation } } _ => {} // Other field types don't need transformation } } } } data } /// Adds a create mutation for a collection fn add_create_mutation( mutation: Object, type_name: &str, nsid: &str, fields: &[GraphQLField], database: Database, slice_uri: String, ) -> Object { let mutation_name = format!("create{}", type_name); let nsid = nsid.to_string(); let nsid_clone = nsid.clone(); let fields = fields.to_vec(); mutation.field( Field::new( mutation_name, TypeRef::named_nn(type_name), move |ctx| { let db = database.clone(); let slice = slice_uri.clone(); let collection = nsid.clone(); let fields = fields.clone(); FieldFuture::new(async move { // Get GraphQL context which contains auth info let gql_ctx = ctx.data::<GraphQLContext>() .map_err(|_| Error::new("Missing GraphQL context"))?; // Check if user is authenticated let token = gql_ctx.auth_token.as_ref() .ok_or_else(|| Error::new("Authentication required"))?; // Extract input data let input = ctx.args.get("input") .ok_or_else(|| Error::new("Missing input argument"))?; // Convert GraphQL value to JSON using deserialize let mut record_data: serde_json::Value = input.deserialize() .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?; // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs) record_data = transform_fields_for_atproto(record_data, &fields); // Optional rkey argument let rkey = ctx.args.get("rkey") .and_then(|v| v.string().ok()) .filter(|s| !s.is_empty()) .map(|s| s.to_string()); // Verify OAuth token and get user info let user_info = crate::auth::verify_oauth_token_cached( token, &gql_ctx.auth_base_url,
gql_ctx.auth_cache.clone(), ) .await .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; // Get AT Protocol DPoP auth and PDS URL let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached( token, &gql_ctx.auth_base_url, gql_ctx.auth_cache.clone(), ) .await .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?; let repo = user_info.did.unwrap_or(user_info.sub); // Validate record against lexicon if not network.slices.lexicon let validation_slice_uri = if collection == "network.slices.lexicon" { std::env::var("SYSTEM_SLICE_URI").unwrap_or_default() } else { slice.clone() }; if !validation_slice_uri.is_empty() { if let Ok(lexicons) = db.get_lexicons_by_slice(&validation_slice_uri).await { if !lexicons.is_empty() { if let Err(e) = slices_lexicon::validate_record( lexicons, &collection, record_data.clone() ) { return Err(Error::new(format!("Validation error: {}", e))); } } } } // Create record using AT Protocol let http_client = reqwest::Client::new(); let create_request = atproto_client::com::atproto::repo::CreateRecordRequest { repo: repo.clone(), collection: collection.clone(), record_key: rkey, record: record_data.clone(), swap_commit: None, validate: false, }; let result = atproto_client::com::atproto::repo::create_record( &http_client, &atproto_client::client::Auth::DPoP(dpop_auth), &pds_url, create_request, ) .await .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?; // Extract URI and CID from the response let (uri, cid) = match result { atproto_client::com::atproto::repo::CreateRecordResponse::StrongRef { uri, cid, .. } => (uri, cid), atproto_client::com::atproto::repo::CreateRecordResponse::Error(e) => { return Err(Error::new(format!("AT Protocol error: {} - {}", e.error.unwrap_or_default(), e.message.unwrap_or_default()))); } }; // Extract the target slice URI for lexicon records before moving record_data let target_slice_for_invalidation = if collection == "network.slices.lexicon" { record_data.get("slice") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } else { None }; // Store in local database for indexing let record = crate::models::Record { uri: uri.clone(), cid: cid.clone(), did: repo, collection: collection.clone(), json: record_data, indexed_at: chrono::Utc::now(), slice_uri: Some(slice.clone()), }; let _ = db.insert_record(&record).await; // Invalidate GraphQL schema cache if this is a lexicon record if let Some(target_slice) = target_slice_for_invalidation { crate::graphql::invalidate_schema_cache(&target_slice).await; } // Fetch the created record and return it so Relay can update its cache let created_record = db.get_record(&uri).await .map_err(|e| Error::new(format!("Failed to fetch created record: {}", e)))? 
.ok_or_else(|| Error::new("Created record not found"))?; // Wrap the record in a RecordContainer so the type fields can access it let container = RecordContainer { record: created_record, }; Ok(Some(FieldValue::owned_any(container))) }) }, ) .argument(InputValue::new("input", TypeRef::named_nn(format!("{}Input", type_name)))) .argument(InputValue::new("rkey", TypeRef::named(TypeRef::STRING))) .description(format!("Create a new {} record", nsid_clone)) ) } /// Adds an update mutation for a collection fn add_update_mutation( mutation: Object, type_name: &str, nsid: &str, fields: &[GraphQLField], database: Database, slice_uri: String, ) -> Object { let mutation_name = format!("update{}", type_name); let nsid = nsid.to_string(); let nsid_clone = nsid.clone(); let fields = fields.to_vec(); mutation.field( Field::new( mutation_name, TypeRef::named_nn(type_name), move |ctx| { let db = database.clone(); let slice = slice_uri.clone(); let collection = nsid.clone(); let fields = fields.clone(); FieldFuture::new(async move { // Get GraphQL context which contains auth info let gql_ctx = ctx.data::() .map_err(|_| Error::new("Missing GraphQL context"))?; // Check if user is authenticated let token = gql_ctx.auth_token.as_ref() .ok_or_else(|| Error::new("Authentication required"))?; // Get rkey let rkey = ctx.args.get("rkey") .and_then(|v| v.string().ok()) .ok_or_else(|| Error::new("Missing rkey argument"))? .to_string(); // Extract input data let input = ctx.args.get("input") .ok_or_else(|| Error::new("Missing input argument"))?; // Convert GraphQL value to JSON using deserialize let mut record_data: serde_json::Value = input.deserialize() .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?; // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs) record_data = transform_fields_for_atproto(record_data, &fields); // Verify OAuth token and get user info let user_info = crate::auth::verify_oauth_token_cached( token, &gql_ctx.auth_base_url, gql_ctx.auth_cache.clone(), ) .await .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; // Get AT Protocol DPoP auth and PDS URL let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached( token, &gql_ctx.auth_base_url, gql_ctx.auth_cache.clone(), ) .await .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?; let repo = user_info.did.unwrap_or(user_info.sub); // Validate record against lexicon let validation_slice_uri = if collection == "network.slices.lexicon" { std::env::var("SYSTEM_SLICE_URI").unwrap_or_default() } else { slice.clone() }; if !validation_slice_uri.is_empty() { if let Ok(lexicons) = db.get_lexicons_by_slice(&validation_slice_uri).await { if !lexicons.is_empty() { if let Err(e) = slices_lexicon::validate_record( lexicons, &collection, record_data.clone() ) { return Err(Error::new(format!("Validation error: {}", e))); } } } } // Update record using AT Protocol let http_client = reqwest::Client::new(); let put_request = atproto_client::com::atproto::repo::PutRecordRequest { repo: repo.clone(), collection: collection.clone(), record_key: rkey.clone(), record: record_data.clone(), swap_record: None, swap_commit: None, validate: false, }; let result = atproto_client::com::atproto::repo::put_record( &http_client, &atproto_client::client::Auth::DPoP(dpop_auth), &pds_url, put_request, ) .await .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?; // Extract URI and CID from the response let (uri, cid) = match result { 
atproto_client::com::atproto::repo::PutRecordResponse::StrongRef { uri, cid, .. } => (uri, cid), atproto_client::com::atproto::repo::PutRecordResponse::Error(e) => { return Err(Error::new(format!("AT Protocol error: {} - {}", e.error.unwrap_or_default(), e.message.unwrap_or_default()))); } }; // Extract the target slice URI for lexicon records before moving record_data let target_slice_for_invalidation = if collection == "network.slices.lexicon" { record_data.get("slice") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } else { None }; // Update in local database let record = crate::models::Record { uri: uri.clone(), cid: cid.clone(), did: repo, collection: collection.clone(), json: record_data, indexed_at: chrono::Utc::now(), slice_uri: Some(slice.clone()), }; let _ = db.update_record(&record).await; // Invalidate GraphQL schema cache if this is a lexicon record if let Some(target_slice) = target_slice_for_invalidation { crate::graphql::invalidate_schema_cache(&target_slice).await; } // Fetch the updated record and return it so Relay can update its cache let updated_record = db.get_record(&uri).await .map_err(|e| Error::new(format!("Failed to fetch updated record: {}", e)))? .ok_or_else(|| Error::new("Updated record not found"))?; // Wrap the record in a RecordContainer so the type fields can access it let container = RecordContainer { record: updated_record, }; Ok(Some(FieldValue::owned_any(container))) }) }, ) .argument(InputValue::new("rkey", TypeRef::named_nn(TypeRef::STRING))) .argument(InputValue::new("input", TypeRef::named_nn(format!("{}Input", type_name)))) .description(format!("Update a {} record", nsid_clone)) ) } /// Adds a delete mutation for a collection fn add_delete_mutation( mutation: Object, type_name: &str, nsid: &str, database: Database, slice_uri: String, ) -> Object { let mutation_name = format!("delete{}", type_name); let nsid = nsid.to_string(); let nsid_clone = nsid.clone(); mutation.field( Field::new( mutation_name, TypeRef::named_nn(type_name), move |ctx| { let _db = database.clone(); let _slice = slice_uri.clone(); let _collection = nsid.clone(); FieldFuture::new(async move { // Get GraphQL context which contains auth info let gql_ctx = ctx.data::<GraphQLContext>() .map_err(|_| Error::new("Missing GraphQL context"))?; // Check if user is authenticated let token = gql_ctx.auth_token.as_ref() .ok_or_else(|| Error::new("Authentication required"))?; // Get rkey let rkey = ctx.args.get("rkey") .and_then(|v| v.string().ok()) .ok_or_else(|| Error::new("Missing rkey argument"))? .to_string(); // Verify OAuth token and get user info let user_info = crate::auth::verify_oauth_token_cached( token, &gql_ctx.auth_base_url, gql_ctx.auth_cache.clone(), ) .await .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; // Get AT Protocol DPoP auth and PDS URL let (dpop_auth, pds_url) = crate::auth::get_atproto_auth_for_user_cached( token, &gql_ctx.auth_base_url, gql_ctx.auth_cache.clone(), ) .await .map_err(|e| Error::new(format!("Failed to get DPoP auth: {}", e)))?; let repo = user_info.did.unwrap_or(user_info.sub); let uri = format!("at://{}/{}/{}", repo, _collection, rkey); // Fetch the record before deleting it let deleted_record = _db.get_record(&uri).await .map_err(|e| Error::new(format!("Failed to fetch record: {}", e)))?
.ok_or_else(|| Error::new("Record not found"))?; // Extract the target slice URI for lexicon records before deletion let target_slice_for_invalidation = if _collection == "network.slices.lexicon" { deleted_record.value.get("slice") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } else { None }; // Delete record using AT Protocol let http_client = reqwest::Client::new(); let delete_request = atproto_client::com::atproto::repo::DeleteRecordRequest { repo: repo.clone(), collection: _collection.clone(), record_key: rkey.clone(), swap_record: None, swap_commit: None, }; atproto_client::com::atproto::repo::delete_record( &http_client, &atproto_client::client::Auth::DPoP(dpop_auth), &pds_url, delete_request, ) .await .map_err(|e| Error::new(format!("AT Protocol request failed: {}", e)))?; // Handle cascade deletion if let Err(e) = _db.handle_cascade_deletion(&uri, &_collection).await { tracing::warn!("Cascade deletion failed for {}: {}", uri, e); } // Delete from local database let _ = _db.delete_record_by_uri(&uri, None).await; // Invalidate GraphQL schema cache if this is a lexicon record if let Some(target_slice) = target_slice_for_invalidation { crate::graphql::invalidate_schema_cache(&target_slice).await; } // Wrap the record in a RecordContainer so the type fields can access it let container = RecordContainer { record: deleted_record, }; Ok(Some(FieldValue::owned_any(container))) }) }, ) .argument(InputValue::new("rkey", TypeRef::named_nn(TypeRef::STRING))) .description(format!("Delete a {} record", nsid_clone)) ) } /// Creates the MutationResponse type for create/update mutations fn create_mutation_response_type() -> Object { Object::new("MutationResponse") .field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { // Parent value is a GraphQLValue, extract the Object variant if let Ok(value) = ctx.parent_value.try_downcast_ref::() { if let GraphQLValue::Object(obj) = value { if let Some(uri) = obj.get(&async_graphql::Name::new("uri")) { return Ok(Some(uri.clone())); } } } Ok(None) }) })) .field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { // Parent value is a GraphQLValue, extract the Object variant if let Ok(value) = ctx.parent_value.try_downcast_ref::() { if let GraphQLValue::Object(obj) = value { if let Some(cid) = obj.get(&async_graphql::Name::new("cid")) { return Ok(Some(cid.clone())); } } } Ok(None) }) })) }