···19192020# HTTP client and server
2121reqwest = { version = "0.12", features = ["json", "stream"] }
2222-axum = { version = "0.7", features = ["ws", "macros"] }
2323-axum-extra = { version = "0.9", features = ["form"] }
2222+axum = { version = "0.8", features = ["ws", "macros"] }
2323+axum-extra = { version = "0.10", features = ["form"] }
2424tower = "0.5"
2525tower-http = { version = "0.6", features = ["cors", "trace"] }
2626···65656666# Redis for caching
6767redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] }
6868+6969+# GraphQL server
7070+async-graphql = { version = "7.0", features = ["dynamic-schema", "dataloader"] }
7171+async-graphql-axum = "7.0"
7272+lazy_static = "1.5"
+35
api/src/database/actors.rs
···251251 .await?;
252252 Ok(result.rows_affected())
253253 }
254254+255255+ /// Resolves actor handles to DIDs for a specific slice.
256256+ ///
257257+ /// # Arguments
258258+ /// * `handles` - List of handles to resolve
259259+ /// * `slice_uri` - AT-URI of the slice
260260+ ///
261261+ /// # Returns
262262+ /// Vec of DIDs corresponding to the handles
263263+ pub async fn resolve_handles_to_dids(
264264+ &self,
265265+ handles: &[String],
266266+ slice_uri: &str,
267267+ ) -> Result<Vec<String>, DatabaseError> {
268268+ if handles.is_empty() {
269269+ return Ok(Vec::new());
270270+ }
271271+272272+ let placeholders: Vec<String> = (1..=handles.len())
273273+ .map(|i| format!("${}", i))
274274+ .collect();
275275+ let query_sql = format!(
276276+ "SELECT DISTINCT did FROM actor WHERE handle = ANY(ARRAY[{}]) AND slice_uri = ${}",
277277+ placeholders.join(", "),
278278+ handles.len() + 1
279279+ );
280280+281281+ let mut query = sqlx::query_scalar::<_, String>(&query_sql);
282282+ for handle in handles {
283283+ query = query.bind(handle);
284284+ }
285285+ query = query.bind(slice_uri);
286286+287287+ Ok(query.fetch_all(&self.pool).await?)
288288+ }
254289}
255290256291/// Builds WHERE conditions specifically for actor queries.
+27
api/src/graphql/dataloaders.rs
···11+//! DataLoader utilities for extracting references from records
22+33+use serde_json::Value;
44+55+/// Extract URI from a strongRef value
66+/// strongRef format: { "$type": "com.atproto.repo.strongRef", "uri": "at://...", "cid": "..." }
77+pub fn extract_uri_from_strong_ref(value: &Value) -> Option<String> {
88+ if let Some(obj) = value.as_object() {
99+ // Check if this is a strongRef
1010+ if let Some(type_val) = obj.get("$type") {
1111+ if type_val.as_str() == Some("com.atproto.repo.strongRef") {
1212+ return obj.get("uri").and_then(|u| u.as_str()).map(|s| s.to_string());
1313+ }
1414+ }
1515+1616+ // Also support direct uri field (some lexicons might use this)
1717+ if let Some(uri) = obj.get("uri") {
1818+ if let Some(uri_str) = uri.as_str() {
1919+ if uri_str.starts_with("at://") {
2020+ return Some(uri_str.to_string());
2121+ }
2222+ }
2323+ }
2424+ }
2525+2626+ None
2727+}
//! GraphQL endpoint implementation for Slices
//!
//! This module provides a GraphQL interface to query slice records with support
//! for joining linked records through AT Protocol strongRef references.

// Internal pieces: dynamic schema construction, strongRef extraction helpers,
// and lexicon -> GraphQL type mapping. Only `handler` is a public submodule.
mod schema_builder;
mod dataloaders;
mod types;
pub mod handler;

