forked from
slices.network/slices
Highly ambitious AT Protocol AppView service and SDKs
1//! GraphQL schema extensions for cross-collection record queries
2
3use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef};
4use async_graphql::Value as GraphQLValue;
5use serde_json::Value;
6
7use crate::models::{Record, WhereClause};
8use crate::database::Database;
9
10/// Container for record data
11#[derive(Clone)]
12pub struct SliceRecordContainer {
13 pub uri: String,
14 pub cid: String,
15 pub did: String,
16 pub collection: String,
17 pub value: Value,
18 pub indexed_at: String,
19}
20
21/// Container for slice records response
22#[derive(Clone)]
23pub struct SliceRecordsConnection {
24 pub records: Vec<Record>,
25 pub total_count: i64,
26 pub has_next_page: bool,
27 pub end_cursor: Option<String>,
28}
29
30/// Edge data structure for Relay connections
31#[derive(Clone)]
32pub struct SliceRecordEdge {
33 pub node: SliceRecordContainer,
34 pub cursor: String,
35}
36
37/// Creates the SliceRecord GraphQL type
38pub fn create_slice_record_type() -> Object {
39 let mut slice_record = Object::new("SliceRecord");
40
41 slice_record = slice_record.field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
42 FieldFuture::new(async move {
43 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?;
44 Ok(Some(GraphQLValue::from(container.uri.clone())))
45 })
46 }));
47
48 slice_record = slice_record.field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| {
49 FieldFuture::new(async move {
50 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?;
51 Ok(Some(GraphQLValue::from(container.cid.clone())))
52 })
53 }));
54
55 slice_record = slice_record.field(Field::new("did", TypeRef::named_nn(TypeRef::STRING), |ctx| {
56 FieldFuture::new(async move {
57 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?;
58 Ok(Some(GraphQLValue::from(container.did.clone())))
59 })
60 }));
61
62 slice_record = slice_record.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| {
63 FieldFuture::new(async move {
64 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?;
65 Ok(Some(GraphQLValue::from(container.collection.clone())))
66 })
67 }));
68
69 slice_record = slice_record.field(Field::new("value", TypeRef::named_nn(TypeRef::STRING), |ctx| {
70 FieldFuture::new(async move {
71 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?;
72 let json_str = serde_json::to_string(&container.value)
73 .unwrap_or_else(|_| "{}".to_string());
74 Ok(Some(GraphQLValue::from(json_str)))
75 })
76 }));
77
78 slice_record = slice_record.field(Field::new("indexedAt", TypeRef::named_nn(TypeRef::STRING), |ctx| {
79 FieldFuture::new(async move {
80 let container = ctx.parent_value.try_downcast_ref::<SliceRecordContainer>()?;
81 Ok(Some(GraphQLValue::from(container.indexed_at.clone())))
82 })
83 }));
84
85 slice_record
86}
87
88/// Creates the SliceRecordEdge GraphQL type
89pub fn create_slice_record_edge_type() -> Object {
90 let mut edge = Object::new("SliceRecordEdge");
91
92 edge = edge.field(Field::new("node", TypeRef::named_nn("SliceRecord"), |ctx| {
93 FieldFuture::new(async move {
94 let edge_data = ctx.parent_value.try_downcast_ref::<SliceRecordEdge>()?;
95 Ok(Some(FieldValue::owned_any(edge_data.node.clone())))
96 })
97 }));
98
99 edge = edge.field(Field::new("cursor", TypeRef::named_nn(TypeRef::STRING), |ctx| {
100 FieldFuture::new(async move {
101 let edge_data = ctx.parent_value.try_downcast_ref::<SliceRecordEdge>()?;
102 Ok(Some(GraphQLValue::from(edge_data.cursor.clone())))
103 })
104 }));
105
106 edge
107}
108
109/// Creates the SliceRecordsConnection GraphQL type
110pub fn create_slice_records_connection_type() -> Object {
111 let mut connection = Object::new("SliceRecordsConnection");
112
113 // Add totalCount field
114 connection = connection.field(Field::new("totalCount", TypeRef::named_nn(TypeRef::INT), |ctx| {
115 FieldFuture::new(async move {
116 let container = ctx.parent_value.try_downcast_ref::<SliceRecordsConnection>()?;
117 Ok(Some(GraphQLValue::from(container.total_count as i32)))
118 })
119 }));
120
121 // Add edges field (Relay standard)
122 connection = connection.field(Field::new("edges", TypeRef::named_nn_list_nn("SliceRecordEdge"), |ctx| {
123 FieldFuture::new(async move {
124 let container = ctx.parent_value.try_downcast_ref::<SliceRecordsConnection>()?;
125 let edges: Vec<FieldValue<'_>> = container.records
126 .iter()
127 .map(|record| {
128 let record_container = SliceRecordContainer {
129 uri: record.uri.clone(),
130 cid: record.cid.clone(),
131 did: record.did.clone(),
132 collection: record.collection.clone(),
133 value: record.json.clone(),
134 indexed_at: record.indexed_at.to_rfc3339(),
135 };
136 let edge = SliceRecordEdge {
137 node: record_container,
138 cursor: record.cid.clone(), // Use CID as cursor
139 };
140 FieldValue::owned_any(edge)
141 })
142 .collect();
143 Ok(Some(FieldValue::list(edges)))
144 })
145 }));
146
147 // Add pageInfo field
148 connection = connection.field(Field::new("pageInfo", TypeRef::named_nn("PageInfo"), |ctx| {
149 FieldFuture::new(async move {
150 let container = ctx.parent_value.try_downcast_ref::<SliceRecordsConnection>()?;
151
152 let mut page_info = async_graphql::indexmap::IndexMap::new();
153 page_info.insert(
154 async_graphql::Name::new("hasNextPage"),
155 GraphQLValue::from(container.has_next_page),
156 );
157 page_info.insert(
158 async_graphql::Name::new("hasPreviousPage"),
159 GraphQLValue::from(false),
160 );
161
162 // Add endCursor
163 if let Some(ref cursor) = container.end_cursor {
164 page_info.insert(
165 async_graphql::Name::new("endCursor"),
166 GraphQLValue::from(cursor.clone()),
167 );
168 }
169
170 // Add startCursor (first record's CID if available)
171 if let Some(first_record) = container.records.first() {
172 page_info.insert(
173 async_graphql::Name::new("startCursor"),
174 GraphQLValue::from(first_record.cid.clone()),
175 );
176 }
177
178 Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info))))
179 })
180 }));
181
182 connection
183}
184
185/// Parse a where input object into a WhereClause
186fn parse_where_input(value: async_graphql::dynamic::ValueAccessor) -> Option<WhereClause> {
187 if let Ok(obj) = value.object() {
188 let mut conditions = std::collections::HashMap::new();
189 let mut or_clauses: Vec<WhereClause> = Vec::new();
190
191 // Manually check for each filter field
192 let fields = ["collection", "did", "uri", "cid", "indexedAt", "json"];
193 for field_name in fields {
194 if let Some(filter_value) = obj.get(field_name) {
195 if let Ok(filter_obj) = filter_value.object() {
196 let mut condition = crate::database::WhereCondition {
197 eq: None,
198 in_values: None,
199 contains: None,
200 fuzzy: None,
201 gt: None,
202 gte: None,
203 lt: None,
204 lte: None,
205 };
206
207 if let Some(eq_val) = filter_obj.get("eq") {
208 if let Ok(s) = eq_val.string() {
209 condition.eq = Some(serde_json::Value::String(s.to_string()));
210 }
211 }
212
213 if let Some(contains_val) = filter_obj.get("contains") {
214 if let Ok(s) = contains_val.string() {
215 condition.contains = Some(s.to_string());
216 }
217 }
218
219 conditions.insert(field_name.to_string(), condition);
220 }
221 }
222 }
223
224 // Handle OR conditions
225 if let Some(or_value) = obj.get("or") {
226 if let Ok(or_list) = or_value.list() {
227 let len = or_list.len();
228 for i in 0..len {
229 if let Some(or_item) = or_list.get(i) {
230 if let Some(or_clause) = parse_where_input(or_item) {
231 or_clauses.push(or_clause);
232 }
233 }
234 }
235 }
236 }
237
238 if !conditions.is_empty() || !or_clauses.is_empty() {
239 Some(WhereClause {
240 conditions,
241 or_conditions: None,
242 and: None,
243 or: if or_clauses.is_empty() { None } else { Some(or_clauses) },
244 })
245 } else {
246 None
247 }
248 } else {
249 None
250 }
251}
252
253/// Creates the SliceRecordsWhereInput type for filtering
254pub fn create_slice_records_where_input() -> InputObject {
255 InputObject::new("SliceRecordsWhereInput")
256 .field(InputValue::new("collection", TypeRef::named("StringFilter")))
257 .field(InputValue::new("did", TypeRef::named("StringFilter")))
258 .field(InputValue::new("uri", TypeRef::named("StringFilter")))
259 .field(InputValue::new("cid", TypeRef::named("StringFilter")))
260 .field(InputValue::new("indexedAt", TypeRef::named("DateTimeFilter")))
261 .field(InputValue::new("json", TypeRef::named("StringFilter")))
262 .field(InputValue::new("or", TypeRef::named_list("SliceRecordsWhereInput")))
263}
264
/// Result payload served by the `DeleteSliceRecordsOutput` GraphQL type.
#[derive(Clone)]
pub struct DeleteSliceRecordsOutput {
    /// Human-readable summary, exposed as `message`.
    pub message: String,
    /// Number of records removed, exposed as `recordsDeleted`.
    pub records_deleted: i64,
    /// Number of actors removed, exposed as `actorsDeleted`.
    pub actors_deleted: i64,
}
272
273/// Creates the DeleteSliceRecordsOutput GraphQL type
274pub fn create_delete_slice_records_output_type() -> Object {
275 let mut output = Object::new("DeleteSliceRecordsOutput");
276
277 output = output.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| {
278 FieldFuture::new(async move {
279 let container = ctx.parent_value.try_downcast_ref::<DeleteSliceRecordsOutput>()?;
280 Ok(Some(GraphQLValue::from(container.message.clone())))
281 })
282 }));
283
284 output = output.field(Field::new("recordsDeleted", TypeRef::named_nn(TypeRef::INT), |ctx| {
285 FieldFuture::new(async move {
286 let container = ctx.parent_value.try_downcast_ref::<DeleteSliceRecordsOutput>()?;
287 Ok(Some(GraphQLValue::from(container.records_deleted as i32)))
288 })
289 }));
290
291 output = output.field(Field::new("actorsDeleted", TypeRef::named_nn(TypeRef::INT), |ctx| {
292 FieldFuture::new(async move {
293 let container = ctx.parent_value.try_downcast_ref::<DeleteSliceRecordsOutput>()?;
294 Ok(Some(GraphQLValue::from(container.actors_deleted as i32)))
295 })
296 }));
297
298 output
299}
300
301/// Add sliceRecords query to the Query type
302pub fn add_slice_records_query(
303 query: Object,
304 database: Database,
305) -> Object {
306 let db_for_records = database.clone();
307
308 query.field(
309 Field::new(
310 "sliceRecords",
311 TypeRef::named_nn("SliceRecordsConnection"),
312 move |ctx| {
313 let db = db_for_records.clone();
314 FieldFuture::new(async move {
315 // Get slice URI - either directly or by looking up via actorHandle + rkey
316 let slice_uri = if let Some(uri) = ctx.args.get("sliceUri").and_then(|v| v.string().ok()) {
317 uri.to_string()
318 } else if let (Some(actor_handle), Some(rkey)) = (
319 ctx.args.get("actorHandle").and_then(|v| v.string().ok()),
320 ctx.args.get("rkey").and_then(|v| v.string().ok()),
321 ) {
322 // Look up the slice by actorHandle and rkey using the dedicated database method
323 db.get_slice_uri_by_handle_and_rkey(actor_handle, rkey)
324 .await
325 .map_err(|e| async_graphql::Error::new(format!("Database error: {}", e)))?
326 .ok_or_else(|| async_graphql::Error::new("Slice not found"))?
327 } else {
328 return Err(async_graphql::Error::new("Either sliceUri or both actorHandle and rkey are required"));
329 };
330
331 // Relay-style pagination arguments
332 let limit: i32 = ctx.args.get("first")
333 .and_then(|v| v.i64().ok())
334 .map(|v| v as i32)
335 .unwrap_or(50);
336
337 let cursor: Option<&str> = ctx.args.get("after")
338 .and_then(|v| v.string().ok());
339
340 // Parse where clause if provided
341 let where_clause: Option<WhereClause> = ctx.args.get("where")
342 .and_then(|v| parse_where_input(v));
343
344 // Query the database for records with default sort order
345 let default_sort = vec![
346 crate::database::SortField {
347 field: "indexed_at".to_string(),
348 direction: "desc".to_string(),
349 }
350 ];
351 let (records, next_cursor) = db.get_slice_collections_records(
352 &slice_uri,
353 Some(limit),
354 cursor,
355 Some(&default_sort),
356 where_clause.as_ref(),
357 )
358 .await
359 .map_err(|e| async_graphql::Error::new(format!("Failed to fetch records: {}", e)))?;
360
361 // Get total count for the query
362 let total_count = db.count_slice_collections_records(
363 &slice_uri,
364 where_clause.as_ref(),
365 )
366 .await
367 .map_err(|e| async_graphql::Error::new(format!("Failed to count records: {}", e)))?;
368
369 // Determine if there's a next page
370 let has_next_page = next_cursor.is_some();
371
372 let connection = SliceRecordsConnection {
373 records,
374 total_count,
375 has_next_page,
376 end_cursor: next_cursor,
377 };
378
379 Ok(Some(FieldValue::owned_any(connection)))
380 })
381 },
382 )
383 .argument(InputValue::new("sliceUri", TypeRef::named(TypeRef::STRING)))
384 .argument(InputValue::new("actorHandle", TypeRef::named(TypeRef::STRING)))
385 .argument(InputValue::new("rkey", TypeRef::named(TypeRef::STRING)))
386 .argument(InputValue::new("first", TypeRef::named(TypeRef::INT)))
387 .argument(InputValue::new("after", TypeRef::named(TypeRef::STRING)))
388 .argument(InputValue::new("where", TypeRef::named("SliceRecordsWhereInput")))
389 .description("Query records across all collections in a slice with filtering and pagination. Provide either sliceUri or both actorHandle and rkey.")
390 )
391}
392
393/// Add deleteSliceRecords mutation to the Mutation type
394pub fn add_delete_slice_records_mutation(
395 mutation: Object,
396 slice_uri: String,
397) -> Object {
398 mutation.field(
399 Field::new(
400 "deleteSliceRecords",
401 TypeRef::named_nn("DeleteSliceRecordsOutput"),
402 move |ctx| {
403 let current_slice = slice_uri.clone();
404
405 FieldFuture::new(async move {
406 // Get user_did from context (set by auth middleware)
407 let user_did = ctx
408 .data::<String>()
409 .map_err(|_| async_graphql::Error::new("Authentication required"))?
410 .clone();
411
412 // Get slice parameter (defaults to current slice)
413 let slice: String = match ctx.args.get("slice") {
414 Some(val) => val.string()?.to_string(),
415 None => current_slice,
416 };
417
418 // Verify user owns this slice
419 if !slice.starts_with(&format!("at://{}/", user_did)) {
420 return Err(async_graphql::Error::new(
421 "You do not have permission to clear this slice"
422 ));
423 }
424
425 // Get pool from GraphQL context
426 let pool = ctx.data::<sqlx::PgPool>()
427 .map_err(|_| async_graphql::Error::new("Database pool not found in context"))?;
428
429 // Create Database instance from pool
430 let db = Database::new(pool.clone());
431
432 // Delete all records for this slice
433 let records_deleted = db
434 .delete_all_records_for_slice(&slice)
435 .await
436 .map_err(|e| async_graphql::Error::new(format!("Failed to delete records: {}", e)))?;
437
438 // Delete all actors for this slice
439 let actors_deleted = db
440 .delete_all_actors_for_slice(&slice)
441 .await
442 .map_err(|e| async_graphql::Error::new(format!("Failed to delete actors: {}", e)))?;
443
444 let output = DeleteSliceRecordsOutput {
445 message: format!(
446 "Slice index cleared successfully. Deleted {} records and {} actors.",
447 records_deleted, actors_deleted
448 ),
449 records_deleted: records_deleted as i64,
450 actors_deleted: actors_deleted as i64,
451 };
452
453 Ok(Some(FieldValue::owned_any(output)))
454 })
455 },
456 )
457 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING)))
458 .description("Delete all records and actors from a slice index. Requires authentication and slice ownership.")
459 )
460}