// Highly ambitious ATProtocol AppView service and SDKs
// Source: branch `main` · 963 lines · 41 kB (raw view)
1//! GraphQL schema extensions for sync jobs 2 3use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputValue, Object, TypeRef, SubscriptionField, SubscriptionFieldFuture}; 4use async_graphql::{Error, Value as GraphQLValue}; 5use crate::jobs::{JobStatus, enqueue_sync_job, cancel_job, delete_job}; 6use crate::models::BulkSyncParams; 7use tokio::sync::broadcast; 8use std::sync::{Arc, OnceLock}; 9use tokio::sync::Mutex; 10use uuid::Uuid; 11use base64::engine::general_purpose; 12use base64::Engine; 13use redis::aio::ConnectionManager; 14use redis::{Client, AsyncCommands}; 15use futures_util::StreamExt; 16 17/// Global broadcast channel for sync job status updates 18/// This allows real-time job status streaming to GraphQL subscriptions 19static JOB_CHANNEL: OnceLock<Arc<Mutex<broadcast::Sender<JobStatus>>>> = OnceLock::new(); 20 21/// Global Redis client for cross-process pub/sub (optional) 22static REDIS_CLIENT: OnceLock<Option<Client>> = OnceLock::new(); 23 24/// Initialize or get the global job channel 25fn get_job_channel() -> Arc<Mutex<broadcast::Sender<JobStatus>>> { 26 JOB_CHANNEL 27 .get_or_init(|| { 28 let (tx, _) = broadcast::channel(1000); // Buffer up to 1000 job status updates 29 Arc::new(Mutex::new(tx)) 30 }) 31 .clone() 32} 33 34/// Publish a sync job status update to subscribers 35pub async fn publish_sync_job_update(job_status: JobStatus) { 36 // Publish to in-memory broadcast channel (for same-process subscribers) 37 let sender = get_job_channel(); 38 let sender_lock = sender.lock().await; 39 let _ = sender_lock.send(job_status.clone()); // Ignore errors if no subscribers 40 drop(sender_lock); 41 42 // Also publish to Redis for cross-process communication (if Redis is configured) 43 if let Some(Some(client)) = REDIS_CLIENT.get() { 44 if let Err(e) = publish_to_redis(client, &job_status).await { 45 tracing::warn!("Failed to publish job status to Redis: {}", e); 46 } 47 } 48} 49 50/// Publish job status to Redis for cross-process communication 
51async fn publish_to_redis(client: &Client, job_status: &JobStatus) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 52 let mut conn = ConnectionManager::new(client.clone()).await?; 53 let payload = serde_json::to_string(job_status)?; 54 let _: () = conn.publish("sync_job_updates", payload).await?; 55 Ok(()) 56} 57 58/// Container for JobStatus to implement Any trait for GraphQL 59#[derive(Clone)] 60struct JobStatusContainer { 61 status: JobStatus, 62} 63 64/// Container for SyncJobResult 65#[derive(Clone)] 66struct SyncJobResultContainer { 67 result: crate::jobs::SyncJobResult, 68} 69 70/// Container for StartSyncOutput 71#[derive(Clone)] 72struct StartSyncOutputContainer { 73 job_id: String, 74 message: String, 75} 76 77/// Container for SyncSummary 78#[derive(Clone)] 79struct SyncSummaryContainer { 80 summary: crate::xrpc::network::slices::slice::get_sync_summary::Output, 81} 82 83/// Container for CollectionSummary 84#[derive(Clone)] 85struct CollectionSummaryContainer { 86 summary: crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary, 87} 88 89/// Creates the CollectionSummary GraphQL type 90pub fn create_collection_summary_type() -> Object { 91 let mut collection_summary = Object::new("CollectionSummary"); 92 93 collection_summary = collection_summary.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| { 94 FieldFuture::new(async move { 95 let container = ctx.parent_value.try_downcast_ref::<CollectionSummaryContainer>()?; 96 Ok(Some(GraphQLValue::from(container.summary.collection.clone()))) 97 }) 98 })); 99 100 collection_summary = collection_summary.field(Field::new("estimatedRepos", TypeRef::named_nn(TypeRef::INT), |ctx| { 101 FieldFuture::new(async move { 102 let container = ctx.parent_value.try_downcast_ref::<CollectionSummaryContainer>()?; 103 Ok(Some(GraphQLValue::from(container.summary.estimated_repos as i32))) 104 }) 105 })); 106 107 collection_summary = 
collection_summary.field(Field::new("isExternal", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { 108 FieldFuture::new(async move { 109 let container = ctx.parent_value.try_downcast_ref::<CollectionSummaryContainer>()?; 110 Ok(Some(GraphQLValue::from(container.summary.is_external))) 111 }) 112 })); 113 114 collection_summary 115} 116 117/// Creates the SyncSummary GraphQL type 118pub fn create_sync_summary_type() -> Object { 119 let mut sync_summary = Object::new("SyncSummary"); 120 121 sync_summary = sync_summary.field(Field::new("totalRepos", TypeRef::named_nn(TypeRef::INT), |ctx| { 122 FieldFuture::new(async move { 123 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?; 124 Ok(Some(GraphQLValue::from(container.summary.total_repos as i32))) 125 }) 126 })); 127 128 sync_summary = sync_summary.field(Field::new("cappedRepos", TypeRef::named_nn(TypeRef::INT), |ctx| { 129 FieldFuture::new(async move { 130 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?; 131 Ok(Some(GraphQLValue::from(container.summary.capped_repos as i32))) 132 }) 133 })); 134 135 sync_summary = sync_summary.field(Field::new("wouldBeCapped", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { 136 FieldFuture::new(async move { 137 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?; 138 Ok(Some(GraphQLValue::from(container.summary.would_be_capped))) 139 }) 140 })); 141 142 sync_summary = sync_summary.field(Field::new("appliedLimit", TypeRef::named_nn(TypeRef::INT), |ctx| { 143 FieldFuture::new(async move { 144 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?; 145 Ok(Some(GraphQLValue::from(container.summary.applied_limit))) 146 }) 147 })); 148 149 sync_summary = sync_summary.field(Field::new("collectionsSummary", TypeRef::named_nn_list_nn("CollectionSummary"), |ctx| { 150 FieldFuture::new(async move { 151 let container = ctx.parent_value.try_downcast_ref::<SyncSummaryContainer>()?; 152 let field_values: 
Vec<FieldValue<'_>> = container.summary.collections_summary 153 .iter() 154 .map(|col| { 155 let col_container = CollectionSummaryContainer { summary: col.clone() }; 156 FieldValue::owned_any(col_container) 157 }) 158 .collect(); 159 Ok(Some(FieldValue::list(field_values))) 160 }) 161 })); 162 163 sync_summary 164} 165 166/// Creates the StartSyncOutput GraphQL type 167pub fn create_start_sync_output_type() -> Object { 168 let mut output = Object::new("StartSyncOutput"); 169 170 output = output.field(Field::new("jobId", TypeRef::named_nn(TypeRef::STRING), |ctx| { 171 FieldFuture::new(async move { 172 let container = ctx.parent_value.try_downcast_ref::<StartSyncOutputContainer>()?; 173 Ok(Some(GraphQLValue::from(container.job_id.clone()))) 174 }) 175 })); 176 177 output = output.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| { 178 FieldFuture::new(async move { 179 let container = ctx.parent_value.try_downcast_ref::<StartSyncOutputContainer>()?; 180 Ok(Some(GraphQLValue::from(container.message.clone()))) 181 }) 182 })); 183 184 output 185} 186 187/// Creates the SyncJobResult GraphQL type 188pub fn create_sync_job_result_type() -> Object { 189 let mut result = Object::new("SyncJobResult"); 190 191 result = result.field(Field::new("success", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| { 192 FieldFuture::new(async move { 193 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?; 194 Ok(Some(GraphQLValue::from(container.result.success))) 195 }) 196 })); 197 198 result = result.field(Field::new("totalRecords", TypeRef::named_nn(TypeRef::INT), |ctx| { 199 FieldFuture::new(async move { 200 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?; 201 Ok(Some(GraphQLValue::from(container.result.total_records as i32))) 202 }) 203 })); 204 205 result = result.field(Field::new("collectionsSynced", TypeRef::named_nn_list_nn(TypeRef::STRING), |ctx| { 206 FieldFuture::new(async move { 207 let container = 
ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?; 208 let values: Vec<GraphQLValue> = container.result.collections_synced 209 .iter() 210 .map(|s| GraphQLValue::from(s.clone())) 211 .collect(); 212 Ok(Some(GraphQLValue::List(values))) 213 }) 214 })); 215 216 result = result.field(Field::new("reposProcessed", TypeRef::named_nn(TypeRef::INT), |ctx| { 217 FieldFuture::new(async move { 218 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?; 219 Ok(Some(GraphQLValue::from(container.result.repos_processed as i32))) 220 }) 221 })); 222 223 result = result.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| { 224 FieldFuture::new(async move { 225 let container = ctx.parent_value.try_downcast_ref::<SyncJobResultContainer>()?; 226 Ok(Some(GraphQLValue::from(container.result.message.clone()))) 227 }) 228 })); 229 230 result 231} 232 233/// Creates the SyncJob GraphQL type 234pub fn create_sync_job_type() -> Object { 235 let mut job = Object::new("SyncJob"); 236 237 // Add global ID field for Relay 238 job = job.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| { 239 FieldFuture::new(async move { 240 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 241 // Create Relay-style global ID: base64("SyncJob:uuid") 242 let global_id = format!("SyncJob:{}", container.status.job_id); 243 let encoded = general_purpose::STANDARD.encode(global_id.as_bytes()); 244 Ok(Some(GraphQLValue::from(encoded))) 245 }) 246 })); 247 248 job = job.field(Field::new("jobId", TypeRef::named_nn(TypeRef::STRING), |ctx| { 249 FieldFuture::new(async move { 250 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 251 Ok(Some(GraphQLValue::from(container.status.job_id.to_string()))) 252 }) 253 })); 254 255 job = job.field(Field::new("sliceUri", TypeRef::named_nn(TypeRef::STRING), |ctx| { 256 FieldFuture::new(async move { 257 let container = 
ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 258 Ok(Some(GraphQLValue::from(container.status.slice_uri.clone()))) 259 }) 260 })); 261 262 job = job.field(Field::new("status", TypeRef::named_nn(TypeRef::STRING), |ctx| { 263 FieldFuture::new(async move { 264 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 265 Ok(Some(GraphQLValue::from(container.status.status.clone()))) 266 }) 267 })); 268 269 job = job.field(Field::new("createdAt", TypeRef::named_nn(TypeRef::STRING), |ctx| { 270 FieldFuture::new(async move { 271 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 272 Ok(Some(GraphQLValue::from(container.status.created_at.to_rfc3339()))) 273 }) 274 })); 275 276 job = job.field(Field::new("startedAt", TypeRef::named(TypeRef::STRING), |ctx| { 277 FieldFuture::new(async move { 278 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 279 Ok(container.status.started_at.map(|dt| GraphQLValue::from(dt.to_rfc3339()))) 280 }) 281 })); 282 283 job = job.field(Field::new("completedAt", TypeRef::named(TypeRef::STRING), |ctx| { 284 FieldFuture::new(async move { 285 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 286 Ok(container.status.completed_at.map(|dt| GraphQLValue::from(dt.to_rfc3339()))) 287 }) 288 })); 289 290 job = job.field(Field::new("result", TypeRef::named("SyncJobResult"), |ctx| { 291 FieldFuture::new(async move { 292 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 293 if let Some(result) = &container.status.result { 294 let result_container = SyncJobResultContainer { result: result.clone() }; 295 Ok(Some(FieldValue::owned_any(result_container))) 296 } else { 297 Ok(None) 298 } 299 }) 300 })); 301 302 job = job.field(Field::new("error", TypeRef::named(TypeRef::STRING), |ctx| { 303 FieldFuture::new(async move { 304 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 305 
Ok(container.status.error.as_ref().map(|s| GraphQLValue::from(s.clone()))) 306 }) 307 })); 308 309 job = job.field(Field::new("retryCount", TypeRef::named_nn(TypeRef::INT), |ctx| { 310 FieldFuture::new(async move { 311 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 312 Ok(Some(GraphQLValue::from(container.status.retry_count as i32))) 313 }) 314 })); 315 316 job 317} 318 319/// Add startSync mutation to the Mutation type 320pub fn add_start_sync_mutation(mutation: Object, slice_uri: String) -> Object { 321 mutation.field( 322 Field::new( 323 "startSync", 324 TypeRef::named_nn("StartSyncOutput"), 325 move |ctx| { 326 let current_slice = slice_uri.clone(); 327 328 FieldFuture::new(async move { 329 // Get user_did from context (set by auth middleware) 330 let user_did = ctx 331 .data::<String>() 332 .map_err(|_| Error::new("Authentication required"))? 333 .clone(); 334 335 // Get slice parameter (defaults to current slice) 336 let slice: String = match ctx.args.get("slice") { 337 Some(val) => val.string()?.to_string(), 338 None => current_slice, 339 }; 340 341 // Get optional collections 342 let collections: Option<Vec<String>> = ctx.args.get("collections") 343 .and_then(|val| { 344 val.list().ok().map(|list| { 345 list.iter() 346 .filter_map(|v| v.string().ok().map(|s| s.to_string())) 347 .collect() 348 }) 349 }); 350 351 // Get optional external collections 352 let external_collections: Option<Vec<String>> = ctx.args.get("externalCollections") 353 .and_then(|val| { 354 val.list().ok().map(|list| { 355 list.iter() 356 .filter_map(|v| v.string().ok().map(|s| s.to_string())) 357 .collect() 358 }) 359 }); 360 361 // Get optional repos 362 let repos: Option<Vec<String>> = ctx.args.get("repos") 363 .and_then(|val| { 364 val.list().ok().map(|list| { 365 list.iter() 366 .filter_map(|v| v.string().ok().map(|s| s.to_string())) 367 .collect() 368 }) 369 }); 370 371 // Get optional limit_per_repo 372 let limit_per_repo: Option<i32> = 
ctx.args.get("limitPerRepo") 373 .and_then(|val| val.i64().ok().map(|i| i as i32)); 374 375 // Get optional skip_validation 376 let skip_validation: Option<bool> = ctx.args.get("skipValidation") 377 .and_then(|val| val.boolean().ok()); 378 379 // Get optional max_repos 380 let max_repos: Option<i32> = ctx.args.get("maxRepos") 381 .and_then(|val| val.i64().ok().map(|i| i as i32)); 382 383 let params = BulkSyncParams { 384 collections, 385 external_collections, 386 repos, 387 limit_per_repo, 388 skip_validation, 389 max_repos, 390 }; 391 392 // Enqueue the sync job 393 // Get pool from GraphQL context 394 let pool = ctx.data::<sqlx::PgPool>() 395 .map_err(|_| Error::new("Database pool not found in context"))?; 396 397 let job_id = enqueue_sync_job(pool, user_did, slice.clone(), params) 398 .await 399 .map_err(|e| Error::new(format!("Failed to enqueue sync job: {}", e)))?; 400 401 // Return output as container 402 let output = StartSyncOutputContainer { 403 job_id: job_id.to_string(), 404 message: format!("Sync job {} enqueued successfully", job_id), 405 }; 406 407 Ok(Some(FieldValue::owned_any(output))) 408 }) 409 }, 410 ) 411 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) 412 .argument(InputValue::new("collections", TypeRef::named_list(TypeRef::STRING))) 413 .argument(InputValue::new("externalCollections", TypeRef::named_list(TypeRef::STRING))) 414 .argument(InputValue::new("repos", TypeRef::named_list(TypeRef::STRING))) 415 .argument(InputValue::new("limitPerRepo", TypeRef::named(TypeRef::INT))) 416 .argument(InputValue::new("skipValidation", TypeRef::named(TypeRef::BOOLEAN))) 417 .argument(InputValue::new("maxRepos", TypeRef::named(TypeRef::INT))) 418 .description("Start a sync job to backfill collections from the ATProto relay") 419 ) 420} 421 422/// Add syncJob query to the Query type 423pub fn add_sync_job_query(query: Object) -> Object { 424 query.field( 425 Field::new( 426 "syncJob", 427 TypeRef::named("SyncJob"), 428 move |ctx| { 429 
FieldFuture::new(async move { 430 // Get job_id argument 431 let job_id_str: &str = ctx.args.get("jobId") 432 .ok_or_else(|| Error::new("jobId is required"))? 433 .string()?; 434 435 let job_id = Uuid::parse_str(job_id_str) 436 .map_err(|_| Error::new("Invalid UUID format for jobId"))?; 437 438 // Get pool from GraphQL context 439 let pool = ctx.data::<sqlx::PgPool>() 440 .map_err(|_| Error::new("Database pool not found in context"))?; 441 442 // Query database for job status 443 let job_status = crate::jobs::get_job_status(pool, job_id) 444 .await 445 .map_err(|e| Error::new(format!("Failed to fetch job status: {}", e)))?; 446 447 if let Some(status) = job_status { 448 let container = JobStatusContainer { status }; 449 Ok(Some(FieldValue::owned_any(container))) 450 } else { 451 Ok(None) 452 } 453 }) 454 }, 455 ) 456 .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING))) 457 .description("Get status of a specific sync job") 458 ) 459} 460 461/// Add syncJobs query to the Query type 462pub fn add_sync_jobs_query(query: Object, slice_uri: String) -> Object { 463 query.field( 464 Field::new( 465 "syncJobs", 466 TypeRef::named_nn_list_nn("SyncJob"), 467 move |ctx| { 468 let current_slice = slice_uri.clone(); 469 470 FieldFuture::new(async move { 471 // Get optional slice filter argument 472 // If not provided, default to filtering by the current slice URI 473 let slice_filter: Option<&str> = match ctx.args.get("slice") { 474 Some(val) => val.string().ok(), 475 None => Some(current_slice.as_str()), 476 }; 477 478 // Get limit argument with default 20 479 let limit: i64 = match ctx.args.get("limit") { 480 Some(val) => val.i64().unwrap_or(20), 481 None => 20, 482 }; 483 484 // Get pool from GraphQL context 485 let pool = ctx.data::<sqlx::PgPool>() 486 .map_err(|_| Error::new("Database pool not found in context"))?; 487 488 // Query database for job results 489 let jobs = crate::jobs::get_job_history_by_slice(pool, slice_filter, Some(limit)) 490 .await 491 
.map_err(|e| Error::new(format!("Failed to fetch sync jobs: {}", e)))?; 492 493 // Convert to GraphQL values 494 let field_values: Vec<FieldValue<'_>> = jobs 495 .into_iter() 496 .map(|job| { 497 let container = JobStatusContainer { status: job }; 498 FieldValue::owned_any(container) 499 }) 500 .collect(); 501 502 Ok(Some(FieldValue::list(field_values))) 503 }) 504 }, 505 ) 506 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) 507 .argument(InputValue::new("limit", TypeRef::named(TypeRef::INT))) 508 .description("Get sync job history for a slice") 509 ) 510} 511 512/// Add syncJobLogs query to the Query type 513pub fn add_sync_job_logs_query(query: Object) -> Object { 514 query.field( 515 Field::new( 516 "syncJobLogs", 517 TypeRef::named_nn_list_nn("JetstreamLogEntry"), 518 move |ctx| { 519 FieldFuture::new(async move { 520 // Get job_id argument 521 let job_id_str: &str = ctx.args.get("jobId") 522 .ok_or_else(|| Error::new("jobId is required"))? 523 .string()?; 524 525 let job_id = Uuid::parse_str(job_id_str) 526 .map_err(|_| Error::new("Invalid UUID format for jobId"))?; 527 528 // Get limit argument with default 100 529 let limit: Option<i64> = ctx.args.get("limit") 530 .and_then(|val| val.i64().ok()); 531 532 // Get pool from GraphQL context 533 let pool = ctx.data::<sqlx::PgPool>() 534 .map_err(|_| Error::new("Database pool not found in context"))?; 535 536 // Query database for logs 537 let logs = crate::logging::get_sync_job_logs(pool, job_id, limit) 538 .await 539 .map_err(|e| Error::new(format!("Failed to fetch sync job logs: {}", e)))?; 540 541 // Convert to GraphQL values (reuse LogEntryContainer from logs module) 542 let field_values: Vec<FieldValue<'_>> = logs 543 .into_iter() 544 .map(|log| { 545 let container = crate::graphql::schema_ext::logs::LogEntryContainer { entry: log }; 546 FieldValue::owned_any(container) 547 }) 548 .collect(); 549 550 Ok(Some(FieldValue::list(field_values))) 551 }) 552 }, 553 ) 554 
.argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING))) 555 .argument(InputValue::new("limit", TypeRef::named(TypeRef::INT))) 556 .description("Get logs for a specific sync job") 557 ) 558} 559 560/// Add getSyncSummary query to the Query type 561pub fn add_get_sync_summary_query(query: Object) -> Object { 562 query.field( 563 Field::new( 564 "getSyncSummary", 565 TypeRef::named_nn("SyncSummary"), 566 move |ctx| { 567 FieldFuture::new(async move { 568 // Get required slice argument 569 let slice: String = ctx.args.get("slice") 570 .ok_or_else(|| Error::new("slice is required"))? 571 .string()? 572 .to_string(); 573 574 // Get optional collections 575 let collections: Option<Vec<String>> = ctx.args.get("collections") 576 .and_then(|val| { 577 val.list().ok().map(|list| { 578 list.iter() 579 .filter_map(|v| v.string().ok().map(|s| s.to_string())) 580 .collect() 581 }) 582 }); 583 584 // Get optional external collections 585 let external_collections: Option<Vec<String>> = ctx.args.get("externalCollections") 586 .and_then(|val| { 587 val.list().ok().map(|list| { 588 list.iter() 589 .filter_map(|v| v.string().ok().map(|s| s.to_string())) 590 .collect() 591 }) 592 }); 593 594 // Get optional repos 595 let repos: Option<Vec<String>> = ctx.args.get("repos") 596 .and_then(|val| { 597 val.list().ok().map(|list| { 598 list.iter() 599 .filter_map(|v| v.string().ok().map(|s| s.to_string())) 600 .collect() 601 }) 602 }); 603 604 // Get GraphQL context 605 let gql_context = ctx.data::<crate::graphql::GraphQLContext>() 606 .map_err(|_| Error::new("GraphQL context not found"))?; 607 608 // Get pool from context 609 let pool = ctx.data::<sqlx::PgPool>() 610 .map_err(|_| Error::new("Database pool not found in context"))?; 611 612 // Create Database instance from pool 613 let database = crate::database::Database::new(pool.clone()); 614 615 // Get slice domain using existing database method 616 let slice_domain = database 617 .get_slice_domain(&slice) 618 .await 619 
.map_err(|e| Error::new(format!("Failed to get slice domain: {}", e)))? 620 .ok_or_else(|| Error::new(format!("Slice not found: {}", slice)))?; 621 622 // Get default max sync repos from env var (same as main.rs) 623 let applied_limit = std::env::var("DEFAULT_MAX_SYNC_REPOS") 624 .unwrap_or_else(|_| "5000".to_string()) 625 .parse::<i32>() 626 .unwrap_or(5000); 627 628 // Get cache from context or create in-memory cache 629 let cache = gql_context.auth_cache.clone() 630 .unwrap_or_else(|| { 631 let backend = crate::cache::CacheBackendImpl::InMemory( 632 crate::cache::InMemoryCache::new(Some(300)) 633 ); 634 Arc::new(tokio::sync::Mutex::new(crate::cache::SliceCache::new(backend))) 635 }); 636 637 // Create sync service for repo discovery 638 let sync_service = crate::sync::SyncService::with_cache( 639 database, 640 "https://relay1.us-west.bsky.network".to_string(), 641 cache, 642 ); 643 644 // Discover repos if not provided 645 let (all_repos, collection_repo_counts): (Vec<String>, std::collections::HashMap<String, i64>) = 646 if let Some(provided_repos) = repos { 647 (provided_repos, std::collections::HashMap::new()) 648 } else { 649 let primary_collections = collections.clone().unwrap_or_default(); 650 let mut discovered_repos = std::collections::HashSet::new(); 651 let mut counts: std::collections::HashMap<String, i64> = std::collections::HashMap::new(); 652 653 // Get repos from primary collections 654 for collection in &primary_collections { 655 match sync_service.get_repos_for_collection(collection, &slice, Some(applied_limit)).await { 656 Ok(repos) => { 657 counts.insert(collection.clone(), repos.len() as i64); 658 discovered_repos.extend(repos); 659 } 660 Err(e) => { 661 tracing::warn!("Failed to get repos for collection {}: {}", collection, e); 662 counts.insert(collection.clone(), 0); 663 } 664 } 665 } 666 667 (discovered_repos.into_iter().collect(), counts) 668 }; 669 670 let total_repos = all_repos.len() as i64; 671 let capped_repos = 
std::cmp::min(total_repos, applied_limit as i64); 672 let would_be_capped = total_repos > applied_limit as i64; 673 674 // Build collections summary 675 let mut collections_summary = Vec::new(); 676 677 // Add primary collections 678 if let Some(primary_cols) = collections { 679 for collection in &primary_cols { 680 let is_external = !collection.starts_with(&slice_domain); 681 let estimated_repos = *collection_repo_counts.get(collection).unwrap_or(&0); 682 683 collections_summary.push(crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary { 684 collection: collection.clone(), 685 estimated_repos, 686 is_external, 687 }); 688 } 689 } 690 691 // Add external collections (no repo counts) 692 if let Some(ext_cols) = external_collections { 693 for collection in &ext_cols { 694 let is_external = !collection.starts_with(&slice_domain); 695 696 collections_summary.push(crate::xrpc::network::slices::slice::get_sync_summary::CollectionSummary { 697 collection: collection.clone(), 698 estimated_repos: 0, 699 is_external, 700 }); 701 } 702 } 703 704 // Create output 705 let output = crate::xrpc::network::slices::slice::get_sync_summary::Output { 706 total_repos, 707 capped_repos, 708 collections_summary, 709 would_be_capped, 710 applied_limit, 711 }; 712 713 let container = SyncSummaryContainer { summary: output }; 714 Ok(Some(FieldValue::owned_any(container))) 715 }) 716 }, 717 ) 718 .argument(InputValue::new("slice", TypeRef::named_nn(TypeRef::STRING))) 719 .argument(InputValue::new("collections", TypeRef::named_list(TypeRef::STRING))) 720 .argument(InputValue::new("externalCollections", TypeRef::named_list(TypeRef::STRING))) 721 .argument(InputValue::new("repos", TypeRef::named_list(TypeRef::STRING))) 722 .description("Get summary of repos that would be synced based on collection filters") 723 ) 724} 725 726/// Add syncJobUpdated subscription to the Subscription type 727pub fn add_sync_job_subscription(subscription: async_graphql::dynamic::Subscription) -> 
async_graphql::dynamic::Subscription { 728 subscription.field(SubscriptionField::new( 729 "syncJobUpdated", 730 TypeRef::named_nn("SyncJob"), 731 move |ctx| { 732 SubscriptionFieldFuture::new(async move { 733 // Get optional job filter from arguments 734 let job_filter: Option<Uuid> = ctx.args.get("jobId") 735 .and_then(|val| val.string().ok()) 736 .and_then(|s| Uuid::parse_str(s).ok()); 737 738 // Subscribe to the broadcast channel 739 let sender = get_job_channel(); 740 let sender_lock = sender.lock().await; 741 let mut receiver = sender_lock.subscribe(); 742 drop(sender_lock); // Release lock 743 744 // Get optional slice filter from arguments 745 let slice_filter: Option<String> = ctx.args.get("slice") 746 .and_then(|val| val.string().ok()) 747 .map(|s| s.to_string()); 748 749 let stream = async_stream::stream! { 750 while let Ok(job_status) = receiver.recv().await { 751 // Filter by job_id if provided 752 if let Some(filter_id) = job_filter { 753 if job_status.job_id != filter_id { 754 continue; 755 } 756 } 757 758 // Filter by slice_uri if provided 759 if let Some(ref filter_slice) = slice_filter { 760 if &job_status.slice_uri != filter_slice { 761 continue; 762 } 763 } 764 765 // Convert to GraphQL value and yield 766 let container = JobStatusContainer { status: job_status }; 767 yield Ok(FieldValue::owned_any(container)); 768 } 769 }; 770 771 Ok(stream) 772 }) 773 }, 774 ) 775 .argument(InputValue::new("jobId", TypeRef::named(TypeRef::STRING))) 776 .argument(InputValue::new("slice", TypeRef::named(TypeRef::STRING))) 777 .description("Subscribe to sync job status updates")) 778} 779 780/// Add cancelJob mutation to the Mutation type 781pub fn add_cancel_job_mutation(mutation: Object) -> Object { 782 mutation.field( 783 Field::new( 784 "cancelJob", 785 TypeRef::named_nn(TypeRef::BOOLEAN), 786 move |ctx| { 787 FieldFuture::new(async move { 788 // Get job_id argument 789 let job_id_str: &str = ctx.args.get("jobId") 790 .ok_or_else(|| Error::new("jobId is 
required"))? 791 .string()?; 792 793 let job_id = Uuid::parse_str(job_id_str) 794 .map_err(|_| Error::new("Invalid UUID format for jobId"))?; 795 796 // Get pool from GraphQL context 797 let pool = ctx.data::<sqlx::PgPool>() 798 .map_err(|_| Error::new("Database pool not found in context"))?; 799 800 // Cancel the job 801 let cancelled = cancel_job(pool, job_id) 802 .await 803 .map_err(|e| Error::new(format!("Failed to cancel job: {}", e)))?; 804 805 Ok(Some(GraphQLValue::from(cancelled))) 806 }) 807 }, 808 ) 809 .argument(InputValue::new("jobId", TypeRef::named_nn(TypeRef::STRING))) 810 .description("Cancel a pending or running sync job") 811 ) 812} 813 814/// Add deleteJob mutation to the Mutation type 815pub fn add_delete_job_mutation(mutation: Object) -> Object { 816 mutation.field( 817 Field::new( 818 "deleteJob", 819 TypeRef::named(TypeRef::ID), 820 move |ctx| { 821 FieldFuture::new(async move { 822 // Get global ID argument 823 let global_id: &str = ctx.args.get("id") 824 .ok_or_else(|| Error::new("id is required"))? 
825 .string()?; 826 827 // Decode global ID: base64("SyncJob:uuid") -> "uuid" 828 let decoded = general_purpose::STANDARD.decode(global_id) 829 .map_err(|_| Error::new("Invalid global ID format"))?; 830 let decoded_str = String::from_utf8(decoded) 831 .map_err(|_| Error::new("Invalid global ID encoding"))?; 832 833 // Extract UUID from "SyncJob:uuid" 834 let job_id_str = decoded_str 835 .strip_prefix("SyncJob:") 836 .ok_or_else(|| Error::new("Invalid global ID: not a SyncJob"))?; 837 838 let job_id = Uuid::parse_str(job_id_str) 839 .map_err(|_| Error::new("Invalid UUID in global ID"))?; 840 841 // Get pool from GraphQL context 842 let pool = ctx.data::<sqlx::PgPool>() 843 .map_err(|_| Error::new("Database pool not found in context"))?; 844 845 // Delete the job 846 let deleted = delete_job(pool, job_id) 847 .await 848 .map_err(|e| Error::new(format!("Failed to delete job: {}", e)))?; 849 850 // Return global ID if deleted, null if not found 851 if deleted { 852 Ok(Some(GraphQLValue::from(global_id))) 853 } else { 854 Ok(None) 855 } 856 }) 857 }, 858 ) 859 .argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID))) 860 .description("Delete a sync job from the database") 861 ) 862} 863 864/// Initialize Redis pub/sub for sync job updates 865/// 866/// This function should be called once at application startup. 867/// It initializes the Redis client and starts a background task to listen for 868/// job updates from other processes (e.g., worker processes). 869/// 870/// # Arguments 871/// * `redis_url` - Optional Redis connection URL. If None, Redis pub/sub is disabled. 
872pub fn initialize_redis_pubsub(redis_url: Option<String>) { 873 // Initialize Redis client (or None if not configured) 874 let client = redis_url.and_then(|url| { 875 match Client::open(url.as_str()) { 876 Ok(client) => { 877 tracing::info!("Initialized Redis client for sync job pub/sub"); 878 Some(client) 879 } 880 Err(e) => { 881 tracing::error!("Failed to create Redis client for sync job pub/sub: {}", e); 882 None 883 } 884 } 885 }); 886 887 let has_redis = client.is_some(); 888 REDIS_CLIENT.get_or_init(|| client); 889 890 // Start Redis subscription listener task if Redis is available 891 if has_redis { 892 start_redis_listener(); 893 } else { 894 tracing::info!("Redis not configured - sync job updates will use in-memory broadcast only"); 895 } 896} 897 898/// Start a background task that subscribes to Redis and forwards messages to the in-memory broadcast channel 899fn start_redis_listener() { 900 tokio::spawn(async { 901 tracing::info!("Starting Redis subscription listener for sync job updates"); 902 903 loop { 904 // Get Redis client 905 let client = match REDIS_CLIENT.get() { 906 Some(Some(client)) => client, 907 _ => { 908 tracing::error!("Redis client not available for subscription"); 909 return; 910 } 911 }; 912 913 // Connect and subscribe 914 match subscribe_to_redis(client).await { 915 Ok(_) => { 916 tracing::warn!("Redis subscription ended, reconnecting in 5 seconds..."); 917 } 918 Err(e) => { 919 tracing::error!("Redis subscription error: {}, reconnecting in 5 seconds...", e); 920 } 921 } 922 923 // Wait before reconnecting 924 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 925 } 926 }); 927} 928 929/// Subscribe to Redis channel and forward messages to in-memory broadcast 930async fn subscribe_to_redis(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 931 // Create a pub/sub connection from the client 932 let mut pubsub = client.get_async_pubsub().await?; 933 934 
pubsub.subscribe("sync_job_updates").await?; 935 tracing::info!("Subscribed to Redis channel: sync_job_updates"); 936 937 // Get the in-memory broadcast sender 938 let sender = get_job_channel(); 939 940 loop { 941 let msg = pubsub.on_message().next().await; 942 if let Some(msg) = msg { 943 let payload: String = msg.get_payload()?; 944 945 // Deserialize JobStatus from JSON 946 match serde_json::from_str::<JobStatus>(&payload) { 947 Ok(job_status) => { 948 // Forward to in-memory broadcast channel 949 let sender_lock = sender.lock().await; 950 if let Err(e) = sender_lock.send(job_status.clone()) { 951 tracing::debug!("No local subscribers for job update: {}", e); 952 } 953 drop(sender_lock); 954 955 tracing::debug!("Forwarded job update from Redis: job_id={}", job_status.job_id); 956 } 957 Err(e) => { 958 tracing::warn!("Failed to deserialize job status from Redis: {}", e); 959 } 960 } 961 } 962 } 963}