// Re-export the crate-facing entry points: schema construction plus the
// GraphQL request/playground handlers.
pub use schema_builder::build_graphql_schema;
pub use handler::{graphql_handler, graphql_playground};
+1431
api/src/graphql/schema_builder.rs
···11+//! Dynamic GraphQL schema builder from AT Protocol lexicons
22+//!
33+//! This module generates GraphQL schemas at runtime based on lexicon definitions
44+//! stored in the database, enabling flexible querying of slice records.
55+66+use async_graphql::dynamic::{Field, FieldFuture, FieldValue, Object, Schema, Scalar, TypeRef, InputObject, InputValue, Enum, EnumItem};
77+use async_graphql::{Error, Value as GraphQLValue};
88+use base64::engine::general_purpose;
99+use base64::Engine;
1010+use serde_json;
1111+use std::collections::HashMap;
1212+use std::sync::Arc;
1313+use tokio::sync::Mutex;
1414+1515+use crate::database::Database;
1616+use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
/// Metadata about a collection for cross-referencing
#[derive(Clone)]
struct CollectionMeta {
    nsid: String,      // Fully-qualified NSID of the collection (lexicon "id")
    key_type: String,  // "tid", "literal:self", or "any"
    type_name: String, // GraphQL type name for this collection
}
/// Builds a dynamic GraphQL schema from lexicons for a given slice
///
/// Runs two passes over the slice's lexicons:
/// 1. collect `CollectionMeta` for every record collection so types can
///    cross-reference each other (forward/reverse joins);
/// 2. create one object type plus Relay edge/connection types and a root
///    query field per collection, then register shared scalar/input/enum
///    types and finish the schema.
///
/// # Errors
/// Returns a human-readable `String` when lexicons cannot be loaded, a
/// lexicon is malformed (missing `id`/`defs`), or schema registration fails.
pub async fn build_graphql_schema(
    database: Database,
    slice_uri: String,
) -> Result<Schema, String> {
    // Fetch all lexicons for this slice
    let lexicons = database
        .get_lexicons_by_slice(&slice_uri)
        .await
        .map_err(|e| format!("Failed to load lexicons: {}", e))?;

    // Build Query root type and collect all object types
    let mut query = Object::new("Query");
    let mut objects_to_register = Vec::new();

    // First pass: collect metadata about all collections for cross-referencing
    let mut all_collections: Vec<CollectionMeta> = Vec::new();
    for lexicon in &lexicons {
        let nsid = lexicon
            .get("id")
            .and_then(|n| n.as_str())
            .ok_or_else(|| "Lexicon missing id".to_string())?;

        let defs = lexicon
            .get("defs")
            .ok_or_else(|| format!("Lexicon {} missing defs", nsid))?;

        // Only lexicons that define record fields AND a record key become
        // joinable collections.
        let fields = extract_collection_fields(defs);
        if !fields.is_empty() {
            if let Some(key_type) = extract_record_key(defs) {
                all_collections.push(CollectionMeta {
                    nsid: nsid.to_string(),
                    key_type,
                    type_name: nsid_to_type_name(nsid),
                });
            }
        }
    }

    // Second pass: create types and queries
    for lexicon in &lexicons {
        // get_lexicons_by_slice returns {lexicon: 1, id: "nsid", defs: {...}}
        let nsid = lexicon
            .get("id")
            .and_then(|n| n.as_str())
            .ok_or_else(|| "Lexicon missing id".to_string())?;

        let defs = lexicon
            .get("defs")
            .ok_or_else(|| format!("Lexicon {} missing defs", nsid))?
            .clone();

        // Extract fields from lexicon
        let fields = extract_collection_fields(&defs);

        if !fields.is_empty() {
            // Create a GraphQL type for this collection
            let type_name = nsid_to_type_name(nsid);
            let record_type = create_record_type(&type_name, &fields, database.clone(), slice_uri.clone(), &all_collections);

            // Create edge and connection types for this collection (Relay standard)
            let edge_type = create_edge_type(&type_name);
            let connection_type = create_connection_type(&type_name);

            // Collect the types to register with schema later
            objects_to_register.push(record_type);
            objects_to_register.push(edge_type);
            objects_to_register.push(connection_type);

            // Add query field for this collection
            let collection_query_name = nsid_to_query_name(nsid);
            let db_clone = database.clone();
            let slice_clone = slice_uri.clone();
            let nsid_clone = nsid.to_string();

            let connection_type_name = format!("{}Connection", &type_name);
            query = query.field(
                Field::new(
                    &collection_query_name,
                    TypeRef::named_nn(&connection_type_name),
                    move |ctx| {
                        let db = db_clone.clone();
                        let slice = slice_clone.clone();
                        let collection = nsid_clone.clone();

                        FieldFuture::new(async move {
                            // Get Relay-standard pagination arguments
                            // NOTE(review): only `first`/`after` are read; the
                            // `last`/`before` arguments declared below are not
                            // implemented, and `first` is not clamped (join
                            // fields cap at 100) — confirm intent.
                            let first: i32 = match ctx.args.get("first") {
                                Some(val) => val.i64().unwrap_or(50) as i32,
                                None => 50,
                            };

                            let after: Option<&str> = match ctx.args.get("after") {
                                Some(val) => val.string().ok(),
                                None => None,
                            };

                            // Parse sortBy argument
                            let sort_by: Option<Vec<crate::models::SortField>> = match ctx.args.get("sortBy") {
                                Some(val) => {
                                    if let Ok(list) = val.list() {
                                        let mut sort_fields = Vec::new();
                                        for item in list.iter() {
                                            if let Ok(obj) = item.object() {
                                                // Defaults: sort by indexedAt, descending.
                                                let field = obj.get("field")
                                                    .and_then(|v| v.string().ok())
                                                    .unwrap_or("indexedAt")
                                                    .to_string();
                                                let direction = obj.get("direction")
                                                    .and_then(|v| v.string().ok())
                                                    .unwrap_or("desc")
                                                    .to_string();
                                                sort_fields.push(crate::models::SortField { field, direction });
                                            }
                                        }
                                        Some(sort_fields)
                                    } else {
                                        None
                                    }
                                },
                                None => None,
                            };

                            // Build where clause for this collection
                            let mut where_clause = crate::models::WhereClause {
                                conditions: HashMap::new(),
                                or_conditions: None,
                            };

                            // Always filter by collection
                            where_clause.conditions.insert(
                                "collection".to_string(),
                                crate::models::WhereCondition {
                                    eq: Some(serde_json::Value::String(collection.clone())),
                                    in_values: None,
                                    contains: None,
                                },
                            );

                            // Parse where argument if provided
                            if let Some(where_val) = ctx.args.get("where") {
                                // Try to parse as JSON object
                                if let Ok(where_obj) = where_val.object() {
                                    for (field_name, condition_val) in where_obj.iter() {
                                        if let Ok(condition_obj) = condition_val.object() {
                                            let mut where_condition = crate::models::WhereCondition {
                                                eq: None,
                                                in_values: None,
                                                contains: None,
                                            };

                                            // Parse eq condition (string and i64 values only;
                                            // other JSON types are silently ignored)
                                            if let Some(eq_val) = condition_obj.get("eq") {
                                                if let Ok(eq_str) = eq_val.string() {
                                                    where_condition.eq = Some(serde_json::Value::String(eq_str.to_string()));
                                                } else if let Ok(eq_i64) = eq_val.i64() {
                                                    where_condition.eq = Some(serde_json::Value::Number(eq_i64.into()));
                                                }
                                            }

                                            // Parse in condition
                                            if let Some(in_val) = condition_obj.get("in") {
                                                if let Ok(in_list) = in_val.list() {
                                                    let mut values = Vec::new();
                                                    for item in in_list.iter() {
                                                        if let Ok(s) = item.string() {
                                                            values.push(serde_json::Value::String(s.to_string()));
                                                        } else if let Ok(i) = item.i64() {
                                                            values.push(serde_json::Value::Number(i.into()));
                                                        }
                                                    }
                                                    where_condition.in_values = Some(values);
                                                }
                                            }

                                            // Parse contains condition
                                            if let Some(contains_val) = condition_obj.get("contains") {
                                                if let Ok(contains_str) = contains_val.string() {
                                                    where_condition.contains = Some(contains_str.to_string());
                                                }
                                            }

                                            where_clause.conditions.insert(field_name.to_string(), where_condition);
                                        }
                                    }
                                }
                            }

                            // Resolve actorHandle to did if present
                            if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") {
                                // Collect handles to resolve
                                let mut handles = Vec::new();
                                if let Some(eq_value) = &actor_handle_condition.eq {
                                    if let Some(handle_str) = eq_value.as_str() {
                                        handles.push(handle_str.to_string());
                                    }
                                }
                                if let Some(in_values) = &actor_handle_condition.in_values {
                                    for value in in_values {
                                        if let Some(handle_str) = value.as_str() {
                                            handles.push(handle_str.to_string());
                                        }
                                    }
                                }

                                // Resolve handles to DIDs from actor table
                                if !handles.is_empty() {
                                    match db.resolve_handles_to_dids(&handles, &slice).await {
                                        Ok(dids) => {
                                            if !dids.is_empty() {
                                                // Replace actorHandle condition with did condition
                                                let did_condition = if dids.len() == 1 {
                                                    crate::models::WhereCondition {
                                                        eq: Some(serde_json::Value::String(dids[0].clone())),
                                                        in_values: None,
                                                        contains: None,
                                                    }
                                                } else {
                                                    crate::models::WhereCondition {
                                                        eq: None,
                                                        in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
                                                        contains: None,
                                                    }
                                                };
                                                where_clause.conditions.insert("did".to_string(), did_condition);
                                            }
                                            // If no DIDs found, the query will return 0 results naturally
                                            // NOTE(review): that comment looks wrong — when no DIDs
                                            // match, the condition is dropped entirely, so the query
                                            // returns ALL records in the collection rather than none.
                                            // Same for the Err arm below. Confirm intended semantics.
                                        }
                                        Err(_) => {
                                            // If resolution fails, skip the condition (will return 0 results)
                                        }
                                    }
                                }
                            }

                            // Query database for records
                            let (records, next_cursor) = db
                                .get_slice_collections_records(
                                    &slice,
                                    Some(first),
                                    after,
                                    sort_by.as_ref(),
                                    Some(&where_clause),
                                )
                                .await
                                .map_err(|e| {
                                    Error::new(format!("Database query failed: {}", e))
                                })?;

                            // Query database for total count
                            let total_count = db
                                .count_slice_collections_records(&slice, Some(&where_clause))
                                .await
                                .map_err(|e| {
                                    Error::new(format!("Count query failed: {}", e))
                                })? as i32;

                            // Convert records to RecordContainers
                            let record_containers: Vec<RecordContainer> = records
                                .into_iter()
                                .map(|record| {
                                    // Convert Record to IndexedRecord
                                    let indexed_record = crate::models::IndexedRecord {
                                        uri: record.uri,
                                        cid: record.cid,
                                        did: record.did,
                                        collection: record.collection,
                                        value: record.json,
                                        indexed_at: record.indexed_at.to_rfc3339(),
                                    };
                                    RecordContainer {
                                        record: indexed_record,
                                    }
                                })
                                .collect();

                            // Build Connection data
                            let connection_data = ConnectionData {
                                total_count,
                                has_next_page: next_cursor.is_some(),
                                end_cursor: next_cursor,
                                nodes: record_containers,
                            };

                            Ok(Some(FieldValue::owned_any(connection_data)))
                        })
                    },
                )
                .argument(async_graphql::dynamic::InputValue::new(
                    "first",
                    TypeRef::named(TypeRef::INT),
                ))
                .argument(async_graphql::dynamic::InputValue::new(
                    "after",
                    TypeRef::named(TypeRef::STRING),
                ))
                .argument(async_graphql::dynamic::InputValue::new(
                    "last",
                    TypeRef::named(TypeRef::INT),
                ))
                .argument(async_graphql::dynamic::InputValue::new(
                    "before",
                    TypeRef::named(TypeRef::STRING),
                ))
                .argument(async_graphql::dynamic::InputValue::new(
                    "sortBy",
                    TypeRef::named_list("SortField"),
                ))
                .argument(async_graphql::dynamic::InputValue::new(
                    "where",
                    TypeRef::named("JSON"),
                ))
                .description(format!("Query {} records", nsid)),
            );
        }
    }

    // Build Mutation type
    let mutation = create_mutation_type(database.clone(), slice_uri.clone());

    // Build and return the schema
    let mut schema_builder = Schema::build(query.type_name(), Some(mutation.type_name()), None)
        .register(query)
        .register(mutation);

    // Register JSON scalar type for complex fields
    let json_scalar = Scalar::new("JSON");
    schema_builder = schema_builder.register(json_scalar);

    // Register Blob type
    let blob_type = create_blob_type();
    schema_builder = schema_builder.register(blob_type);

    // Register SyncResult type for mutations
    let sync_result_type = create_sync_result_type();
    schema_builder = schema_builder.register(sync_result_type);

    // Register SortDirection enum
    let sort_direction_enum = create_sort_direction_enum();
    schema_builder = schema_builder.register(sort_direction_enum);

    // Register SortField input type
    let sort_field_input = create_sort_field_input();
    schema_builder = schema_builder.register(sort_field_input);

    // Register condition input types for where clauses
    let string_condition_input = create_string_condition_input();
    schema_builder = schema_builder.register(string_condition_input);

    let int_condition_input = create_int_condition_input();
    schema_builder = schema_builder.register(int_condition_input);

    // Register PageInfo type
    let page_info_type = create_page_info_type();
    schema_builder = schema_builder.register(page_info_type);

    // Register all object types
    for obj in objects_to_register {
        schema_builder = schema_builder.register(obj);
    }

    schema_builder
        .finish()
        .map_err(|e| format!("Schema build error: {:?}", e))
}
/// Container to hold record data for resolvers
///
/// Passed between fields via `FieldValue::owned_any` and downcast back with
/// `try_downcast_ref::<RecordContainer>()` inside each resolver.
#[derive(Clone)]
struct RecordContainer {
    record: crate::models::IndexedRecord,
}
/// Container to hold blob data and DID for URL generation
#[derive(Clone)]
struct BlobContainer {
    blob_ref: String,  // CID reference
    mime_type: String, // MIME type
    size: i64,         // Size in bytes
    did: String,       // DID for CDN URL generation
}

