Highly ambitious ATProtocol AppView service and SDKs
at main 460 lines 19 kB view raw
1//! GraphQL schema extensions for cross-collection record queries 2 3use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef}; 4use async_graphql::Value as GraphQLValue; 5use serde_json::Value; 6 7use crate::models::{Record, WhereClause}; 8use crate::database::Database; 9 10/// Container for record data 11#[derive(Clone)] 12pub struct SliceRecordContainer { 13 pub uri: String, 14 pub cid: String, 15 pub did: String, 16 pub collection: String, 17 pub value: Value, 18 pub indexed_at: String, 19} 20 21/// Container for slice records response 22#[derive(Clone)] 23pub struct SliceRecordsConnection { 24 pub records: Vec<Record>, 25 pub total_count: i64, 26 pub has_next_page: bool, 27 pub end_cursor: Option<String>, 28} 29 30/// Edge data structure for Relay connections 31#[derive(Clone)] 32pub struct SliceRecordEdge { 33 pub node: SliceRecordContainer, 34 pub cursor: String, 35} 36 37/// Creates the SliceRecord GraphQL type 38pub fn create_slice_record_type() -> Object { 39 let mut slice_record = Object::new("SliceRecord"); 40 41 slice_record = slice_record.field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| { 42 FieldFuture::new(async move { 43 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?; 44 Ok(Some(GraphQLValue::from(container.uri.clone()))) 45 }) 46 })); 47 48 slice_record = slice_record.field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| { 49 FieldFuture::new(async move { 50 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?; 51 Ok(Some(GraphQLValue::from(container.cid.clone()))) 52 }) 53 })); 54 55 slice_record = slice_record.field(Field::new("did", TypeRef::named_nn(TypeRef::STRING), |ctx| { 56 FieldFuture::new(async move { 57 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?; 58 Ok(Some(GraphQLValue::from(container.did.clone()))) 59 }) 60 })); 61 62 slice_record = 
slice_record.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| { 63 FieldFuture::new(async move { 64 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?; 65 Ok(Some(GraphQLValue::from(container.collection.clone()))) 66 }) 67 })); 68 69 slice_record = slice_record.field(Field::new("value", TypeRef::named_nn(TypeRef::STRING), |ctx| { 70 FieldFuture::new(async move { 71 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?; 72 let json_str = serde_json::to_string(&container.value) 73 .unwrap_or_else(|_| "{}".to_string()); 74 Ok(Some(GraphQLValue::from(json_str))) 75 }) 76 })); 77 78 slice_record = slice_record.field(Field::new("indexedAt", TypeRef::named_nn(TypeRef::STRING), |ctx| { 79 FieldFuture::new(async move { 80 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?; 81 Ok(Some(GraphQLValue::from(container.indexed_at.clone()))) 82 }) 83 })); 84 85 slice_record 86} 87 88/// Creates the SliceRecordEdge GraphQL type 89pub fn create_slice_record_edge_type() -> Object { 90 let mut edge = Object::new("SliceRecordEdge"); 91 92 edge = edge.field(Field::new("node", TypeRef::named_nn("SliceRecord"), |ctx| { 93 FieldFuture::new(async move { 94 let edge_data = ctx.parent_value.try_downcast_ref::<SliceRecordEdge>()?; 95 Ok(Some(FieldValue::owned_any(edge_data.node.clone()))) 96 }) 97 })); 98 99 edge = edge.field(Field::new("cursor", TypeRef::named_nn(TypeRef::STRING), |ctx| { 100 FieldFuture::new(async move { 101 let edge_data = ctx.parent_value.try_downcast_ref::<SliceRecordEdge>()?; 102 Ok(Some(GraphQLValue::from(edge_data.cursor.clone()))) 103 }) 104 })); 105 106 edge 107} 108 109/// Creates the SliceRecordsConnection GraphQL type 110pub fn create_slice_records_connection_type() -> Object { 111 let mut connection = Object::new("SliceRecordsConnection"); 112 113 // Add totalCount field 114 connection = connection.field(Field::new("totalCount", TypeRef::named_nn(TypeRef::INT), |ctx| 
{ 115 FieldFuture::new(async move { 116 let container = ctx.parent_value.try_downcast_ref::<SliceRecordsConnection>()?; 117 Ok(Some(GraphQLValue::from(container.total_count as i32))) 118 }) 119 })); 120 121 // Add edges field (Relay standard) 122 connection = connection.field(Field::new("edges", TypeRef::named_nn_list_nn("SliceRecordEdge"), |ctx| { 123 FieldFuture::new(async move { 124 let container = ctx.parent_value.try_downcast_ref::<SliceRecordsConnection>()?; 125 let edges: Vec<FieldValue<'_>> = container.records 126 .iter() 127 .map(|record| { 128 let record_container = SliceRecordContainer { 129 uri: record.uri.clone(), 130 cid: record.cid.clone(), 131 did: record.did.clone(), 132 collection: record.collection.clone(), 133 value: record.json.clone(), 134 indexed_at: record.indexed_at.to_rfc3339(), 135 }; 136 let edge = SliceRecordEdge { 137 node: record_container, 138 cursor: record.cid.clone(), // Use CID as cursor 139 }; 140 FieldValue::owned_any(edge) 141 }) 142 .collect(); 143 Ok(Some(FieldValue::list(edges))) 144 }) 145 })); 146 147 // Add pageInfo field 148 connection = connection.field(Field::new("pageInfo", TypeRef::named_nn("PageInfo"), |ctx| { 149 FieldFuture::new(async move { 150 let container = ctx.parent_value.try_downcast_ref::<SliceRecordsConnection>()?; 151 152 let mut page_info = async_graphql::indexmap::IndexMap::new(); 153 page_info.insert( 154 async_graphql::Name::new("hasNextPage"), 155 GraphQLValue::from(container.has_next_page), 156 ); 157 page_info.insert( 158 async_graphql::Name::new("hasPreviousPage"), 159 GraphQLValue::from(false), 160 ); 161 162 // Add endCursor 163 if let Some(ref cursor) = container.end_cursor { 164 page_info.insert( 165 async_graphql::Name::new("endCursor"), 166 GraphQLValue::from(cursor.clone()), 167 ); 168 } 169 170 // Add startCursor (first record's CID if available) 171 if let Some(first_record) = container.records.first() { 172 page_info.insert( 173 async_graphql::Name::new("startCursor"), 174 
GraphQLValue::from(first_record.cid.clone()), 175 ); 176 } 177 178 Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info)))) 179 }) 180 })); 181 182 connection 183} 184 185/// Parse a where input object into a WhereClause 186fn parse_where_input(value: async_graphql::dynamic::ValueAccessor) -> Option<WhereClause> { 187 if let Ok(obj) = value.object() { 188 let mut conditions = std::collections::HashMap::new(); 189 let mut or_clauses: Vec<WhereClause> = Vec::new(); 190 191 // Manually check for each filter field 192 let fields = ["collection", "did", "uri", "cid", "indexedAt", "json"]; 193 for field_name in fields { 194 if let Some(filter_value) = obj.get(field_name) { 195 if let Ok(filter_obj) = filter_value.object() { 196 let mut condition = crate::database::WhereCondition { 197 eq: None, 198 in_values: None, 199 contains: None, 200 fuzzy: None, 201 gt: None, 202 gte: None, 203 lt: None, 204 lte: None, 205 }; 206 207 if let Some(eq_val) = filter_obj.get("eq") { 208 if let Ok(s) = eq_val.string() { 209 condition.eq = Some(serde_json::Value::String(s.to_string())); 210 } 211 } 212 213 if let Some(contains_val) = filter_obj.get("contains") { 214 if let Ok(s) = contains_val.string() { 215 condition.contains = Some(s.to_string()); 216 } 217 } 218 219 conditions.insert(field_name.to_string(), condition); 220 } 221 } 222 } 223 224 // Handle OR conditions 225 if let Some(or_value) = obj.get("or") { 226 if let Ok(or_list) = or_value.list() { 227 let len = or_list.len(); 228 for i in 0..len { 229 if let Some(or_item) = or_list.get(i) { 230 if let Some(or_clause) = parse_where_input(or_item) { 231 or_clauses.push(or_clause); 232 } 233 } 234 } 235 } 236 } 237 238 if !conditions.is_empty() || !or_clauses.is_empty() { 239 Some(WhereClause { 240 conditions, 241 or_conditions: None, 242 and: None, 243 or: if or_clauses.is_empty() { None } else { Some(or_clauses) }, 244 }) 245 } else { 246 None 247 } 248 } else { 249 None 250 } 251} 252 253/// Creates the 
SliceRecordsWhereInput type for filtering 254pub fn create_slice_records_where_input() -> InputObject { 255 InputObject::new("SliceRecordsWhereInput") 256 .field(InputValue::new("collection", TypeRef::named("StringFilter"))) 257 .field(InputValue::new("did", TypeRef::named("StringFilter"))) 258 .field(InputValue::new("uri", TypeRef::named("StringFilter"))) 259 .field(InputValue::new("cid", TypeRef::named("StringFilter"))) 260 .field(InputValue::new("indexedAt", TypeRef::named("DateTimeFilter"))) 261 .field(InputValue::new("json", TypeRef::named("StringFilter"))) 262 .field(InputValue::new("or", TypeRef::named_list("SliceRecordsWhereInput"))) 263} 264 265/// Container for delete slice records response 266#[derive(Clone)] 267pub struct DeleteSliceRecordsOutput { 268 pub message: String, 269 pub records_deleted: i64, 270 pub actors_deleted: i64, 271} 272 273/// Creates the DeleteSliceRecordsOutput GraphQL type 274pub fn create_delete_slice_records_output_type() -> Object { 275 let mut output = Object::new("DeleteSliceRecordsOutput"); 276 277 output = output.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| { 278 FieldFuture::new(async move { 279 let container = ctx.parent_value.try_downcast_ref::<DeleteSliceRecordsOutput>()?; 280 Ok(Some(GraphQLValue::from(container.message.clone()))) 281 }) 282 })); 283 284 output = output.field(Field::new("recordsDeleted", TypeRef::named_nn(TypeRef::INT), |ctx| { 285 FieldFuture::new(async move { 286 let container = ctx.parent_value.try_downcast_ref::<DeleteSliceRecordsOutput>()?; 287 Ok(Some(GraphQLValue::from(container.records_deleted as i32))) 288 }) 289 })); 290 291 output = output.field(Field::new("actorsDeleted", TypeRef::named_nn(TypeRef::INT), |ctx| { 292 FieldFuture::new(async move { 293 let container = ctx.parent_value.try_downcast_ref::<DeleteSliceRecordsOutput>()?; 294 Ok(Some(GraphQLValue::from(container.actors_deleted as i32))) 295 }) 296 })); 297 298 output 299} 300 301/// Add sliceRecords query to 
the Query type 302pub fn add_slice_records_query( 303 query: Object, 304 database: Database, 305) -> Object { 306 let db_for_records = database.clone(); 307 308 query.field( 309 Field::new( 310 "sliceRecords", 311 TypeRef::named_nn("SliceRecordsConnection"), 312 move |ctx| { 313 let db = db_for_records.clone(); 314 FieldFuture::new(async move { 315 // Get slice URI - either directly or by looking up via actorHandle + rkey 316 let slice_uri = if let Some(uri) = ctx.args.get("sliceUri").and_then(|v| v.string().ok()) { 317 uri.to_string() 318 } else if let (Some(actor_handle), Some(rkey)) = ( 319 ctx.args.get("actorHandle").and_then(|v| v.string().ok()), 320 ctx.args.get("rkey").and_then(|v| v.string().ok()), 321 ) { 322 // Look up the slice by actorHandle and rkey using the dedicated database method 323 db.get_slice_uri_by_handle_and_rkey(actor_handle, rkey) 324 .await 325 .map_err(|e| async_graphql::Error::new(format!("Database error: {}", e)))? 326 .ok_or_else(|| async_graphql::Error::new("Slice not found"))? 
327 } else { 328 return Err(async_graphql::Error::new("Either sliceUri or both actorHandle and rkey are required")); 329 }; 330 331 // Relay-style pagination arguments 332 let limit: i32 = ctx.args.get("first") 333 .and_then(|v| v.i64().ok()) 334 .map(|v| v as i32) 335 .unwrap_or(50); 336 337 let cursor: Option<&str> = ctx.args.get("after") 338 .and_then(|v| v.string().ok()); 339 340 // Parse where clause if provided 341 let where_clause: Option<WhereClause> = ctx.args.get("where") 342 .and_then(|v| parse_where_input(v)); 343 344 // Query the database for records with default sort order 345 let default_sort = vec![ 346 crate::database::SortField { 347 field: "indexed_at".to_string(), 348 direction: "desc".to_string(), 349 } 350 ]; 351 let (records, next_cursor) = db.get_slice_collections_records( 352 &slice_uri, 353 Some(limit), 354 cursor, 355 Some(&default_sort), 356 where_clause.as_ref(), 357 ) 358 .await 359 .map_err(|e| async_graphql::Error::new(format!("Failed to fetch records: {}", e)))?; 360 361 // Get total count for the query 362 let total_count = db.count_slice_collections_records( 363 &slice_uri, 364 where_clause.as_ref(), 365 ) 366 .await 367 .map_err(|e| async_graphql::Error::new(format!("Failed to count records: {}", e)))?; 368 369 // Determine if there's a next page 370 let has_next_page = next_cursor.is_some(); 371 372 let connection = SliceRecordsConnection { 373 records, 374 total_count, 375 has_next_page, 376 end_cursor: next_cursor, 377 }; 378 379 Ok(Some(FieldValue::owned_any(connection))) 380 }) 381 }, 382 ) 383 .argument(InputValue::new("sliceUri", TypeRef::named(TypeRef::STRING))) 384 .argument(InputValue::new("actorHandle", TypeRef::named(TypeRef::STRING))) 385 .argument(InputValue::new("rkey", TypeRef::named(TypeRef::STRING))) 386 .argument(InputValue::new("first", TypeRef::named(TypeRef::INT))) 387 .argument(InputValue::new("after", TypeRef::named(TypeRef::STRING))) 388 .argument(InputValue::new("where", 
TypeRef::named("SliceRecordsWhereInput"))) 389 .description("Query records across all collections in a slice with filtering and pagination. Provide either sliceUri or both actorHandle and rkey.") 390 ) 391} 392 393/// Add deleteSliceRecords mutation to the Mutation type 394pub fn add_delete_slice_records_mutation( 395 mutation: Object, 396 slice_uri: String, 397) -> Object { 398 mutation.field( 399 Field::new( 400 "deleteSliceRecords", 401 TypeRef::named_nn("DeleteSliceRecordsOutput"), 402 move |ctx| { 403 let current_slice = slice_uri.clone(); 404 405 FieldFuture::new(async move { 406 // Get user_did from context (set by auth middleware) 407 let user_did = ctx 408 .data::<String>() 409 .map_err(|_| async_graphql::Error::new("Authentication required"))? 410 .clone(); 411 412 // Get slice parameter (defaults to current slice) 413 let slice: String = match ctx.args.get("slice") { 414 Some(val) => val.string()?.to_string(), 415 None => current_slice, 416 }; 417 418 // Verify user owns this slice 419 if !slice.starts_with(&format!("at://{}/", user_did)) { 420 return Err(async_graphql::Error::new( 421 "You do not have permission to clear this slice" 422 )); 423 } 424 425 // Get pool from GraphQL context 426 let pool = ctx.data::<sqlx::PgPool>() 427 .map_err(|_| async_graphql::Error::new("Database pool not found in context"))?; 428 429 // Create Database instance from pool 430 let db = Database::new(pool.clone()); 431 432 // Delete all records for this slice 433 let records_deleted = db 434 .delete_all_records_for_slice(&slice) 435 .await 436 .map_err(|e| async_graphql::Error::new(format!("Failed to delete records: {}", e)))?; 437 438 // Delete all actors for this slice 439 let actors_deleted = db 440 .delete_all_actors_for_slice(&slice) 441 .await 442 .map_err(|e| async_graphql::Error::new(format!("Failed to delete actors: {}", e)))?; 443 444 let output = DeleteSliceRecordsOutput { 445 message: format!( 446 "Slice index cleared successfully. 
Deleted {} records and {} actors.", 447 records_deleted, actors_deleted 448 ), 449 records_deleted: records_deleted as i64, 450 actors_deleted: actors_deleted as i64, 451 }; 452 453 Ok(Some(FieldValue::owned_any(output))) 454 }) 455 }, 456 ) 457 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) 458 .description("Delete all records and actors from a slice index. Requires authentication and slice ownership.") 459 ) 460}