//! GraphQL schema extensions for sync jobs use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputValue, Object, TypeRef, SubscriptionField, SubscriptionFieldFuture}; use async_graphql::{Error, Value as GraphQLValue}; use crate::jobs::{JobStatus, enqueue_sync_job, cancel_job, delete_job}; use crate::models::BulkSyncParams; use tokio::sync::broadcast; use std::sync::{Arc, OnceLock}; use tokio::sync::Mutex; use uuid::Uuid; use base64::engine::general_purpose; use base64::Engine; use redis::aio::ConnectionManager; use redis::{Client, AsyncCommands}; use futures_util::StreamExt; /// Global broadcast channel for sync job status updates /// This allows real-time job status streaming to GraphQL subscriptions static JOB_CHANNEL: OnceLock>>> = OnceLock::new(); /// Global Redis client for cross-process pub/sub (optional) static REDIS_CLIENT: OnceLock> = OnceLock::new(); /// Initialize or get the global job channel fn get_job_channel() -> Arc>> { JOB_CHANNEL .get_or_init(|| { let (tx, _) = broadcast::channel(1000); // Buffer up to 1000 job status updates Arc::new(Mutex::new(tx)) }) .clone() } /// Publish a sync job status update to subscribers pub async fn publish_sync_job_update(job_status: JobStatus) { // Publish to in-memory broadcast channel (for same-process subscribers) let sender = get_job_channel(); let sender_lock = sender.lock().await; let _ = sender_lock.send(job_status.clone()); // Ignore errors if no subscribers drop(sender_lock); // Also publish to Redis for cross-process communication (if Redis is configured) if let Some(Some(client)) = REDIS_CLIENT.get() { if let Err(e) = publish_to_redis(client, &job_status).await { tracing::warn!("Failed to publish job status to Redis: {}", e); } } } /// Publish job status to Redis for cross-process communication async fn publish_to_redis(client: &Client, job_status: &JobStatus) -> Result<(), Box> { let mut conn = ConnectionManager::new(client.clone()).await?; let payload = serde_json::to_string(job_status)?; let _: () = conn.publish("sync_job_updates", payload).await?; Ok(()) } /// Container for JobStatus to implement Any trait for GraphQL #[derive(Clone)] struct JobStatusContainer { status: JobStatus, } /// Container for SyncJobResult #[derive(Clone)] struct SyncJobResultContainer { result: crate::jobs::SyncJobResult, } /// Container for StartSyncOutput #[derive(Clone)] struct StartSyncOutputContainer { job_id: String, message: String, } /// Container for SyncSummary #[derive(Clone)] struct SyncSummaryContainer { summary: crate::xrpc::network::slices::slice::get_sync_summary::Output, } /// Container for CollectionSummary #[derive(Clone)] struct CollectionSummaryContainer { summary: crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary, } /// Creates the CollectionSummary GraphQL type pub fn create_collection_summary_type() -> Object { let mut collection_summary = Object::new("CollectionSummary"); collection_summary = collection_summary.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.collection.clone()))) }) })); collection_summary = collection_summary.field(Field::new("estimatedRepos", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.estimated_repos as i32))) }) })); collection_summary = collection_summary.field(Field::new("isExternal", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.is_external))) }) })); collection_summary } /// Creates the SyncSummary GraphQL type pub fn create_sync_summary_type() -> Object { let mut sync_summary = Object::new("SyncSummary"); sync_summary = sync_summary.field(Field::new("totalRepos", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.total_repos as i32))) }) })); sync_summary = sync_summary.field(Field::new("cappedRepos", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.capped_repos as i32))) }) })); sync_summary = sync_summary.field(Field::new("wouldBeCapped", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.would_be_capped))) }) })); sync_summary = sync_summary.field(Field::new("appliedLimit", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.summary.applied_limit))) }) })); sync_summary = sync_summary.field(Field::new("collectionsSummary", TypeRef::named_nn_list_nn("CollectionSummary"), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let field_values: Vec> = container.summary.collections_summary .iter() .map(|col| { let col_container = CollectionSummaryContainer { summary: col.clone() }; FieldValue::owned_any(col_container) }) .collect(); Ok(Some(FieldValue::list(field_values))) }) })); sync_summary } /// Creates the StartSyncOutput GraphQL type pub fn create_start_sync_output_type() -> Object { let mut output = Object::new("StartSyncOutput"); output = output.field(Field::new("jobId", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.job_id.clone()))) }) })); output = output.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.message.clone()))) }) })); output } /// Creates the SyncJobResult GraphQL type pub fn create_sync_job_result_type() -> Object { let mut result = Object::new("SyncJobResult"); result = result.field(Field::new("success", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.result.success))) }) })); result = result.field(Field::new("totalRecords", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.result.total_records as i32))) }) })); result = result.field(Field::new("collectionsSynced", TypeRef::named_nn_list_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; let values: Vec = container.result.collections_synced .iter() .map(|s| GraphQLValue::from(s.clone())) .collect(); Ok(Some(GraphQLValue::List(values))) }) })); result = result.field(Field::new("reposProcessed", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.result.repos_processed as i32))) }) })); result = result.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.result.message.clone()))) }) })); result } /// Creates the SyncJob GraphQL type pub fn create_sync_job_type() -> Object { let mut job = Object::new("SyncJob"); // Add global ID field for Relay job = job.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; // Create Relay-style global ID: base64("SyncJob:uuid") let global_id = format!("SyncJob:{}", container.status.job_id); let encoded = general_purpose::STANDARD.encode(global_id.as_bytes()); Ok(Some(GraphQLValue::from(encoded))) }) })); job = job.field(Field::new("jobId", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.status.job_id.to_string()))) }) })); job = job.field(Field::new("sliceUri", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.status.slice_uri.clone()))) }) })); job = job.field(Field::new("status", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.status.status.clone()))) }) })); job = job.field(Field::new("createdAt", TypeRef::named_nn(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.status.created_at.to_rfc3339()))) }) })); job = job.field(Field::new("startedAt", TypeRef::named(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(container.status.started_at.map(|dt| GraphQLValue::from(dt.to_rfc3339()))) }) })); job = job.field(Field::new("completedAt", TypeRef::named(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(container.status.completed_at.map(|dt| GraphQLValue::from(dt.to_rfc3339()))) }) })); job = job.field(Field::new("result", TypeRef::named("SyncJobResult"), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; if let Some(result) = &container.status.result { let result_container = SyncJobResultContainer { result: result.clone() }; Ok(Some(FieldValue::owned_any(result_container))) } else { Ok(None) } }) })); job = job.field(Field::new("error", TypeRef::named(TypeRef::STRING), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(container.status.error.as_ref().map(|s| GraphQLValue::from(s.clone()))) }) })); job = job.field(Field::new("retryCount", TypeRef::named_nn(TypeRef::INT), |ctx| { FieldFuture::new(async move { let container = ctx.parent_value.try_downcast_ref::()?; Ok(Some(GraphQLValue::from(container.status.retry_count as i32))) }) })); job } /// Add startSync mutation to the Mutation type pub fn add_start_sync_mutation(mutation: Object, slice_uri: String) -> Object { mutation.field( Field::new( "startSync", TypeRef::named_nn("StartSyncOutput"), move |ctx| { let current_slice = slice_uri.clone(); FieldFuture::new(async move { // Get user_did from context (set by auth middleware) let user_did = ctx .data::() .map_err(|_| Error::new("Authentication required"))? .clone(); // Get slice parameter (defaults to current slice) let slice: String = match ctx.args.get("slice") { Some(val) => val.string()?.to_string(), None => current_slice, }; // Get optional collections let collections: Option> = ctx.args.get("collections") .and_then(|val| { val.list().ok().map(|list| { list.iter() .filter_map(|v| v.string().ok().map(|s| s.to_string())) .collect() }) }); // Get optional external collections let external_collections: Option> = ctx.args.get("externalCollections") .and_then(|val| { val.list().ok().map(|list| { list.iter() .filter_map(|v| v.string().ok().map(|s| s.to_string())) .collect() }) }); // Get optional repos let repos: Option> = ctx.args.get("repos") .and_then(|val| { val.list().ok().map(|list| { list.iter() .filter_map(|v| v.string().ok().map(|s| s.to_string())) .collect() }) }); // Get optional limit_per_repo let limit_per_repo: Option = ctx.args.get("limitPerRepo") .and_then(|val| val.i64().ok().map(|i| i as i32)); // Get optional skip_validation let skip_validation: Option = ctx.args.get("skipValidation") .and_then(|val| val.boolean().ok()); // Get optional max_repos let max_repos: Option = ctx.args.get("maxRepos") .and_then(|val| val.i64().ok().map(|i| i as i32)); let params = BulkSyncParams { collections, external_collections, repos, limit_per_repo, skip_validation, max_repos, }; // Enqueue the sync job // Get pool from GraphQL context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; let job_id = enqueue_sync_job(pool, user_did, slice.clone(), params) .await .map_err(|e| Error::new(format!("Failed to enqueue sync job: {}", e)))?; // Return output as container let output = StartSyncOutputContainer { job_id: job_id.to_string(), message: format!("Sync job {} enqueued successfully", job_id), }; Ok(Some(FieldValue::owned_any(output))) }) }, ) .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) .argument(InputValue::new("collections", TypeRef::named_list(TypeRef::STRING))) .argument(InputValue::new("externalCollections", TypeRef::named_list(TypeRef::STRING))) .argument(InputValue::new("repos", TypeRef::named_list(TypeRef::STRING))) .argument(InputValue::new("limitPerRepo", TypeRef::named(TypeRef::INT))) .argument(InputValue::new("skipValidation", TypeRef::named(TypeRef::BOOLEAN))) .argument(InputValue::new("maxRepos", TypeRef::named(TypeRef::INT))) .description("Start a sync job to backfill collections from the ATProto relay") ) } /// Add syncJob query to the Query type pub fn add_sync_job_query(query: Object) -> Object { query.field( Field::new( "syncJob", TypeRef::named("SyncJob"), move |ctx| { FieldFuture::new(async move { // Get job_id argument let job_id_str: &str = ctx.args.get("jobId") .ok_or_else(|| Error::new("jobId is required"))? .string()?; let job_id = Uuid::parse_str(job_id_str) .map_err(|_| Error::new("Invalid UUID format for jobId"))?; // Get pool from GraphQL context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; // Query database for job status let job_status = crate::jobs::get_job_status(pool, job_id) .await .map_err(|e| Error::new(format!("Failed to fetch job status: {}", e)))?; if let Some(status) = job_status { let container = JobStatusContainer { status }; Ok(Some(FieldValue::owned_any(container))) } else { Ok(None) } }) }, ) .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING))) .description("Get status of a specific sync job") ) } /// Add syncJobs query to the Query type pub fn add_sync_jobs_query(query: Object, slice_uri: String) -> Object { query.field( Field::new( "syncJobs", TypeRef::named_nn_list_nn("SyncJob"), move |ctx| { let current_slice = slice_uri.clone(); FieldFuture::new(async move { // Get optional slice filter argument // If not provided, default to filtering by the current slice URI let slice_filter: Option<&str> = match ctx.args.get("slice") { Some(val) => val.string().ok(), None => Some(current_slice.as_str()), }; // Get limit argument with default 20 let limit: i64 = match ctx.args.get("limit") { Some(val) => val.i64().unwrap_or(20), None => 20, }; // Get pool from GraphQL context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; // Query database for job results let jobs = crate::jobs::get_job_history_by_slice(pool, slice_filter, Some(limit)) .await .map_err(|e| Error::new(format!("Failed to fetch sync jobs: {}", e)))?; // Convert to GraphQL values let field_values: Vec> = jobs .into_iter() .map(|job| { let container = JobStatusContainer { status: job }; FieldValue::owned_any(container) }) .collect(); Ok(Some(FieldValue::list(field_values))) }) }, ) .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) .argument(InputValue::new("limit", TypeRef::named(TypeRef::INT))) .description("Get sync job history for a slice") ) } /// Add syncJobLogs query to the Query type pub fn add_sync_job_logs_query(query: Object) -> Object { query.field( Field::new( "syncJobLogs", TypeRef::named_nn_list_nn("JetstreamLogEntry"), move |ctx| { FieldFuture::new(async move { // Get job_id argument let job_id_str: &str = ctx.args.get("jobId") .ok_or_else(|| Error::new("jobId is required"))? .string()?; let job_id = Uuid::parse_str(job_id_str) .map_err(|_| Error::new("Invalid UUID format for jobId"))?; // Get limit argument with default 100 let limit: Option = ctx.args.get("limit") .and_then(|val| val.i64().ok()); // Get pool from GraphQL context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; // Query database for logs let logs = crate::logging::get_sync_job_logs(pool, job_id, limit) .await .map_err(|e| Error::new(format!("Failed to fetch sync job logs: {}", e)))?; // Convert to GraphQL values (reuse LogEntryContainer from logs module) let field_values: Vec> = logs .into_iter() .map(|log| { let container = crate::graphql::schema_ext::logs::LogEntryContainer { entry: log }; FieldValue::owned_any(container) }) .collect(); Ok(Some(FieldValue::list(field_values))) }) }, ) .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING))) .argument(InputValue::new("limit", TypeRef::named(TypeRef::INT))) .description("Get logs for a specific sync job") ) } /// Add getSyncSummary query to the Query type pub fn add_get_sync_summary_query(query: Object) -> Object { query.field( Field::new( "getSyncSummary", TypeRef::named_nn("SyncSummary"), move |ctx| { FieldFuture::new(async move { // Get required slice argument let slice: String = ctx.args.get("slice") .ok_or_else(|| Error::new("slice is required"))? .string()? .to_string(); // Get optional collections let collections: Option> = ctx.args.get("collections") .and_then(|val| { val.list().ok().map(|list| { list.iter() .filter_map(|v| v.string().ok().map(|s| s.to_string())) .collect() }) }); // Get optional external collections let external_collections: Option> = ctx.args.get("externalCollections") .and_then(|val| { val.list().ok().map(|list| { list.iter() .filter_map(|v| v.string().ok().map(|s| s.to_string())) .collect() }) }); // Get optional repos let repos: Option> = ctx.args.get("repos") .and_then(|val| { val.list().ok().map(|list| { list.iter() .filter_map(|v| v.string().ok().map(|s| s.to_string())) .collect() }) }); // Get GraphQL context let gql_context = ctx.data::() .map_err(|_| Error::new("GraphQL context not found"))?; // Get pool from context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; // Create Database instance from pool let database = crate::database::Database::new(pool.clone()); // Get slice domain using existing database method let slice_domain = database .get_slice_domain(&slice) .await .map_err(|e| Error::new(format!("Failed to get slice domain: {}", e)))? .ok_or_else(|| Error::new(format!("Slice not found: {}", slice)))?; // Get default max sync repos from env var (same as main.rs) let applied_limit = std::env::var("DEFAULT_MAX_SYNC_REPOS") .unwrap_or_else(|_| "5000".to_string()) .parse::() .unwrap_or(5000); // Get cache from context or create in-memory cache let cache = gql_context.auth_cache.clone() .unwrap_or_else(|| { let backend = crate::cache::CacheBackendImpl::InMemory( crate::cache::InMemoryCache::new(Some(300)) ); Arc::new(tokio::sync::Mutex::new(crate::cache::SliceCache::new(backend))) }); // Create sync service for repo discovery let sync_service = crate::sync::SyncService::with_cache( database, "https://relay1.us-west.bsky.network".to_string(), cache, ); // Discover repos if not provided let (all_repos, collection_repo_counts): (Vec, std::collections::HashMap) = if let Some(provided_repos) = repos { (provided_repos, std::collections::HashMap::new()) } else { let primary_collections = collections.clone().unwrap_or_default(); let mut discovered_repos = std::collections::HashSet::new(); let mut counts: std::collections::HashMap = std::collections::HashMap::new(); // Get repos from primary collections for collection in &primary_collections { match sync_service.get_repos_for_collection(collection, &slice, Some(applied_limit)).await { Ok(repos) => { counts.insert(collection.clone(), repos.len() as i64); discovered_repos.extend(repos); } Err(e) => { tracing::warn!("Failed to get repos for collection {}: {}", collection, e); counts.insert(collection.clone(), 0); } } } (discovered_repos.into_iter().collect(), counts) }; let total_repos = all_repos.len() as i64; let capped_repos = std::cmp::min(total_repos, applied_limit as i64); let would_be_capped = total_repos > applied_limit as i64; // Build collections summary let mut collections_summary = Vec::new(); // Add primary collections if let Some(primary_cols) = collections { for collection in &primary_cols { let is_external = !collection.starts_with(&slice_domain); let estimated_repos = *collection_repo_counts.get(collection).unwrap_or(&0); collections_summary.push(crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary { collection: collection.clone(), estimated_repos, is_external, }); } } // Add external collections (no repo counts) if let Some(ext_cols) = external_collections { for collection in &ext_cols { let is_external = !collection.starts_with(&slice_domain); collections_summary.push(crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary { collection: collection.clone(), estimated_repos: 0, is_external, }); } } // Create output let output = crate::xrpc::network::slices::slice::get_sync_summary::Output { total_repos, capped_repos, collections_summary, would_be_capped, applied_limit, }; let container = SyncSummaryContainer { summary: output }; Ok(Some(FieldValue::owned_any(container))) }) }, ) .argument(InputValue::new("slice", TypeRef::named_nn(TypeRef::STRING))) .argument(InputValue::new("collections", TypeRef::named_list(TypeRef::STRING))) .argument(InputValue::new("externalCollections", TypeRef::named_list(TypeRef::STRING))) .argument(InputValue::new("repos", TypeRef::named_list(TypeRef::STRING))) .description("Get summary of repos that would be synced based on collection filters") ) } /// Add syncJobUpdated subscription to the Subscription type pub fn add_sync_job_subscription(subscription: async_graphql::dynamic::Subscription) -> async_graphql::dynamic::Subscription { subscription.field(SubscriptionField::new( "syncJobUpdated", TypeRef::named_nn("SyncJob"), move |ctx| { SubscriptionFieldFuture::new(async move { // Get optional job filter from arguments let job_filter: Option = ctx.args.get("jobId") .and_then(|val| val.string().ok()) .and_then(|s| Uuid::parse_str(s).ok()); // Subscribe to the broadcast channel let sender = get_job_channel(); let sender_lock = sender.lock().await; let mut receiver = sender_lock.subscribe(); drop(sender_lock); // Release lock // Get optional slice filter from arguments let slice_filter: Option = ctx.args.get("slice") .and_then(|val| val.string().ok()) .map(|s| s.to_string()); let stream = async_stream::stream! { while let Ok(job_status) = receiver.recv().await { // Filter by job_id if provided if let Some(filter_id) = job_filter { if job_status.job_id != filter_id { continue; } } // Filter by slice_uri if provided if let Some(ref filter_slice) = slice_filter { if &job_status.slice_uri != filter_slice { continue; } } // Convert to GraphQL value and yield let container = JobStatusContainer { status: job_status }; yield Ok(FieldValue::owned_any(container)); } }; Ok(stream) }) }, ) .argument(InputValue::new("jobId", TypeRef::named(TypeRef::STRING))) .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) .description("Subscribe to sync job status updates")) } /// Add cancelJob mutation to the Mutation type pub fn add_cancel_job_mutation(mutation: Object) -> Object { mutation.field( Field::new( "cancelJob", TypeRef::named_nn(TypeRef::BOOLEAN), move |ctx| { FieldFuture::new(async move { // Get job_id argument let job_id_str: &str = ctx.args.get("jobId") .ok_or_else(|| Error::new("jobId is required"))? .string()?; let job_id = Uuid::parse_str(job_id_str) .map_err(|_| Error::new("Invalid UUID format for jobId"))?; // Get pool from GraphQL context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; // Cancel the job let cancelled = cancel_job(pool, job_id) .await .map_err(|e| Error::new(format!("Failed to cancel job: {}", e)))?; Ok(Some(GraphQLValue::from(cancelled))) }) }, ) .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING))) .description("Cancel a pending or running sync job") ) } /// Add deleteJob mutation to the Mutation type pub fn add_delete_job_mutation(mutation: Object) -> Object { mutation.field( Field::new( "deleteJob", TypeRef::named(TypeRef::ID), move |ctx| { FieldFuture::new(async move { // Get global ID argument let global_id: &str = ctx.args.get("id") .ok_or_else(|| Error::new("id is required"))? .string()?; // Decode global ID: base64("SyncJob:uuid") -> "uuid" let decoded = general_purpose::STANDARD.decode(global_id) .map_err(|_| Error::new("Invalid global ID format"))?; let decoded_str = String::from_utf8(decoded) .map_err(|_| Error::new("Invalid global ID encoding"))?; // Extract UUID from "SyncJob:uuid" let job_id_str = decoded_str .strip_prefix("SyncJob:") .ok_or_else(|| Error::new("Invalid global ID: not a SyncJob"))?; let job_id = Uuid::parse_str(job_id_str) .map_err(|_| Error::new("Invalid UUID in global ID"))?; // Get pool from GraphQL context let pool = ctx.data::() .map_err(|_| Error::new("Database pool not found in context"))?; // Delete the job let deleted = delete_job(pool, job_id) .await .map_err(|e| Error::new(format!("Failed to delete job: {}", e)))?; // Return global ID if deleted, null if not found if deleted { Ok(Some(GraphQLValue::from(global_id))) } else { Ok(None) } }) }, ) .argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID))) .description("Delete a sync job from the database") ) } /// Initialize Redis pub/sub for sync job updates /// /// This function should be called once at application startup. /// It initializes the Redis client and starts a background task to listen for /// job updates from other processes (e.g., worker processes). /// /// # Arguments /// * `redis_url` - Optional Redis connection URL. If None, Redis pub/sub is disabled. pub fn initialize_redis_pubsub(redis_url: Option) { // Initialize Redis client (or None if not configured) let client = redis_url.and_then(|url| { match Client::open(url.as_str()) { Ok(client) => { tracing::info!("Initialized Redis client for sync job pub/sub"); Some(client) } Err(e) => { tracing::error!("Failed to create Redis client for sync job pub/sub: {}", e); None } } }); let has_redis = client.is_some(); REDIS_CLIENT.get_or_init(|| client); // Start Redis subscription listener task if Redis is available if has_redis { start_redis_listener(); } else { tracing::info!("Redis not configured - sync job updates will use in-memory broadcast only"); } } /// Start a background task that subscribes to Redis and forwards messages to the in-memory broadcast channel fn start_redis_listener() { tokio::spawn(async { tracing::info!("Starting Redis subscription listener for sync job updates"); loop { // Get Redis client let client = match REDIS_CLIENT.get() { Some(Some(client)) => client, _ => { tracing::error!("Redis client not available for subscription"); return; } }; // Connect and subscribe match subscribe_to_redis(client).await { Ok(_) => { tracing::warn!("Redis subscription ended, reconnecting in 5 seconds..."); } Err(e) => { tracing::error!("Redis subscription error: {}, reconnecting in 5 seconds...", e); } } // Wait before reconnecting tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } }); } /// Subscribe to Redis channel and forward messages to in-memory broadcast async fn subscribe_to_redis(client: &Client) -> Result<(), Box> { // Create a pub/sub connection from the client let mut pubsub = client.get_async_pubsub().await?; pubsub.subscribe("sync_job_updates").await?; tracing::info!("Subscribed to Redis channel: sync_job_updates"); // Get the in-memory broadcast sender let sender = get_job_channel(); loop { let msg = pubsub.on_message().next().await; if let Some(msg) = msg { let payload: String = msg.get_payload()?; // Deserialize JobStatus from JSON match serde_json::from_str::(&payload) { Ok(job_status) => { // Forward to in-memory broadcast channel let sender_lock = sender.lock().await; if let Err(e) = sender_lock.send(job_status.clone()) { tracing::debug!("No local subscribers for job update: {}", e); } drop(sender_lock); tracing::debug!("Forwarded job update from Redis: job_id={}", job_status.job_id); } Err(e) => { tracing::warn!("Failed to deserialize job status from Redis: {}", e); } } } } }