/// Creates a GraphQL Object type for a record collection
///
/// The resulting object exposes, in order:
/// * the standard AT Protocol fields (`uri`, `cid`, `did`, `indexedAt`) —
///   only when the lexicon does not define a field of the same name;
/// * an `actorHandle` field resolved from the actors table by DID;
/// * every lexicon-defined field, with special handling for blob and
///   strongRef (`Ref`) fields;
/// * forward join fields to every other collection (shape depends on the
///   collection's record key type) and pluralized reverse-join list fields.
///
/// NOTE(review): Ref and join fields issue one database query per resolved
/// record (no batching here, despite the `dataloaders` module name), and the
/// reverse-join field names are not checked against `lexicon_field_names` the
/// way forward joins are — confirm both are acceptable.
fn create_record_type(
    type_name: &str,
    fields: &[GraphQLField],
    database: Database,
    slice_uri: String,
    all_collections: &[CollectionMeta],
) -> Object {
    let mut object = Object::new(type_name);

    // Check which field names exist in lexicon to avoid conflicts
    let lexicon_field_names: std::collections::HashSet<&str> =
        fields.iter().map(|f| f.name.as_str()).collect();

    // Add standard AT Protocol fields only if they don't conflict with lexicon fields
    if !lexicon_field_names.contains("uri") {
        object = object.field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
            FieldFuture::new(async move {
                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                Ok(Some(GraphQLValue::from(container.record.uri.clone())))
            })
        }));
    }

    if !lexicon_field_names.contains("cid") {
        object = object.field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| {
            FieldFuture::new(async move {
                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                Ok(Some(GraphQLValue::from(container.record.cid.clone())))
            })
        }));
    }

    if !lexicon_field_names.contains("did") {
        object = object.field(Field::new("did", TypeRef::named_nn(TypeRef::STRING), |ctx| {
            FieldFuture::new(async move {
                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                Ok(Some(GraphQLValue::from(container.record.did.clone())))
            })
        }));
    }

    if !lexicon_field_names.contains("indexedAt") {
        object = object.field(Field::new(
            "indexedAt",
            TypeRef::named_nn(TypeRef::STRING),
            |ctx| {
                FieldFuture::new(async move {
                    let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                    Ok(Some(GraphQLValue::from(
                        container.record.indexed_at.clone(),
                    )))
                })
            },
        ));
    }

    // Add actor metadata field (handle from actors table)
    // Always add as "actorHandle" to avoid conflicts with lexicon fields
    let db_for_actor = database.clone();
    let slice_for_actor = slice_uri.clone();
    object = object.field(Field::new(
        "actorHandle",
        TypeRef::named(TypeRef::STRING),
        move |ctx| {
            let db = db_for_actor.clone();
            let slice = slice_for_actor.clone();
            FieldFuture::new(async move {
                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                let did = &container.record.did;

                // Build where clause to find actor by DID
                let mut where_clause = crate::models::WhereClause {
                    conditions: std::collections::HashMap::new(),
                    or_conditions: None,
                };
                where_clause.conditions.insert(
                    "did".to_string(),
                    crate::models::WhereCondition {
                        eq: Some(serde_json::Value::String(did.clone())),
                        in_values: None,
                        contains: None,
                    },
                );

                // A missing actor or a lookup error both resolve to null —
                // the handle is best-effort metadata, never a hard failure.
                match db.get_slice_actors(&slice, Some(1), None, Some(&where_clause)).await {
                    Ok((actors, _cursor)) => {
                        if let Some(actor) = actors.first() {
                            if let Some(handle) = &actor.handle {
                                Ok(Some(GraphQLValue::from(handle.clone())))
                            } else {
                                Ok(None)
                            }
                        } else {
                            Ok(None)
                        }
                    }
                    Err(e) => {
                        tracing::debug!("Actor not found for {}: {}", did, e);
                        Ok(None)
                    }
                }
            })
        },
    ));

    // Add fields from lexicon
    for field in fields {
        let field_name = field.name.clone();
        let field_name_for_field = field_name.clone(); // Need separate clone for Field::new
        let field_type = field.field_type.clone();
        let db_clone = database.clone();

        let type_ref = graphql_type_to_typeref(&field.field_type, field.is_required);

        object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| {
            let field_name = field_name.clone();
            let field_type = field_type.clone();
            let db = db_clone.clone();

            FieldFuture::new(async move {
                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                let value = container.record.value.get(&field_name);

                if let Some(val) = value {
                    // Check for explicit null value
                    if val.is_null() {
                        return Ok(Some(FieldValue::NULL));
                    }

                    // Check if this is a blob field
                    if matches!(field_type, GraphQLType::Blob) {
                        // Extract blob fields from JSON object
                        if let Some(obj) = val.as_object() {
                            let blob_ref = obj
                                .get("ref")
                                .and_then(|r| r.as_object())
                                .and_then(|r| r.get("$link"))
                                .and_then(|l| l.as_str())
                                .unwrap_or("")
                                .to_string();

                            // NOTE(review): defaults to "image/jpeg" when the
                            // blob carries no mimeType — confirm that is the
                            // desired fallback for non-image blobs.
                            let mime_type = obj
                                .get("mimeType")
                                .and_then(|m| m.as_str())
                                .unwrap_or("image/jpeg")
                                .to_string();

                            let size = obj
                                .get("size")
                                .and_then(|s| s.as_i64())
                                .unwrap_or(0);

                            let blob_container = BlobContainer {
                                blob_ref,
                                mime_type,
                                size,
                                did: container.record.did.clone(),
                            };

                            return Ok(Some(FieldValue::owned_any(blob_container)));
                        }

                        // If not a proper blob object, return NULL
                        return Ok(Some(FieldValue::NULL));
                    }

                    // Check if this is a reference field that needs joining
                    if matches!(field_type, GraphQLType::Ref) {
                        // Extract URI from strongRef and fetch the linked record
                        if let Some(uri) =
                            crate::graphql::dataloaders::extract_uri_from_strong_ref(val)
                        {
                            match db.get_record(&uri).await {
                                Ok(Some(linked_record)) => {
                                    // Convert the linked record to a JSON value
                                    let record_json = serde_json::to_value(linked_record)
                                        .map_err(|e| {
                                            Error::new(format!("Serialization error: {}", e))
                                        })?;

                                    // Convert serde_json::Value to async_graphql::Value
                                    let graphql_val = json_to_graphql_value(&record_json);
                                    return Ok(Some(FieldValue::value(graphql_val)));
                                }
                                Ok(None) => {
                                    return Ok(Some(FieldValue::NULL));
                                }
                                Err(e) => {
                                    // Dangling refs degrade to null rather than
                                    // failing the whole query.
                                    tracing::error!("Error fetching linked record: {}", e);
                                    return Ok(Some(FieldValue::NULL));
                                }
                            }
                        }
                    }

                    // For non-ref fields, return the raw JSON value
                    let graphql_val = json_to_graphql_value(val);
                    Ok(Some(FieldValue::value(graphql_val)))
                } else {
                    Ok(Some(FieldValue::NULL))
                }
            })
        }));
    }

    // Add join fields for cross-referencing other collections by DID
    for collection in all_collections {
        let field_name = nsid_to_join_field_name(&collection.nsid);

        // Skip if this would conflict with existing field
        if lexicon_field_names.contains(field_name.as_str()) {
            continue;
        }

        let collection_nsid = collection.nsid.clone();
        let key_type = collection.key_type.clone();
        let db_for_join = database.clone();
        let slice_for_join = slice_uri.clone();

        // Determine type and resolver based on key_type
        match key_type.as_str() {
            "literal:self" => {
                // Single record per DID - return nullable object of the collection's type
                object = object.field(Field::new(
                    &field_name,
                    TypeRef::named(&collection.type_name),
                    move |ctx| {
                        let db = db_for_join.clone();
                        let nsid = collection_nsid.clone();
                        FieldFuture::new(async move {
                            let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                            // "literal:self" collections have a fixed rkey of
                            // "self", so the record URI is fully determined by
                            // the parent record's DID.
                            let uri = format!("at://{}/{}/self", container.record.did, nsid);

                            match db.get_record(&uri).await {
                                Ok(Some(record)) => {
                                    let new_container = RecordContainer {
                                        record,
                                    };
                                    Ok(Some(FieldValue::owned_any(new_container)))
                                }
                                Ok(None) => Ok(None),
                                Err(e) => {
                                    tracing::debug!("Record not found for {}: {}", uri, e);
                                    Ok(None)
                                }
                            }
                        })
                    },
                ));
            }
            "tid" | "any" => {
                // Multiple records per DID - return array of the collection's type
                object = object.field(
                    Field::new(
                        &field_name,
                        TypeRef::named_nn_list_nn(&collection.type_name),
                        move |ctx| {
                            let db = db_for_join.clone();
                            let nsid = collection_nsid.clone();
                            let slice = slice_for_join.clone();
                            FieldFuture::new(async move {
                                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                                let did = &container.record.did;

                                // Get limit from argument, default to 50
                                let limit = ctx.args.get("limit")
                                    .and_then(|v| v.i64().ok())
                                    .map(|i| i as i32)
                                    .unwrap_or(50)
                                    .min(100); // Cap at 100 to prevent abuse

                                // Build where clause to find all records of this collection for this DID
                                let mut where_clause = crate::models::WhereClause {
                                    conditions: HashMap::new(),
                                    or_conditions: None,
                                };
                                where_clause.conditions.insert(
                                    "collection".to_string(),
                                    crate::models::WhereCondition {
                                        eq: Some(serde_json::Value::String(nsid.clone())),
                                        in_values: None,
                                        contains: None,
                                    },
                                );
                                where_clause.conditions.insert(
                                    "did".to_string(),
                                    crate::models::WhereCondition {
                                        eq: Some(serde_json::Value::String(did.clone())),
                                        in_values: None,
                                        contains: None,
                                    },
                                );

                                match db.get_slice_collections_records(
                                    &slice,
                                    Some(limit),
                                    None, // cursor
                                    None, // sort
                                    Some(&where_clause),
                                ).await {
                                    Ok((records, _cursor)) => {
                                        let values: Vec<FieldValue> = records
                                            .into_iter()
                                            .map(|record| {
                                                // Convert Record to IndexedRecord
                                                let indexed_record = crate::models::IndexedRecord {
                                                    uri: record.uri,
                                                    cid: record.cid,
                                                    did: record.did,
                                                    collection: record.collection,
                                                    value: record.json,
                                                    indexed_at: record.indexed_at.to_rfc3339(),
                                                };
                                                let container = RecordContainer {
                                                    record: indexed_record,
                                                };
                                                FieldValue::owned_any(container)
                                            })
                                            .collect();
                                        Ok(Some(FieldValue::list(values)))
                                    }
                                    Err(e) => {
                                        // Degrade to an empty list on error.
                                        tracing::debug!("Error querying {}: {}", nsid, e);
                                        Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
                                    }
                                }
                            })
                        },
                    )
                    .argument(async_graphql::dynamic::InputValue::new(
                        "limit",
                        TypeRef::named(TypeRef::INT),
                    ))
                );
            }
            _ => {
                // Unknown key type, skip
                continue;
            }
        }
    }

    // Add reverse joins: for every other collection, add a field to query records by DID
    // This enables bidirectional traversal (e.g., profile.plays and play.profile)
    for collection in all_collections {
        let reverse_field_name = format!("{}s", nsid_to_join_field_name(&collection.nsid));
        let db_for_reverse = database.clone();
        let slice_for_reverse = slice_uri.clone();
        let collection_nsid = collection.nsid.clone();
        let collection_type = collection.type_name.clone();

        object = object.field(
            Field::new(
                &reverse_field_name,
                TypeRef::named_nn_list_nn(&collection_type),
                move |ctx| {
                    let db = db_for_reverse.clone();
                    let slice = slice_for_reverse.clone();
                    let nsid = collection_nsid.clone();
                    FieldFuture::new(async move {
                        let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
                        let did = &container.record.did;

                        // Get limit from argument, default to 50
                        let limit = ctx.args.get("limit")
                            .and_then(|v| v.i64().ok())
                            .map(|i| i as i32)
                            .unwrap_or(50)
                            .min(100); // Cap at 100 to prevent abuse

                        // Build where clause to find all records of this collection for this DID
                        let mut where_clause = crate::models::WhereClause {
                            conditions: HashMap::new(),
                            or_conditions: None,
                        };
                        where_clause.conditions.insert(
                            "collection".to_string(),
                            crate::models::WhereCondition {
                                eq: Some(serde_json::Value::String(nsid.clone())),
                                in_values: None,
                                contains: None,
                            },
                        );
                        where_clause.conditions.insert(
                            "did".to_string(),
                            crate::models::WhereCondition {
                                eq: Some(serde_json::Value::String(did.clone())),
                                in_values: None,
                                contains: None,
                            },
                        );

                        match db.get_slice_collections_records(
                            &slice,
                            Some(limit),
                            None, // cursor
                            None, // sort
                            Some(&where_clause),
                        ).await {
                            Ok((records, _cursor)) => {
                                let values: Vec<FieldValue> = records
                                    .into_iter()
                                    .map(|record| {
                                        // Convert Record to IndexedRecord
                                        let indexed_record = crate::models::IndexedRecord {
                                            uri: record.uri,
                                            cid: record.cid,
                                            did: record.did,
                                            collection: record.collection,
                                            value: record.json,
                                            indexed_at: record.indexed_at.to_rfc3339(),
                                        };
                                        let container = RecordContainer {
                                            record: indexed_record,
                                        };
                                        FieldValue::owned_any(container)
                                    })
                                    .collect();
                                Ok(Some(FieldValue::list(values)))
                            }
                            Err(e) => {
                                // Degrade to an empty list on error.
                                tracing::debug!("Error querying {}: {}", nsid, e);
                                Ok(Some(FieldValue::list(Vec::<FieldValue>::new())))
                            }
                        }
                    })
                },
            )
            .argument(async_graphql::dynamic::InputValue::new(
                "limit",
                TypeRef::named(TypeRef::INT),
            ))
        );
    }

    object
}
845845+846846+/// Convert serde_json::Value to async_graphql::Value
847847+fn json_to_graphql_value(val: &serde_json::Value) -> GraphQLValue {
848848+ match val {
849849+ serde_json::Value::Null => GraphQLValue::Null,
850850+ serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b),
851851+ serde_json::Value::Number(n) => {
852852+ if let Some(i) = n.as_i64() {
853853+ GraphQLValue::Number((i as i32).into())
854854+ } else if let Some(f) = n.as_f64() {
855855+ GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into())
856856+ } else {
857857+ GraphQLValue::Null
858858+ }
859859+ }
860860+ serde_json::Value::String(s) => GraphQLValue::String(s.clone()),
861861+ serde_json::Value::Array(arr) => {
862862+ GraphQLValue::List(arr.iter().map(json_to_graphql_value).collect())
863863+ }
864864+ serde_json::Value::Object(obj) => {
865865+ let mut map = async_graphql::indexmap::IndexMap::new();
866866+ for (k, v) in obj {
867867+ map.insert(
868868+ async_graphql::Name::new(k.as_str()),
869869+ json_to_graphql_value(v),
870870+ );
871871+ }
872872+ GraphQLValue::Object(map)
873873+ }
874874+ }
875875+}
876876+877877+/// Converts GraphQL type to TypeRef for async-graphql
878878+fn graphql_type_to_typeref(gql_type: &GraphQLType, is_required: bool) -> TypeRef {
879879+ match gql_type {
880880+ GraphQLType::String => {
881881+ if is_required {
882882+ TypeRef::named_nn(TypeRef::STRING)
883883+ } else {
884884+ TypeRef::named(TypeRef::STRING)
885885+ }
886886+ }
887887+ GraphQLType::Int => {
888888+ if is_required {
889889+ TypeRef::named_nn(TypeRef::INT)
890890+ } else {
891891+ TypeRef::named(TypeRef::INT)
892892+ }
893893+ }
894894+ GraphQLType::Boolean => {
895895+ if is_required {
896896+ TypeRef::named_nn(TypeRef::BOOLEAN)
897897+ } else {
898898+ TypeRef::named(TypeRef::BOOLEAN)
899899+ }
900900+ }
901901+ GraphQLType::Float => {
902902+ if is_required {
903903+ TypeRef::named_nn(TypeRef::FLOAT)
904904+ } else {
905905+ TypeRef::named(TypeRef::FLOAT)
906906+ }
907907+ }
908908+ GraphQLType::Blob => {
909909+ // Blob object type with url resolver
910910+ if is_required {
911911+ TypeRef::named_nn("Blob")
912912+ } else {
913913+ TypeRef::named("Blob")
914914+ }
915915+ }
916916+ GraphQLType::Json | GraphQLType::Ref | GraphQLType::Object(_) | GraphQLType::Union => {
917917+ // JSON scalar type - linked records and complex objects return as JSON
918918+ if is_required {
919919+ TypeRef::named_nn("JSON")
920920+ } else {
921921+ TypeRef::named("JSON")
922922+ }
923923+ }
924924+ GraphQLType::Array(inner) => {
925925+ // For arrays of primitives, use typed arrays
926926+ // For arrays of complex types, use JSON scalar
927927+ match inner.as_ref() {
928928+ GraphQLType::String | GraphQLType::Int | GraphQLType::Boolean | GraphQLType::Float => {
929929+ let inner_ref = match inner.as_ref() {
930930+ GraphQLType::String => TypeRef::STRING,
931931+ GraphQLType::Int => TypeRef::INT,
932932+ GraphQLType::Boolean => TypeRef::BOOLEAN,
933933+ GraphQLType::Float => TypeRef::FLOAT,
934934+ _ => unreachable!(),
935935+ };
936936+937937+ if is_required {
938938+ TypeRef::named_nn_list_nn(inner_ref)
939939+ } else {
940940+ TypeRef::named_list(inner_ref)
941941+ }
942942+ }
943943+ _ => {
944944+ // Arrays of complex types (objects, etc.) are just JSON
945945+ if is_required {
946946+ TypeRef::named_nn("JSON")
947947+ } else {
948948+ TypeRef::named("JSON")
949949+ }
950950+ }
951951+ }
952952+ }
953953+ }
954954+}
/// Creates the Blob GraphQL type with url resolver
///
/// Fields: `ref` (CID string), `mimeType`, `size`, and `url(preset)`
/// which builds a Bluesky CDN image URL. Every resolver expects the
/// parent value to be a `BlobContainer` placed by the record resolver.
fn create_blob_type() -> Object {
    let mut blob = Object::new("Blob");

    // ref field - CID reference
    blob = blob.field(Field::new("ref", TypeRef::named_nn(TypeRef::STRING), |ctx| {
        FieldFuture::new(async move {
            let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
            Ok(Some(GraphQLValue::from(container.blob_ref.clone())))
        })
    }));

    // mimeType field
    blob = blob.field(Field::new("mimeType", TypeRef::named_nn(TypeRef::STRING), |ctx| {
        FieldFuture::new(async move {
            let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
            Ok(Some(GraphQLValue::from(container.mime_type.clone())))
        })
    }));

    // size field
    // NOTE(review): `size as i32` truncates blobs larger than i32::MAX
    // bytes (GraphQL Int is 32-bit) — confirm upstream size limits.
    blob = blob.field(Field::new("size", TypeRef::named_nn(TypeRef::INT), |ctx| {
        FieldFuture::new(async move {
            let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
            Ok(Some(GraphQLValue::from(container.size as i32)))
        })
    }));

    // url(preset) field with argument
    blob = blob.field(
        Field::new("url", TypeRef::named_nn(TypeRef::STRING), |ctx| {
            FieldFuture::new(async move {
                let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;

                // Get preset argument, default to "feed_fullsize"
                // (also used when the argument is present but not a string).
                let preset: String = match ctx.args.get("preset") {
                    Some(val) => val.string().unwrap_or("feed_fullsize").to_string(),
                    None => "feed_fullsize".to_string(),
                };

                // Build CDN URL: https://cdn.bsky.app/img/{preset}/plain/{did}/{cid}@jpeg
                // NOTE(review): host and the "@jpeg" suffix are hard-coded —
                // confirm this is intended for non-JPEG blobs as well.
                let cdn_base_url = "https://cdn.bsky.app/img";
                let url = format!(
                    "{}/{}/plain/{}/{}@jpeg",
                    cdn_base_url,
                    preset,
                    container.did,
                    container.blob_ref
                );

                Ok(Some(GraphQLValue::from(url)))
            })
        })
        .argument(async_graphql::dynamic::InputValue::new(
            "preset",
            TypeRef::named(TypeRef::STRING),
        ))
        .description("Generate CDN URL for the blob with the specified preset (avatar, banner, feed_thumbnail, feed_fullsize)"),
    );

    blob
}
10181018+10191019+/// Creates the SyncResult GraphQL type for mutation responses
10201020+fn create_sync_result_type() -> Object {
10211021+ let mut sync_result = Object::new("SyncResult");
10221022+10231023+ sync_result = sync_result.field(Field::new("success", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
10241024+ FieldFuture::new(async move {
10251025+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
10261026+ .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
10271027+ if let GraphQLValue::Object(obj) = value {
10281028+ if let Some(success) = obj.get("success") {
10291029+ return Ok(Some(success.clone()));
10301030+ }
10311031+ }
10321032+ Ok(None)
10331033+ })
10341034+ }));
10351035+10361036+ sync_result = sync_result.field(Field::new("reposProcessed", TypeRef::named_nn(TypeRef::INT), |ctx| {
10371037+ FieldFuture::new(async move {
10381038+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
10391039+ .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
10401040+ if let GraphQLValue::Object(obj) = value {
10411041+ if let Some(repos) = obj.get("reposProcessed") {
10421042+ return Ok(Some(repos.clone()));
10431043+ }
10441044+ }
10451045+ Ok(None)
10461046+ })
10471047+ }));
10481048+10491049+ sync_result = sync_result.field(Field::new("recordsSynced", TypeRef::named_nn(TypeRef::INT), |ctx| {
10501050+ FieldFuture::new(async move {
10511051+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
10521052+ .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
10531053+ if let GraphQLValue::Object(obj) = value {
10541054+ if let Some(records) = obj.get("recordsSynced") {
10551055+ return Ok(Some(records.clone()));
10561056+ }
10571057+ }
10581058+ Ok(None)
10591059+ })
10601060+ }));
10611061+10621062+ sync_result = sync_result.field(Field::new("timedOut", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
10631063+ FieldFuture::new(async move {
10641064+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
10651065+ .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
10661066+ if let GraphQLValue::Object(obj) = value {
10671067+ if let Some(timed_out) = obj.get("timedOut") {
10681068+ return Ok(Some(timed_out.clone()));
10691069+ }
10701070+ }
10711071+ Ok(None)
10721072+ })
10731073+ }));
10741074+10751075+ sync_result = sync_result.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| {
10761076+ FieldFuture::new(async move {
10771077+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
10781078+ .ok_or_else(|| Error::new("Failed to downcast sync result"))?;
10791079+ if let GraphQLValue::Object(obj) = value {
10801080+ if let Some(message) = obj.get("message") {
10811081+ return Ok(Some(message.clone()));
10821082+ }
10831083+ }
10841084+ Ok(None)
10851085+ })
10861086+ }));
10871087+10881088+ sync_result
10891089+}
10901090+10911091+/// Creates the SortDirection enum type
10921092+fn create_sort_direction_enum() -> Enum {
10931093+ Enum::new("SortDirection")
10941094+ .item(EnumItem::new("asc"))
10951095+ .item(EnumItem::new("desc"))
10961096+}
10971097+10981098+/// Creates the SortField input type
10991099+fn create_sort_field_input() -> InputObject {
11001100+ InputObject::new("SortField")
11011101+ .field(InputValue::new("field", TypeRef::named_nn(TypeRef::STRING)))
11021102+ .field(InputValue::new(
11031103+ "direction",
11041104+ TypeRef::named_nn("SortDirection"),
11051105+ ))
11061106+}
11071107+11081108+/// Creates the StringCondition input type for string field filtering
11091109+fn create_string_condition_input() -> InputObject {
11101110+ InputObject::new("StringCondition")
11111111+ .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
11121112+ .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING)))
11131113+ .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING)))
11141114+}
11151115+11161116+/// Creates the IntCondition input type for int field filtering
11171117+fn create_int_condition_input() -> InputObject {
11181118+ InputObject::new("IntCondition")
11191119+ .field(InputValue::new("eq", TypeRef::named(TypeRef::INT)))
11201120+ .field(InputValue::new("in", TypeRef::named_list(TypeRef::INT)))
11211121+}
11221122+11231123+/// Creates the PageInfo type for connection pagination
11241124+fn create_page_info_type() -> Object {
11251125+ let mut page_info = Object::new("PageInfo");
11261126+11271127+ page_info = page_info.field(Field::new("hasNextPage", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
11281128+ FieldFuture::new(async move {
11291129+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
11301130+ .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
11311131+ if let GraphQLValue::Object(obj) = value {
11321132+ if let Some(has_next) = obj.get("hasNextPage") {
11331133+ return Ok(Some(has_next.clone()));
11341134+ }
11351135+ }
11361136+ Ok(Some(GraphQLValue::from(false)))
11371137+ })
11381138+ }));
11391139+11401140+ page_info = page_info.field(Field::new("hasPreviousPage", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
11411141+ FieldFuture::new(async move {
11421142+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
11431143+ .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
11441144+ if let GraphQLValue::Object(obj) = value {
11451145+ if let Some(has_prev) = obj.get("hasPreviousPage") {
11461146+ return Ok(Some(has_prev.clone()));
11471147+ }
11481148+ }
11491149+ Ok(Some(GraphQLValue::from(false)))
11501150+ })
11511151+ }));
11521152+11531153+ page_info = page_info.field(Field::new("startCursor", TypeRef::named(TypeRef::STRING), |ctx| {
11541154+ FieldFuture::new(async move {
11551155+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
11561156+ .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
11571157+ if let GraphQLValue::Object(obj) = value {
11581158+ if let Some(cursor) = obj.get("startCursor") {
11591159+ return Ok(Some(cursor.clone()));
11601160+ }
11611161+ }
11621162+ Ok(None)
11631163+ })
11641164+ }));
11651165+11661166+ page_info = page_info.field(Field::new("endCursor", TypeRef::named(TypeRef::STRING), |ctx| {
11671167+ FieldFuture::new(async move {
11681168+ let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
11691169+ .ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
11701170+ if let GraphQLValue::Object(obj) = value {
11711171+ if let Some(cursor) = obj.get("endCursor") {
11721172+ return Ok(Some(cursor.clone()));
11731173+ }
11741174+ }
11751175+ Ok(None)
11761176+ })
11771177+ }));
11781178+11791179+ page_info
11801180+}
/// Connection data structure that holds all connection fields
/// (the resolved page of records plus the pagination metadata the
/// `*Connection` field resolvers read).
#[derive(Clone)]
struct ConnectionData {
    // Surfaced by the `totalCount` field.
    total_count: i32,
    // Drives `pageInfo.hasNextPage` (forward pagination only).
    has_next_page: bool,
    // Surfaced as `pageInfo.endCursor` when present.
    end_cursor: Option<String>,
    // The records on this page, wrapped for GraphQL resolution;
    // exposed through both `edges` and `nodes`.
    nodes: Vec<RecordContainer>,
}
/// Edge data structure for Relay connections
#[derive(Clone)]
struct EdgeData {
    // The wrapped record this edge points at.
    node: RecordContainer,
    // Opaque cursor for this record (base64url-encoded CID, as built
    // by `create_connection_type`).
    cursor: String,
}
11971197+11981198+/// Creates an Edge type for a given record type
11991199+/// Example: For "Post" creates "PostEdge" with node and cursor
12001200+fn create_edge_type(record_type_name: &str) -> Object {
12011201+ let edge_name = format!("{}Edge", record_type_name);
12021202+ let mut edge = Object::new(&edge_name);
12031203+12041204+ // Add node field
12051205+ let record_type = record_type_name.to_string();
12061206+ edge = edge.field(Field::new("node", TypeRef::named_nn(&record_type), |ctx| {
12071207+ FieldFuture::new(async move {
12081208+ let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?;
12091209+ Ok(Some(FieldValue::owned_any(edge_data.node.clone())))
12101210+ })
12111211+ }));
12121212+12131213+ // Add cursor field
12141214+ edge = edge.field(Field::new("cursor", TypeRef::named_nn(TypeRef::STRING), |ctx| {
12151215+ FieldFuture::new(async move {
12161216+ let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?;
12171217+ Ok(Some(GraphQLValue::from(edge_data.cursor.clone())))
12181218+ })
12191219+ }));
12201220+12211221+ edge
12221222+}
12231223+12241224+/// Creates a Connection type for a given record type
12251225+/// Example: For "Post" creates "PostConnection" with edges, pageInfo, and totalCount
12261226+fn create_connection_type(record_type_name: &str) -> Object {
12271227+ let connection_name = format!("{}Connection", record_type_name);
12281228+ let mut connection = Object::new(&connection_name);
12291229+12301230+ // Add totalCount field
12311231+ connection = connection.field(Field::new("totalCount", TypeRef::named_nn(TypeRef::INT), |ctx| {
12321232+ FieldFuture::new(async move {
12331233+ let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
12341234+ Ok(Some(GraphQLValue::from(data.total_count)))
12351235+ })
12361236+ }));
12371237+12381238+ // Add pageInfo field
12391239+ connection = connection.field(Field::new("pageInfo", TypeRef::named_nn("PageInfo"), |ctx| {
12401240+ FieldFuture::new(async move {
12411241+ let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
12421242+12431243+ let mut page_info = async_graphql::indexmap::IndexMap::new();
12441244+ page_info.insert(
12451245+ async_graphql::Name::new("hasNextPage"),
12461246+ GraphQLValue::from(data.has_next_page)
12471247+ );
12481248+ // For forward pagination only, hasPreviousPage is always false
12491249+ page_info.insert(
12501250+ async_graphql::Name::new("hasPreviousPage"),
12511251+ GraphQLValue::from(false)
12521252+ );
12531253+12541254+ // Add startCursor (first node's cid if available)
12551255+ if !data.nodes.is_empty() {
12561256+ if let Some(first_record) = data.nodes.first() {
12571257+ let start_cursor = general_purpose::URL_SAFE_NO_PAD.encode(first_record.record.cid.clone());
12581258+ page_info.insert(
12591259+ async_graphql::Name::new("startCursor"),
12601260+ GraphQLValue::from(start_cursor)
12611261+ );
12621262+ }
12631263+ }
12641264+12651265+ // Add endCursor
12661266+ if let Some(ref cursor) = data.end_cursor {
12671267+ page_info.insert(
12681268+ async_graphql::Name::new("endCursor"),
12691269+ GraphQLValue::from(cursor.clone())
12701270+ );
12711271+ }
12721272+12731273+ Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info))))
12741274+ })
12751275+ }));
12761276+12771277+ // Add edges field (Relay standard)
12781278+ let edge_type = format!("{}Edge", record_type_name);
12791279+ connection = connection.field(Field::new("edges", TypeRef::named_nn_list_nn(&edge_type), |ctx| {
12801280+ FieldFuture::new(async move {
12811281+ let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
12821282+12831283+ let field_values: Vec<FieldValue<'_>> = data.nodes.iter()
12841284+ .map(|node| {
12851285+ // Use base64-encoded CID as cursor
12861286+ let cursor = general_purpose::URL_SAFE_NO_PAD.encode(node.record.cid.clone());
12871287+ let edge = EdgeData {
12881288+ node: node.clone(),
12891289+ cursor,
12901290+ };
12911291+ FieldValue::owned_any(edge)
12921292+ })
12931293+ .collect();
12941294+12951295+ Ok(Some(FieldValue::list(field_values)))
12961296+ })
12971297+ }));
12981298+12991299+ // Add nodes field (convenience, direct access to records without edges wrapper)
13001300+ connection = connection.field(Field::new("nodes", TypeRef::named_nn_list_nn(record_type_name), |ctx| {
13011301+ FieldFuture::new(async move {
13021302+ let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
13031303+13041304+ let field_values: Vec<FieldValue<'_>> = data.nodes.iter()
13051305+ .map(|node| FieldValue::owned_any(node.clone()))
13061306+ .collect();
13071307+13081308+ Ok(Some(FieldValue::list(field_values)))
13091309+ })
13101310+ }));
13111311+13121312+ connection
13131313+}
/// Creates the Mutation root type with sync operations
///
/// Exposes `syncUserCollections(did: String!)`, which syncs a user's
/// records into the given slice and returns a `SyncResult` object
/// value (keys must match the fields declared on the SyncResult type).
fn create_mutation_type(database: Database, slice_uri: String) -> Object {
    let mut mutation = Object::new("Mutation");

    // Add syncUserCollections mutation
    let db_clone = database.clone();
    let slice_clone = slice_uri.clone();

    mutation = mutation.field(
        Field::new(
            "syncUserCollections",
            TypeRef::named_nn("SyncResult"),
            move |ctx| {
                let db = db_clone.clone();
                let slice = slice_clone.clone();

                FieldFuture::new(async move {
                    // `did` is required; a missing/non-string argument is a client error.
                    let did = ctx.args.get("did")
                        .and_then(|v| v.string().ok())
                        .ok_or_else(|| Error::new("did argument is required"))?;

                    // Create sync service and call sync_user_collections
                    // NOTE(review): a fresh in-memory cache and SyncService are
                    // constructed per mutation call, so nothing is shared or
                    // reused between requests — confirm this is intentional.
                    let cache_backend = crate::cache::CacheFactory::create_cache(
                        crate::cache::CacheBackend::InMemory { ttl_seconds: None }
                    ).await.map_err(|e| Error::new(format!("Failed to create cache: {}", e)))?;
                    let cache = Arc::new(Mutex::new(crate::cache::SliceCache::new(cache_backend)));
                    let sync_service = crate::sync::SyncService::with_cache(
                        db.clone(),
                        // Relay endpoint is overridable via env, with a public default.
                        std::env::var("RELAY_ENDPOINT")
                            .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()),
                        cache,
                    );

                    let result = sync_service
                        .sync_user_collections(did, &slice, 30) // 30 second timeout
                        .await
                        .map_err(|e| Error::new(format!("Sync failed: {}", e)))?;

                    // Convert result to GraphQL object; the SyncResult field
                    // resolvers look these keys up by name.
                    let mut obj = async_graphql::indexmap::IndexMap::new();
                    obj.insert(async_graphql::Name::new("success"), GraphQLValue::from(result.success));
                    obj.insert(async_graphql::Name::new("reposProcessed"), GraphQLValue::from(result.repos_processed));
                    obj.insert(async_graphql::Name::new("recordsSynced"), GraphQLValue::from(result.records_synced));
                    obj.insert(async_graphql::Name::new("timedOut"), GraphQLValue::from(result.timed_out));
                    obj.insert(async_graphql::Name::new("message"), GraphQLValue::from(result.message));

                    Ok(Some(FieldValue::owned_any(GraphQLValue::Object(obj))))
                })
            },
        )
        .argument(async_graphql::dynamic::InputValue::new(
            "did",
            TypeRef::named_nn(TypeRef::STRING),
        ))
        .description("Sync user collections for a given DID")
    );

    mutation
}
/// Converts NSID to GraphQL type name
/// Example: app.bsky.feed.post -> AppBskyFeedPost
fn nsid_to_type_name(nsid: &str) -> String {
    let mut type_name = String::with_capacity(nsid.len());

    // Capitalize the first character of each dot-separated segment and
    // concatenate; empty segments contribute nothing.
    for segment in nsid.split('.') {
        let mut chars = segment.chars();
        if let Some(first) = chars.next() {
            type_name.extend(first.to_uppercase());
            type_name.push_str(chars.as_str());
        }
    }

    type_name
}
13891389+13901390+/// Converts NSID to GraphQL query name in camelCase and pluralized
13911391+/// Example: app.bsky.feed.post -> appBskyFeedPosts
13921392+/// Example: fm.teal.alpha.feed.play -> fmTealAlphaFeedPlays
13931393+fn nsid_to_query_name(nsid: &str) -> String {
13941394+ // First convert to camelCase like join fields
13951395+ let camel_case = nsid_to_join_field_name(nsid);
13961396+13971397+ // Then pluralize the end
13981398+ if camel_case.ends_with("s") || camel_case.ends_with("x") || camel_case.ends_with("ch") || camel_case.ends_with("sh") {
13991399+ format!("{}es", camel_case) // status -> statuses, box -> boxes
14001400+ } else if camel_case.ends_with("y") && camel_case.len() > 1 {
14011401+ let chars: Vec<char> = camel_case.chars().collect();
14021402+ if chars.len() > 1 && !['a', 'e', 'i', 'o', 'u'].contains(&chars[chars.len() - 2]) {
14031403+ format!("{}ies", &camel_case[..camel_case.len() - 1]) // party -> parties
14041404+ } else {
14051405+ format!("{}s", camel_case) // day -> days
14061406+ }
14071407+ } else {
14081408+ format!("{}s", camel_case) // post -> posts
14091409+ }
14101410+}
/// Converts NSID to GraphQL join field name in camelCase
/// Example: app.bsky.actor.profile -> appBskyActorProfile
fn nsid_to_join_field_name(nsid: &str) -> String {
    let mut segments = nsid.split('.');

    // First segment keeps its original (lowercase) form.
    let mut field_name = match segments.next() {
        Some(first) => first.to_string(),
        None => return nsid.to_string(),
    };

    // Remaining segments get their first character capitalized.
    for segment in segments {
        let mut chars = segment.chars();
        if let Some(first) = chars.next() {
            field_name.extend(first.to_uppercase());
            field_name.push_str(chars.as_str());
        }
    }

    field_name
}
+166
api/src/graphql/types.rs
···11+//! GraphQL type definitions and mappings from AT Protocol lexicons
22+33+use serde_json::Value;
/// Represents a mapped GraphQL field from a lexicon property
#[derive(Debug, Clone)]
pub struct GraphQLField {
    // Property name as it appears in the lexicon definition.
    pub name: String,
    // Mapped GraphQL type for this property.
    pub field_type: GraphQLType,
    // True when the property is listed in the lexicon's "required" array.
    pub is_required: bool,
}
/// GraphQL type representation mapped from lexicon types
#[derive(Debug, Clone)]
pub enum GraphQLType {
    /// Lexicon "string", "bytes" (base64-encoded), and "cid-link"
    String,
    /// Lexicon "integer"
    Int,
    /// Lexicon "boolean"
    Boolean,
    /// Lexicon "number"
    Float,
    /// Reference to another record (for strongRef)
    Ref,
    /// Array of a type
    Array(Box<GraphQLType>),
    /// Object with nested fields
    Object(Vec<GraphQLField>),
    /// Union of multiple types
    Union,
    /// Blob reference with CDN URL support
    Blob,
    /// Any JSON value (also the fallback for unknown lexicon types)
    Json,
}
3333+3434+/// Maps AT Protocol lexicon type to GraphQL type
3535+pub fn map_lexicon_type_to_graphql(
3636+ type_name: &str,
3737+ lexicon_def: &Value,
3838+) -> GraphQLType {
3939+ match type_name {
4040+ "string" => GraphQLType::String,
4141+ "integer" => GraphQLType::Int,
4242+ "boolean" => GraphQLType::Boolean,
4343+ "number" => GraphQLType::Float,
4444+ "bytes" => GraphQLType::String, // Base64 encoded
4545+ "cid-link" => GraphQLType::String,
4646+ "blob" => GraphQLType::Blob,
4747+ "unknown" => GraphQLType::Json,
4848+ "null" => GraphQLType::Json,
4949+ "ref" => {
5050+ // Check if this is a strongRef (link to another record)
5151+ let ref_name = lexicon_def
5252+ .get("ref")
5353+ .and_then(|r| r.as_str())
5454+ .unwrap_or("");
5555+5656+ if ref_name == "com.atproto.repo.strongRef" {
5757+ GraphQLType::Ref
5858+ } else {
5959+ GraphQLType::Json
6060+ }
6161+ }
6262+ "array" => {
6363+ let items = lexicon_def.get("items");
6464+ let item_type = if let Some(items_obj) = items {
6565+ let item_type_name = items_obj
6666+ .get("type")
6767+ .and_then(|t| t.as_str())
6868+ .unwrap_or("unknown");
6969+ map_lexicon_type_to_graphql(item_type_name, items_obj)
7070+ } else {
7171+ GraphQLType::Json
7272+ };
7373+ GraphQLType::Array(Box::new(item_type))
7474+ }
7575+ "object" => {
7676+ let properties = lexicon_def
7777+ .get("properties")
7878+ .and_then(|p| p.as_object());
7979+8080+ let required_fields: Vec<String> = lexicon_def
8181+ .get("required")
8282+ .and_then(|r| r.as_array())
8383+ .map(|arr| {
8484+ arr.iter()
8585+ .filter_map(|v| v.as_str().map(|s| s.to_string()))
8686+ .collect()
8787+ })
8888+ .unwrap_or_default();
8989+9090+ if let Some(props) = properties {
9191+ let fields = props
9292+ .iter()
9393+ .map(|(field_name, field_def)| {
9494+ let field_type_name = field_def
9595+ .get("type")
9696+ .and_then(|t| t.as_str())
9797+ .unwrap_or("unknown");
9898+9999+ GraphQLField {
100100+ name: field_name.clone(),
101101+ field_type: map_lexicon_type_to_graphql(
102102+ field_type_name,
103103+ field_def,
104104+ ),
105105+ is_required: required_fields.contains(&field_name.to_string()),
106106+ }
107107+ })
108108+ .collect();
109109+110110+ GraphQLType::Object(fields)
111111+ } else {
112112+ GraphQLType::Json
113113+ }
114114+ }
115115+ "union" => {
116116+ GraphQLType::Union
117117+ }
118118+ _ => GraphQLType::Json,
119119+ }
120120+}
121121+122122+/// Extract collection schema from lexicon definitions
123123+pub fn extract_collection_fields(
124124+ lexicon_defs: &Value,
125125+) -> Vec<GraphQLField> {
126126+ let main_def = lexicon_defs
127127+ .get("main")
128128+ .or_else(|| lexicon_defs.get("record"));
129129+130130+ if let Some(main) = main_def {
131131+ let type_name = main
132132+ .get("type")
133133+ .and_then(|t| t.as_str())
134134+ .unwrap_or("object");
135135+136136+ // For "record" type, the actual object definition is nested under "record" field
137137+ let object_def = if type_name == "record" {
138138+ main.get("record").unwrap_or(main)
139139+ } else {
140140+ main
141141+ };
142142+143143+ if type_name == "record" || type_name == "object" {
144144+ let object_type_name = object_def
145145+ .get("type")
146146+ .and_then(|t| t.as_str())
147147+ .unwrap_or("object");
148148+149149+ if let GraphQLType::Object(fields) = map_lexicon_type_to_graphql(object_type_name, object_def) {
150150+ return fields;
151151+ }
152152+ }
153153+ }
154154+155155+ vec![]
156156+}
157157+158158+/// Extract the record key type from lexicon definitions
159159+/// Returns Some("tid"), Some("literal:self"), Some("any"), or None
160160+pub fn extract_record_key(lexicon_defs: &Value) -> Option<String> {
161161+ lexicon_defs
162162+ .get("main")?
163163+ .get("key")
164164+ .and_then(|k| k.as_str())
165165+ .map(|s| s.to_string())
166166+}