forked from
slices.network/slices
Highly ambitious ATProtocol AppView service and sdks
1//! GraphQL schema extensions for sync jobs
2
3use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputValue, Object, TypeRef, SubscriptionField, SubscriptionFieldFuture};
4use async_graphql::{Error, Value as GraphQLValue};
5use crate::jobs::{JobStatus, enqueue_sync_job, cancel_job, delete_job};
6use crate::models::BulkSyncParams;
7use tokio::sync::broadcast;
8use std::sync::{Arc, OnceLock};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11use base64::engine::general_purpose;
12use base64::Engine;
13use redis::aio::ConnectionManager;
14use redis::{Client, AsyncCommands};
15use futures_util::StreamExt;
16
/// Global broadcast channel for sync job status updates.
///
/// Lazily created by `get_job_channel`. Enables real-time job status
/// streaming to GraphQL subscriptions within this process; capacity is set
/// at creation time (1000 pending updates).
static JOB_CHANNEL: OnceLock<Arc<Mutex<broadcast::Sender<JobStatus>>>> = OnceLock::new();

/// Global Redis client for cross-process pub/sub.
///
/// Holds `None` when Redis was not configured (or client creation failed)
/// at `initialize_redis_pubsub` time — updates then stay in-process only.
static REDIS_CLIENT: OnceLock<Option<Client>> = OnceLock::new();
23
24/// Initialize or get the global job channel
25fn get_job_channel() -> Arc<Mutex<broadcast::Sender<JobStatus>>> {
26 JOB_CHANNEL
27 .get_or_init(|| {
28 let (tx, _) = broadcast::channel(1000); // Buffer up to 1000 job status updates
29 Arc::new(Mutex::new(tx))
30 })
31 .clone()
32}
33
34/// Publish a sync job status update to subscribers
35pub async fn publish_sync_job_update(job_status: JobStatus) {
36 // Publish to in-memory broadcast channel (for same-process subscribers)
37 let sender = get_job_channel();
38 let sender_lock = sender.lock().await;
39 let _ = sender_lock.send(job_status.clone()); // Ignore errors if no subscribers
40 drop(sender_lock);
41
42 // Also publish to Redis for cross-process communication (if Redis is configured)
43 if let Some(Some(client)) = REDIS_CLIENT.get() {
44 if let Err(e) = publish_to_redis(client, &job_status).await {
45 tracing::warn!("Failed to publish job status to Redis: {}", e);
46 }
47 }
48}
49
50/// Publish job status to Redis for cross-process communication
51async fn publish_to_redis(client: &Client, job_status: &JobStatus) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
52 let mut conn = ConnectionManager::new(client.clone()).await?;
53 let payload = serde_json::to_string(job_status)?;
54 let _: () = conn.publish("sync_job_updates", payload).await?;
55 Ok(())
56}
57
/// Wrapper so a `JobStatus` can travel through the dynamic GraphQL layer as
/// a downcastable `Any` value (see `FieldValue::owned_any` /
/// `try_downcast_ref` in the field resolvers below).
#[derive(Clone)]
struct JobStatusContainer {
    status: JobStatus,
}

/// Wrapper for `SyncJobResult`, resolved by the `SyncJob.result` field.
#[derive(Clone)]
struct SyncJobResultContainer {
    result: crate::jobs::SyncJobResult,
}

/// Wrapper for the `startSync` mutation payload (id + human-readable message).
#[derive(Clone)]
struct StartSyncOutputContainer {
    job_id: String,
    message: String,
}

/// Wrapper for the XRPC `getSyncSummary` output, resolved by `SyncSummary` fields.
#[derive(Clone)]
struct SyncSummaryContainer {
    summary: crate::xrpc::network::slices::slice::get_sync_summary::Output,
}

/// Wrapper for one per-collection entry inside a sync summary.
#[derive(Clone)]
struct CollectionSummaryContainer {
    summary: crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary,
}
88
89/// Creates the CollectionSummary GraphQL type
90pub fn create_collection_summary_type() -> Object {
91 let mut collection_summary = Object::new("CollectionSummary");
92
93 collection_summary = collection_summary.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| {
94 FieldFuture::new(async move {
95 let container = ctx.parent_value.try_downcast_ref::<CollectionSummaryContainer>()?;
96 Ok(Some(GraphQLValue::from(container.summary.collection.clone())))
97 })
98 }));
99
100 collection_summary = collection_summary.field(Field::new("estimatedRepos", TypeRef::named_nn(TypeRef::INT), |ctx| {
101 FieldFuture::new(async move {
102 let container = ctx.parent_value.try_downcast_ref::<CollectionSummaryContainer>()?;
103 Ok(Some(GraphQLValue::from(container.summary.estimated_repos as i32)))
104 })
105 }));
106
107 collection_summary = collection_summary.field(Field::new("isExternal", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
108 FieldFuture::new(async move {
109 let container = ctx.parent_value.try_downcast_ref::<CollectionSummaryContainer>()?;
110 Ok(Some(GraphQLValue::from(container.summary.is_external)))
111 })
112 }));
113
114 collection_summary
115}
116
117/// Creates the SyncSummary GraphQL type
118pub fn create_sync_summary_type() -> Object {
119 let mut sync_summary = Object::new("SyncSummary");
120
121 sync_summary = sync_summary.field(Field::new("totalRepos", TypeRef::named_nn(TypeRef::INT), |ctx| {
122 FieldFuture::new(async move {
123 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?;
124 Ok(Some(GraphQLValue::from(container.summary.total_repos as i32)))
125 })
126 }));
127
128 sync_summary = sync_summary.field(Field::new("cappedRepos", TypeRef::named_nn(TypeRef::INT), |ctx| {
129 FieldFuture::new(async move {
130 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?;
131 Ok(Some(GraphQLValue::from(container.summary.capped_repos as i32)))
132 })
133 }));
134
135 sync_summary = sync_summary.field(Field::new("wouldBeCapped", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
136 FieldFuture::new(async move {
137 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?;
138 Ok(Some(GraphQLValue::from(container.summary.would_be_capped)))
139 })
140 }));
141
142 sync_summary = sync_summary.field(Field::new("appliedLimit", TypeRef::named_nn(TypeRef::INT), |ctx| {
143 FieldFuture::new(async move {
144 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?;
145 Ok(Some(GraphQLValue::from(container.summary.applied_limit)))
146 })
147 }));
148
149 sync_summary = sync_summary.field(Field::new("collectionsSummary", TypeRef::named_nn_list_nn("CollectionSummary"), |ctx| {
150 FieldFuture::new(async move {
151 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?;
152 let field_values: Vec<FieldValue<'_>> = container.summary.collections_summary
153 .iter()
154 .map(|col| {
155 let col_container = CollectionSummaryContainer { summary: col.clone() };
156 FieldValue::owned_any(col_container)
157 })
158 .collect();
159 Ok(Some(FieldValue::list(field_values)))
160 })
161 }));
162
163 sync_summary
164}
165
166/// Creates the StartSyncOutput GraphQL type
167pub fn create_start_sync_output_type() -> Object {
168 let mut output = Object::new("StartSyncOutput");
169
170 output = output.field(Field::new("jobId", TypeRef::named_nn(TypeRef::STRING), |ctx| {
171 FieldFuture::new(async move {
172 let container = ctx.parent_value.try_downcast_ref::<StartSyncOutputContainer>()?;
173 Ok(Some(GraphQLValue::from(container.job_id.clone())))
174 })
175 }));
176
177 output = output.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| {
178 FieldFuture::new(async move {
179 let container = ctx.parent_value.try_downcast_ref::<StartSyncOutputContainer>()?;
180 Ok(Some(GraphQLValue::from(container.message.clone())))
181 })
182 }));
183
184 output
185}
186
187/// Creates the SyncJobResult GraphQL type
188pub fn create_sync_job_result_type() -> Object {
189 let mut result = Object::new("SyncJobResult");
190
191 result = result.field(Field::new("success", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
192 FieldFuture::new(async move {
193 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?;
194 Ok(Some(GraphQLValue::from(container.result.success)))
195 })
196 }));
197
198 result = result.field(Field::new("totalRecords", TypeRef::named_nn(TypeRef::INT), |ctx| {
199 FieldFuture::new(async move {
200 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?;
201 Ok(Some(GraphQLValue::from(container.result.total_records as i32)))
202 })
203 }));
204
205 result = result.field(Field::new("collectionsSynced", TypeRef::named_nn_list_nn(TypeRef::STRING), |ctx| {
206 FieldFuture::new(async move {
207 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?;
208 let values: Vec<GraphQLValue> = container.result.collections_synced
209 .iter()
210 .map(|s| GraphQLValue::from(s.clone()))
211 .collect();
212 Ok(Some(GraphQLValue::List(values)))
213 })
214 }));
215
216 result = result.field(Field::new("reposProcessed", TypeRef::named_nn(TypeRef::INT), |ctx| {
217 FieldFuture::new(async move {
218 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?;
219 Ok(Some(GraphQLValue::from(container.result.repos_processed as i32)))
220 })
221 }));
222
223 result = result.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| {
224 FieldFuture::new(async move {
225 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?;
226 Ok(Some(GraphQLValue::from(container.result.message.clone())))
227 })
228 }));
229
230 result
231}
232
233/// Creates the SyncJob GraphQL type
234pub fn create_sync_job_type() -> Object {
235 let mut job = Object::new("SyncJob");
236
237 // Add global ID field for Relay
238 job = job.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| {
239 FieldFuture::new(async move {
240 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
241 // Create Relay-style global ID: base64("SyncJob:uuid")
242 let global_id = format!("SyncJob:{}", container.status.job_id);
243 let encoded = general_purpose::STANDARD.encode(global_id.as_bytes());
244 Ok(Some(GraphQLValue::from(encoded)))
245 })
246 }));
247
248 job = job.field(Field::new("jobId", TypeRef::named_nn(TypeRef::STRING), |ctx| {
249 FieldFuture::new(async move {
250 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
251 Ok(Some(GraphQLValue::from(container.status.job_id.to_string())))
252 })
253 }));
254
255 job = job.field(Field::new("sliceUri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
256 FieldFuture::new(async move {
257 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
258 Ok(Some(GraphQLValue::from(container.status.slice_uri.clone())))
259 })
260 }));
261
262 job = job.field(Field::new("status", TypeRef::named_nn(TypeRef::STRING), |ctx| {
263 FieldFuture::new(async move {
264 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
265 Ok(Some(GraphQLValue::from(container.status.status.clone())))
266 })
267 }));
268
269 job = job.field(Field::new("createdAt", TypeRef::named_nn(TypeRef::STRING), |ctx| {
270 FieldFuture::new(async move {
271 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
272 Ok(Some(GraphQLValue::from(container.status.created_at.to_rfc3339())))
273 })
274 }));
275
276 job = job.field(Field::new("startedAt", TypeRef::named(TypeRef::STRING), |ctx| {
277 FieldFuture::new(async move {
278 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
279 Ok(container.status.started_at.map(|dt| GraphQLValue::from(dt.to_rfc3339())))
280 })
281 }));
282
283 job = job.field(Field::new("completedAt", TypeRef::named(TypeRef::STRING), |ctx| {
284 FieldFuture::new(async move {
285 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
286 Ok(container.status.completed_at.map(|dt| GraphQLValue::from(dt.to_rfc3339())))
287 })
288 }));
289
290 job = job.field(Field::new("result", TypeRef::named("SyncJobResult"), |ctx| {
291 FieldFuture::new(async move {
292 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
293 if let Some(result) = &container.status.result {
294 let result_container = SyncJobResultContainer { result: result.clone() };
295 Ok(Some(FieldValue::owned_any(result_container)))
296 } else {
297 Ok(None)
298 }
299 })
300 }));
301
302 job = job.field(Field::new("error", TypeRef::named(TypeRef::STRING), |ctx| {
303 FieldFuture::new(async move {
304 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
305 Ok(container.status.error.as_ref().map(|s| GraphQLValue::from(s.clone())))
306 })
307 }));
308
309 job = job.field(Field::new("retryCount", TypeRef::named_nn(TypeRef::INT), |ctx| {
310 FieldFuture::new(async move {
311 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
312 Ok(Some(GraphQLValue::from(container.status.retry_count as i32)))
313 })
314 }));
315
316 job
317}
318
319/// Add startSync mutation to the Mutation type
320pub fn add_start_sync_mutation(mutation: Object, slice_uri: String) -> Object {
321 mutation.field(
322 Field::new(
323 "startSync",
324 TypeRef::named_nn("StartSyncOutput"),
325 move |ctx| {
326 let current_slice = slice_uri.clone();
327
328 FieldFuture::new(async move {
329 // Get user_did from context (set by auth middleware)
330 let user_did = ctx
331 .data::<String>()
332 .map_err(|_| Error::new("Authentication required"))?
333 .clone();
334
335 // Get slice parameter (defaults to current slice)
336 let slice: String = match ctx.args.get("slice") {
337 Some(val) => val.string()?.to_string(),
338 None => current_slice,
339 };
340
341 // Get optional collections
342 let collections: Option<Vec<String>> = ctx.args.get("collections")
343 .and_then(|val| {
344 val.list().ok().map(|list| {
345 list.iter()
346 .filter_map(|v| v.string().ok().map(|s| s.to_string()))
347 .collect()
348 })
349 });
350
351 // Get optional external collections
352 let external_collections: Option<Vec<String>> = ctx.args.get("externalCollections")
353 .and_then(|val| {
354 val.list().ok().map(|list| {
355 list.iter()
356 .filter_map(|v| v.string().ok().map(|s| s.to_string()))
357 .collect()
358 })
359 });
360
361 // Get optional repos
362 let repos: Option<Vec<String>> = ctx.args.get("repos")
363 .and_then(|val| {
364 val.list().ok().map(|list| {
365 list.iter()
366 .filter_map(|v| v.string().ok().map(|s| s.to_string()))
367 .collect()
368 })
369 });
370
371 // Get optional limit_per_repo
372 let limit_per_repo: Option<i32> = ctx.args.get("limitPerRepo")
373 .and_then(|val| val.i64().ok().map(|i| i as i32));
374
375 // Get optional skip_validation
376 let skip_validation: Option<bool> = ctx.args.get("skipValidation")
377 .and_then(|val| val.boolean().ok());
378
379 // Get optional max_repos
380 let max_repos: Option<i32> = ctx.args.get("maxRepos")
381 .and_then(|val| val.i64().ok().map(|i| i as i32));
382
383 let params = BulkSyncParams {
384 collections,
385 external_collections,
386 repos,
387 limit_per_repo,
388 skip_validation,
389 max_repos,
390 };
391
392 // Enqueue the sync job
393 // Get pool from GraphQL context
394 let pool = ctx.data::<sqlx::PgPool>()
395 .map_err(|_| Error::new("Database pool not found in context"))?;
396
397 let job_id = enqueue_sync_job(pool, user_did, slice.clone(), params)
398 .await
399 .map_err(|e| Error::new(format!("Failed to enqueue sync job: {}", e)))?;
400
401 // Return output as container
402 let output = StartSyncOutputContainer {
403 job_id: job_id.to_string(),
404 message: format!("Sync job {} enqueued successfully", job_id),
405 };
406
407 Ok(Some(FieldValue::owned_any(output)))
408 })
409 },
410 )
411 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING)))
412 .argument(InputValue::new("collections", TypeRef::named_list(TypeRef::STRING)))
413 .argument(InputValue::new("externalCollections", TypeRef::named_list(TypeRef::STRING)))
414 .argument(InputValue::new("repos", TypeRef::named_list(TypeRef::STRING)))
415 .argument(InputValue::new("limitPerRepo", TypeRef::named(TypeRef::INT)))
416 .argument(InputValue::new("skipValidation", TypeRef::named(TypeRef::BOOLEAN)))
417 .argument(InputValue::new("maxRepos", TypeRef::named(TypeRef::INT)))
418 .description("Start a sync job to backfill collections from the ATProto relay")
419 )
420}
421
422/// Add syncJob query to the Query type
423pub fn add_sync_job_query(query: Object) -> Object {
424 query.field(
425 Field::new(
426 "syncJob",
427 TypeRef::named("SyncJob"),
428 move |ctx| {
429 FieldFuture::new(async move {
430 // Get job_id argument
431 let job_id_str: &str = ctx.args.get("jobId")
432 .ok_or_else(|| Error::new("jobId is required"))?
433 .string()?;
434
435 let job_id = Uuid::parse_str(job_id_str)
436 .map_err(|_| Error::new("Invalid UUID format for jobId"))?;
437
438 // Get pool from GraphQL context
439 let pool = ctx.data::<sqlx::PgPool>()
440 .map_err(|_| Error::new("Database pool not found in context"))?;
441
442 // Query database for job status
443 let job_status = crate::jobs::get_job_status(pool, job_id)
444 .await
445 .map_err(|e| Error::new(format!("Failed to fetch job status: {}", e)))?;
446
447 if let Some(status) = job_status {
448 let container = JobStatusContainer { status };
449 Ok(Some(FieldValue::owned_any(container)))
450 } else {
451 Ok(None)
452 }
453 })
454 },
455 )
456 .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING)))
457 .description("Get status of a specific sync job")
458 )
459}
460
461/// Add syncJobs query to the Query type
462pub fn add_sync_jobs_query(query: Object, slice_uri: String) -> Object {
463 query.field(
464 Field::new(
465 "syncJobs",
466 TypeRef::named_nn_list_nn("SyncJob"),
467 move |ctx| {
468 let current_slice = slice_uri.clone();
469
470 FieldFuture::new(async move {
471 // Get optional slice filter argument
472 // If not provided, default to filtering by the current slice URI
473 let slice_filter: Option<&str> = match ctx.args.get("slice") {
474 Some(val) => val.string().ok(),
475 None => Some(current_slice.as_str()),
476 };
477
478 // Get limit argument with default 20
479 let limit: i64 = match ctx.args.get("limit") {
480 Some(val) => val.i64().unwrap_or(20),
481 None => 20,
482 };
483
484 // Get pool from GraphQL context
485 let pool = ctx.data::<sqlx::PgPool>()
486 .map_err(|_| Error::new("Database pool not found in context"))?;
487
488 // Query database for job results
489 let jobs = crate::jobs::get_job_history_by_slice(pool, slice_filter, Some(limit))
490 .await
491 .map_err(|e| Error::new(format!("Failed to fetch sync jobs: {}", e)))?;
492
493 // Convert to GraphQL values
494 let field_values: Vec<FieldValue<'_>> = jobs
495 .into_iter()
496 .map(|job| {
497 let container = JobStatusContainer { status: job };
498 FieldValue::owned_any(container)
499 })
500 .collect();
501
502 Ok(Some(FieldValue::list(field_values)))
503 })
504 },
505 )
506 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING)))
507 .argument(InputValue::new("limit", TypeRef::named(TypeRef::INT)))
508 .description("Get sync job history for a slice")
509 )
510}
511
512/// Add syncJobLogs query to the Query type
513pub fn add_sync_job_logs_query(query: Object) -> Object {
514 query.field(
515 Field::new(
516 "syncJobLogs",
517 TypeRef::named_nn_list_nn("JetstreamLogEntry"),
518 move |ctx| {
519 FieldFuture::new(async move {
520 // Get job_id argument
521 let job_id_str: &str = ctx.args.get("jobId")
522 .ok_or_else(|| Error::new("jobId is required"))?
523 .string()?;
524
525 let job_id = Uuid::parse_str(job_id_str)
526 .map_err(|_| Error::new("Invalid UUID format for jobId"))?;
527
528 // Get limit argument with default 100
529 let limit: Option<i64> = ctx.args.get("limit")
530 .and_then(|val| val.i64().ok());
531
532 // Get pool from GraphQL context
533 let pool = ctx.data::<sqlx::PgPool>()
534 .map_err(|_| Error::new("Database pool not found in context"))?;
535
536 // Query database for logs
537 let logs = crate::logging::get_sync_job_logs(pool, job_id, limit)
538 .await
539 .map_err(|e| Error::new(format!("Failed to fetch sync job logs: {}", e)))?;
540
541 // Convert to GraphQL values (reuse LogEntryContainer from logs module)
542 let field_values: Vec<FieldValue<'_>> = logs
543 .into_iter()
544 .map(|log| {
545 let container = crate::graphql::schema_ext::logs::LogEntryContainer { entry: log };
546 FieldValue::owned_any(container)
547 })
548 .collect();
549
550 Ok(Some(FieldValue::list(field_values)))
551 })
552 },
553 )
554 .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING)))
555 .argument(InputValue::new("limit", TypeRef::named(TypeRef::INT)))
556 .description("Get logs for a specific sync job")
557 )
558}
559
/// Add the `getSyncSummary` query to the Query type.
///
/// Dry-run estimate of a sync: discovers which repos *would* be synced for
/// the given slice/collection filters (without syncing anything) and reports
/// totals plus whether the configured repo cap would truncate the run.
pub fn add_get_sync_summary_query(query: Object) -> Object {
    query.field(
        Field::new(
            "getSyncSummary",
            TypeRef::named_nn("SyncSummary"),
            move |ctx| {
                FieldFuture::new(async move {
                    // Required slice argument (AT-URI of the slice).
                    let slice: String = ctx.args.get("slice")
                        .ok_or_else(|| Error::new("slice is required"))?
                        .string()?
                        .to_string();

                    // Optional primary collections filter. Non-string list
                    // entries are silently skipped.
                    let collections: Option<Vec<String>> = ctx.args.get("collections")
                        .and_then(|val| {
                            val.list().ok().map(|list| {
                                list.iter()
                                    .filter_map(|v| v.string().ok().map(|s| s.to_string()))
                                    .collect()
                            })
                        });

                    // Optional external collections (no repo discovery done
                    // for these — see summary construction below).
                    let external_collections: Option<Vec<String>> = ctx.args.get("externalCollections")
                        .and_then(|val| {
                            val.list().ok().map(|list| {
                                list.iter()
                                    .filter_map(|v| v.string().ok().map(|s| s.to_string()))
                                    .collect()
                            })
                        });

                    // Optional explicit repo list; when provided, discovery
                    // is skipped entirely.
                    let repos: Option<Vec<String>> = ctx.args.get("repos")
                        .and_then(|val| {
                            val.list().ok().map(|list| {
                                list.iter()
                                    .filter_map(|v| v.string().ok().map(|s| s.to_string()))
                                    .collect()
                            })
                        });

                    // App-level context (carries the optional shared cache).
                    let gql_context = ctx.data::<crate::graphql::GraphQLContext>()
                        .map_err(|_| Error::new("GraphQL context not found"))?;

                    let pool = ctx.data::<sqlx::PgPool>()
                        .map_err(|_| Error::new("Database pool not found in context"))?;

                    let database = crate::database::Database::new(pool.clone());

                    // Slice domain is used below to classify collections as
                    // external (NSID not under the slice's domain prefix).
                    let slice_domain = database
                        .get_slice_domain(&slice)
                        .await
                        .map_err(|e| Error::new(format!("Failed to get slice domain: {}", e)))?
                        .ok_or_else(|| Error::new(format!("Slice not found: {}", slice)))?;

                    // Repo cap from env, falling back to 5000 on missing or
                    // unparseable values (mirrors main.rs).
                    let applied_limit = std::env::var("DEFAULT_MAX_SYNC_REPOS")
                        .unwrap_or_else(|_| "5000".to_string())
                        .parse::<i32>()
                        .unwrap_or(5000);

                    // Prefer the shared auth cache; otherwise a throwaway
                    // in-memory cache (300 — presumably seconds TTL; confirm
                    // against InMemoryCache::new).
                    let cache = gql_context.auth_cache.clone()
                        .unwrap_or_else(|| {
                            let backend = crate::cache::CacheBackendImpl::InMemory(
                                crate::cache::InMemoryCache::new(Some(300))
                            );
                            Arc::new(tokio::sync::Mutex::new(crate::cache::SliceCache::new(backend)))
                        });

                    // Sync service pointed at a hard-coded relay host.
                    let sync_service = crate::sync::SyncService::with_cache(
                        database,
                        "https://relay1.us-west.bsky.network".to_string(),
                        cache,
                    );

                    // Either take the caller-provided repos verbatim (no
                    // per-collection counts in that case) or discover repos
                    // per primary collection, deduplicating via a HashSet.
                    let (all_repos, collection_repo_counts): (Vec<String>, std::collections::HashMap<String, i64>) =
                        if let Some(provided_repos) = repos {
                            (provided_repos, std::collections::HashMap::new())
                        } else {
                            let primary_collections = collections.clone().unwrap_or_default();
                            let mut discovered_repos = std::collections::HashSet::new();
                            let mut counts: std::collections::HashMap<String, i64> = std::collections::HashMap::new();

                            for collection in &primary_collections {
                                match sync_service.get_repos_for_collection(collection, &slice, Some(applied_limit)).await {
                                    Ok(repos) => {
                                        counts.insert(collection.clone(), repos.len() as i64);
                                        discovered_repos.extend(repos);
                                    }
                                    Err(e) => {
                                        // Best-effort: a failed collection just
                                        // contributes a zero count.
                                        tracing::warn!("Failed to get repos for collection {}: {}", collection, e);
                                        counts.insert(collection.clone(), 0);
                                    }
                                }
                            }

                            (discovered_repos.into_iter().collect(), counts)
                        };

                    let total_repos = all_repos.len() as i64;
                    let capped_repos = std::cmp::min(total_repos, applied_limit as i64);
                    let would_be_capped = total_repos > applied_limit as i64;

                    // Per-collection summary rows.
                    let mut collections_summary = Vec::new();

                    // Primary collections: counts from discovery above (zero
                    // when explicit repos were provided).
                    if let Some(primary_cols) = collections {
                        for collection in &primary_cols {
                            let is_external = !collection.starts_with(&slice_domain);
                            let estimated_repos = *collection_repo_counts.get(collection).unwrap_or(&0);

                            collections_summary.push(crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary {
                                collection: collection.clone(),
                                estimated_repos,
                                is_external,
                            });
                        }
                    }

                    // External collections: listed with no repo estimate.
                    if let Some(ext_cols) = external_collections {
                        for collection in &ext_cols {
                            let is_external = !collection.starts_with(&slice_domain);

                            collections_summary.push(crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary {
                                collection: collection.clone(),
                                estimated_repos: 0,
                                is_external,
                            });
                        }
                    }

                    let output = crate::xrpc::network::slices::slice::get_sync_summary::Output {
                        total_repos,
                        capped_repos,
                        collections_summary,
                        would_be_capped,
                        applied_limit,
                    };

                    let container = SyncSummaryContainer { summary: output };
                    Ok(Some(FieldValue::owned_any(container)))
                })
            },
        )
        .argument(InputValue::new("slice", TypeRef::named_nn(TypeRef::STRING)))
        .argument(InputValue::new("collections", TypeRef::named_list(TypeRef::STRING)))
        .argument(InputValue::new("externalCollections", TypeRef::named_list(TypeRef::STRING)))
        .argument(InputValue::new("repos", TypeRef::named_list(TypeRef::STRING)))
        .description("Get summary of repos that would be synced based on collection filters")
    )
}
725
726/// Add syncJobUpdated subscription to the Subscription type
727pub fn add_sync_job_subscription(subscription: async_graphql::dynamic::Subscription) -> async_graphql::dynamic::Subscription {
728 subscription.field(SubscriptionField::new(
729 "syncJobUpdated",
730 TypeRef::named_nn("SyncJob"),
731 move |ctx| {
732 SubscriptionFieldFuture::new(async move {
733 // Get optional job filter from arguments
734 let job_filter: Option<Uuid> = ctx.args.get("jobId")
735 .and_then(|val| val.string().ok())
736 .and_then(|s| Uuid::parse_str(s).ok());
737
738 // Subscribe to the broadcast channel
739 let sender = get_job_channel();
740 let sender_lock = sender.lock().await;
741 let mut receiver = sender_lock.subscribe();
742 drop(sender_lock); // Release lock
743
744 // Get optional slice filter from arguments
745 let slice_filter: Option<String> = ctx.args.get("slice")
746 .and_then(|val| val.string().ok())
747 .map(|s| s.to_string());
748
749 let stream = async_stream::stream! {
750 while let Ok(job_status) = receiver.recv().await {
751 // Filter by job_id if provided
752 if let Some(filter_id) = job_filter {
753 if job_status.job_id != filter_id {
754 continue;
755 }
756 }
757
758 // Filter by slice_uri if provided
759 if let Some(ref filter_slice) = slice_filter {
760 if &job_status.slice_uri != filter_slice {
761 continue;
762 }
763 }
764
765 // Convert to GraphQL value and yield
766 let container = JobStatusContainer { status: job_status };
767 yield Ok(FieldValue::owned_any(container));
768 }
769 };
770
771 Ok(stream)
772 })
773 },
774 )
775 .argument(InputValue::new("jobId", TypeRef::named(TypeRef::STRING)))
776 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING)))
777 .description("Subscribe to sync job status updates"))
778}
779
780/// Add cancelJob mutation to the Mutation type
781pub fn add_cancel_job_mutation(mutation: Object) -> Object {
782 mutation.field(
783 Field::new(
784 "cancelJob",
785 TypeRef::named_nn(TypeRef::BOOLEAN),
786 move |ctx| {
787 FieldFuture::new(async move {
788 // Get job_id argument
789 let job_id_str: &str = ctx.args.get("jobId")
790 .ok_or_else(|| Error::new("jobId is required"))?
791 .string()?;
792
793 let job_id = Uuid::parse_str(job_id_str)
794 .map_err(|_| Error::new("Invalid UUID format for jobId"))?;
795
796 // Get pool from GraphQL context
797 let pool = ctx.data::<sqlx::PgPool>()
798 .map_err(|_| Error::new("Database pool not found in context"))?;
799
800 // Cancel the job
801 let cancelled = cancel_job(pool, job_id)
802 .await
803 .map_err(|e| Error::new(format!("Failed to cancel job: {}", e)))?;
804
805 Ok(Some(GraphQLValue::from(cancelled)))
806 })
807 },
808 )
809 .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING)))
810 .description("Cancel a pending or running sync job")
811 )
812}
813
814/// Add deleteJob mutation to the Mutation type
815pub fn add_delete_job_mutation(mutation: Object) -> Object {
816 mutation.field(
817 Field::new(
818 "deleteJob",
819 TypeRef::named(TypeRef::ID),
820 move |ctx| {
821 FieldFuture::new(async move {
822 // Get global ID argument
823 let global_id: &str = ctx.args.get("id")
824 .ok_or_else(|| Error::new("id is required"))?
825 .string()?;
826
827 // Decode global ID: base64("SyncJob:uuid") -> "uuid"
828 let decoded = general_purpose::STANDARD.decode(global_id)
829 .map_err(|_| Error::new("Invalid global ID format"))?;
830 let decoded_str = String::from_utf8(decoded)
831 .map_err(|_| Error::new("Invalid global ID encoding"))?;
832
833 // Extract UUID from "SyncJob:uuid"
834 let job_id_str = decoded_str
835 .strip_prefix("SyncJob:")
836 .ok_or_else(|| Error::new("Invalid global ID: not a SyncJob"))?;
837
838 let job_id = Uuid::parse_str(job_id_str)
839 .map_err(|_| Error::new("Invalid UUID in global ID"))?;
840
841 // Get pool from GraphQL context
842 let pool = ctx.data::<sqlx::PgPool>()
843 .map_err(|_| Error::new("Database pool not found in context"))?;
844
845 // Delete the job
846 let deleted = delete_job(pool, job_id)
847 .await
848 .map_err(|e| Error::new(format!("Failed to delete job: {}", e)))?;
849
850 // Return global ID if deleted, null if not found
851 if deleted {
852 Ok(Some(GraphQLValue::from(global_id)))
853 } else {
854 Ok(None)
855 }
856 })
857 },
858 )
859 .argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID)))
860 .description("Delete a sync job from the database")
861 )
862}
863
864/// Initialize Redis pub/sub for sync job updates
865///
866/// This function should be called once at application startup.
867/// It initializes the Redis client and starts a background task to listen for
868/// job updates from other processes (e.g., worker processes).
869///
870/// # Arguments
871/// * `redis_url` - Optional Redis connection URL. If None, Redis pub/sub is disabled.
872pub fn initialize_redis_pubsub(redis_url: Option<String>) {
873 // Initialize Redis client (or None if not configured)
874 let client = redis_url.and_then(|url| {
875 match Client::open(url.as_str()) {
876 Ok(client) => {
877 tracing::info!("Initialized Redis client for sync job pub/sub");
878 Some(client)
879 }
880 Err(e) => {
881 tracing::error!("Failed to create Redis client for sync job pub/sub: {}", e);
882 None
883 }
884 }
885 });
886
887 let has_redis = client.is_some();
888 REDIS_CLIENT.get_or_init(|| client);
889
890 // Start Redis subscription listener task if Redis is available
891 if has_redis {
892 start_redis_listener();
893 } else {
894 tracing::info!("Redis not configured - sync job updates will use in-memory broadcast only");
895 }
896}
897
898/// Start a background task that subscribes to Redis and forwards messages to the in-memory broadcast channel
899fn start_redis_listener() {
900 tokio::spawn(async {
901 tracing::info!("Starting Redis subscription listener for sync job updates");
902
903 loop {
904 // Get Redis client
905 let client = match REDIS_CLIENT.get() {
906 Some(Some(client)) => client,
907 _ => {
908 tracing::error!("Redis client not available for subscription");
909 return;
910 }
911 };
912
913 // Connect and subscribe
914 match subscribe_to_redis(client).await {
915 Ok(_) => {
916 tracing::warn!("Redis subscription ended, reconnecting in 5 seconds...");
917 }
918 Err(e) => {
919 tracing::error!("Redis subscription error: {}, reconnecting in 5 seconds...", e);
920 }
921 }
922
923 // Wait before reconnecting
924 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
925 }
926 });
927}
928
929/// Subscribe to Redis channel and forward messages to in-memory broadcast
930async fn subscribe_to_redis(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
931 // Create a pub/sub connection from the client
932 let mut pubsub = client.get_async_pubsub().await?;
933
934 pubsub.subscribe("sync_job_updates").await?;
935 tracing::info!("Subscribed to Redis channel: sync_job_updates");
936
937 // Get the in-memory broadcast sender
938 let sender = get_job_channel();
939
940 loop {
941 let msg = pubsub.on_message().next().await;
942 if let Some(msg) = msg {
943 let payload: String = msg.get_payload()?;
944
945 // Deserialize JobStatus from JSON
946 match serde_json::from_str::<JobStatus>(&payload) {
947 Ok(job_status) => {
948 // Forward to in-memory broadcast channel
949 let sender_lock = sender.lock().await;
950 if let Err(e) = sender_lock.send(job_status.clone()) {
951 tracing::debug!("No local subscribers for job update: {}", e);
952 }
953 drop(sender_lock);
954
955 tracing::debug!("Forwarded job update from Redis: job_id={}", job_status.job_id);
956 }
957 Err(e) => {
958 tracing::warn!("Failed to deserialize job status from Redis: {}", e);
959 }
960 }
961 }
962 }
963}