// ATProtocol AppView service and SDKs — background sync job module
// (captured from repository web view: main branch, 1153 lines, 39 kB)
//! Background job system for asynchronous collection synchronization.
//!
//! This module uses sqlxmq (a PostgreSQL-backed message queue) to handle
//! background sync jobs. Jobs are:
//! - Enqueued with deduplication checks (one active job per user+slice)
//! - Executed asynchronously in background workers
//! - Retried up to 5 times on failure
//! - Tracked with detailed logging and result persistence
//!
//! The sync process fetches records from AT Protocol relays and validates them
//! against Lexicon schemas before persisting to the database.

use crate::cache;
use crate::errors::SyncError;
use crate::graphql::schema_ext::publish_sync_job_update;
use crate::logging::LogLevel;
use crate::models::BulkSyncParams;
use crate::sync::SyncService;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::PgPool;
use sqlxmq::{CurrentJob, JobRegistry, job};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
use uuid::Uuid;

/// Job payload containing all parameters needed to execute a sync job.
///
/// Serialized to JSON and stored in the `mq_payloads` table by sqlxmq;
/// the dedup queries in `enqueue_sync_job` read these fields back out of
/// the raw JSON, so field names here are load-bearing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncJobPayload {
    /// Unique identifier for tracking this specific job execution
    pub job_id: Uuid,
    /// Decentralized identifier of the user requesting the sync
    pub user_did: String,
    /// AT-URI of the slice being synchronized
    pub slice_uri: String,
    /// Synchronization parameters (collections, repos, validation settings)
    pub params: BulkSyncParams,
}

/// Result data persisted after job completion or failure.
///
/// This is stored in the `job_results` table for historical tracking and
/// status queries. Field names are camelCased for JSON API responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncJobResult {
    /// Whether the sync run finished without error
    pub success: bool,
    /// Total number of records written during the sync
    pub total_records: i64,
    /// Collections (internal + external) that were synchronized
    pub collections_synced: Vec<String>,
    /// Number of repositories processed during the sync
    pub repos_processed: i64,
    /// Human-readable completion or failure summary
    pub message: String,
}

/// Initializes the sqlxmq job registry with all job handlers.
///
/// This must be called once at application startup to register job handlers
/// before workers can process jobs from the queue.
///
/// # Returns
/// A configured JobRegistry containing all registered job handlers
pub fn registry() -> JobRegistry {
    JobRegistry::new(&[sync_job])
}

/// Background job handler for collection synchronization.
///
/// This is the main worker function that executes sync jobs from the queue.
/// It performs the following steps:
/// 1. Extracts job payload and validates parameters
/// 2. Initializes sync service with logging and caching
/// 3. Fetches and validates records from AT Protocol relays
/// 4. Persists results to the database
/// 5. Logs detailed progress and completion status
Logs detailed progress and completion status 75/// 76/// # Job Behavior 77/// - Channel: `sync_queue` 78/// - Retries: Up to 5 attempts on failure 79/// - Concurrency: Multiple jobs can run in parallel 80/// - Deduplication: Enforced at enqueue time (one active job per user+slice) 81/// 82/// # Arguments 83/// * `current_job` - The sqlxmq job context containing payload and database access 84/// 85/// # Returns 86/// * `Ok(())` - Job completed successfully and marked complete 87/// * `Err(...)` - Job failed and will be retried (up to max retry limit) 88#[job(channel_name = "sync_queue")] 89async fn sync_job( 90 mut current_job: CurrentJob, 91) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 92 let payload: SyncJobPayload = current_job.json()?.expect("Invalid job payload"); 93 94 info!( 95 "Starting sync job {} for user {} on slice {}", 96 payload.job_id, payload.user_did, payload.slice_uri 97 ); 98 99 // Access database pool and global logger for this job execution 100 let pool = current_job.pool(); 101 let logger = crate::logging::Logger::global(); 102 103 // Log job start 104 logger.log_sync_job( 105 payload.job_id, 106 &payload.user_did, 107 &payload.slice_uri, 108 LogLevel::Info, 109 &format!( 110 "Starting sync job for {} collections", 111 payload 112 .params 113 .collections 114 .as_ref() 115 .map(|c| c.len()) 116 .unwrap_or(0) 117 + payload 118 .params 119 .external_collections 120 .as_ref() 121 .map(|c| c.len()) 122 .unwrap_or(0) 123 ), 124 Some(json!({ 125 "collections": payload.params.collections, 126 "external_collections": payload.params.external_collections, 127 "repos": payload.params.repos, 128 "skip_validation": payload.params.skip_validation 129 })), 130 ); 131 132 // Insert "running" status into job_results immediately (enables cancellation) 133 let now = chrono::Utc::now(); 134 sqlx::query!( 135 r#" 136 INSERT INTO job_results (job_id, user_did, slice_uri, status, success, message, created_at) 137 VALUES ($1, $2, $3, 'running', false, 
'Job is running...', $4) 138 ON CONFLICT (job_id) DO NOTHING 139 "#, 140 payload.job_id, 141 payload.user_did, 142 payload.slice_uri, 143 now 144 ) 145 .execute(pool) 146 .await?; 147 148 // Publish job running status to subscribers 149 let running_status = JobStatus { 150 job_id: payload.job_id, 151 slice_uri: payload.slice_uri.clone(), 152 status: "running".to_string(), 153 created_at: now, 154 started_at: Some(now), 155 completed_at: None, 156 result: None, 157 error: None, 158 retry_count: 0, 159 }; 160 crate::graphql::schema_ext::publish_sync_job_update(running_status).await; 161 162 // Check if job was cancelled before we even started 163 let cancelled_check = sqlx::query!( 164 r#" 165 SELECT status 166 FROM job_results 167 WHERE job_id = $1 168 AND status = 'cancelled' 169 "#, 170 payload.job_id 171 ) 172 .fetch_optional(pool) 173 .await?; 174 175 if cancelled_check.is_some() { 176 info!("Job {} was cancelled before execution started", payload.job_id); 177 current_job.complete().await?; 178 return Ok(()); 179 } 180 181 // Initialize sync service with database, relay endpoint, and caching 182 let database = crate::database::Database::from_pool(pool.clone()); 183 let relay_endpoint = std::env::var("RELAY_ENDPOINT") 184 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()); 185 186 // Create in-memory cache for DID resolution with 24-hour TTL to reduce identity lookups 187 let cache = Arc::new(Mutex::new( 188 cache::CacheFactory::create_slice_cache(cache::CacheBackend::InMemory { 189 ttl_seconds: Some(24 * 60 * 60), 190 }) 191 .await?, 192 )); 193 194 let sync_service = SyncService::with_logging_and_cache( 195 database.clone(), 196 relay_endpoint, 197 logger.clone(), 198 payload.job_id, 199 payload.user_did.clone(), 200 cache, 201 ); 202 203 // Track execution time for performance monitoring 204 let start_time = std::time::Instant::now(); 205 206 // Execute the synchronization process 207 match sync_service 208 .backfill_collections( 209 
&payload.slice_uri, 210 payload.params.collections.as_deref(), 211 payload.params.external_collections.as_deref(), 212 payload.params.repos.as_deref(), 213 payload.params.skip_validation.unwrap_or(false), 214 payload.params.max_repos, 215 ) 216 .await 217 { 218 Ok((repos_processed, records_synced)) => { 219 let elapsed = start_time.elapsed(); 220 let result = SyncJobResult { 221 success: true, 222 total_records: records_synced, 223 collections_synced: [ 224 payload.params.collections.unwrap_or_default(), 225 payload.params.external_collections.unwrap_or_default(), 226 ] 227 .concat(), 228 repos_processed, 229 message: "Sync completed successfully".to_string(), 230 }; 231 232 // Log completion with detailed metrics for monitoring 233 logger.log_sync_job( 234 payload.job_id, 235 &payload.user_did, 236 &payload.slice_uri, 237 LogLevel::Info, 238 &format!( 239 "Sync completed successfully: {} repos, {} records in {:?}", 240 repos_processed, records_synced, elapsed 241 ), 242 Some(json!({ 243 "repos_processed": repos_processed, 244 "records_synced": records_synced, 245 "duration_secs": elapsed.as_secs_f64(), 246 "collections_synced": result.collections_synced 247 })), 248 ); 249 250 // Persist job result before marking complete (ensures result is queryable) 251 store_job_result( 252 pool, 253 payload.job_id, 254 &payload.user_did, 255 &payload.slice_uri, 256 &result, 257 None, 258 ) 259 .await?; 260 261 // Publish job status update to GraphQL subscribers 262 let job_status = JobStatus { 263 job_id: payload.job_id, 264 slice_uri: payload.slice_uri.clone(), 265 status: "completed".to_string(), 266 created_at: chrono::Utc::now(), 267 started_at: Some(chrono::Utc::now()), 268 completed_at: Some(chrono::Utc::now()), 269 result: Some(result.clone()), 270 error: None, 271 retry_count: 0, 272 }; 273 publish_sync_job_update(job_status).await; 274 275 info!( 276 "Sync job {} completed successfully: {} repos, {} records", 277 payload.job_id, repos_processed, records_synced 278 ); 
279 280 // CRITICAL: Explicitly mark job as complete to prevent automatic retry 281 // Without this, sqlxmq will treat the job as failed and retry it 282 current_job.complete().await?; 283 284 info!( 285 "Sync job {} marked as complete and will be cleaned up", 286 payload.job_id 287 ); 288 289 Ok(()) 290 } 291 Err(e) => { 292 // Special handling for cancelled jobs - don't overwrite the cancelled status 293 if matches!(e, SyncError::Cancelled) { 294 info!("Sync job {} was cancelled", payload.job_id); 295 296 // Mark job as complete (no retry needed for cancelled jobs) 297 current_job.complete().await?; 298 299 return Ok(()); 300 } 301 302 error!("Sync job {} failed: {}", payload.job_id, e); 303 304 // Log error details for debugging and user visibility 305 logger.log_sync_job( 306 payload.job_id, 307 &payload.user_did, 308 &payload.slice_uri, 309 LogLevel::Error, 310 &format!("Sync job failed: {}", e), 311 Some(json!({ 312 "error": e.to_string(), 313 "duration_secs": start_time.elapsed().as_secs_f64() 314 })), 315 ); 316 317 let result = SyncJobResult { 318 success: false, 319 total_records: 0, 320 collections_synced: vec![], 321 repos_processed: 0, 322 message: format!("Sync failed: {}", e), 323 }; 324 325 // Persist failure result even if job will retry (for status tracking) 326 if let Err(db_err) = store_job_result( 327 pool, 328 payload.job_id, 329 &payload.user_did, 330 &payload.slice_uri, 331 &result, 332 Some(&format!("{}", e)), 333 ) 334 .await 335 { 336 error!("Failed to store job result: {}", db_err); 337 } 338 339 // Publish job status update to GraphQL subscribers 340 let job_status = JobStatus { 341 job_id: payload.job_id, 342 slice_uri: payload.slice_uri.clone(), 343 status: "failed".to_string(), 344 created_at: chrono::Utc::now(), 345 started_at: Some(chrono::Utc::now()), 346 completed_at: Some(chrono::Utc::now()), 347 result: Some(result.clone()), 348 error: Some(format!("{}", e)), 349 retry_count: 0, 350 }; 351 
publish_sync_job_update(job_status).await; 352 353 // Return error to trigger sqlxmq's automatic retry mechanism (up to 5 attempts) 354 Err(Box::new(e)) 355 } 356 } 357} 358 359/// Persists job result to the database for status queries and historical tracking. 360/// 361/// This is called both on success and failure to ensure result data is available 362/// via the job status API. Uses UPSERT to handle retries (updates existing result). 363/// 364/// # Arguments 365/// * `pool` - PostgreSQL connection pool 366/// * `job_id` - Unique identifier for the job 367/// * `user_did` - User who initiated the job 368/// * `slice_uri` - Slice being synchronized 369/// * `result` - Job result data (success/failure, metrics) 370/// * `error_message` - Optional error details for failed jobs 371/// 372/// # Returns 373/// * `Ok(())` - Result stored successfully 374/// * `Err(sqlx::Error)` - Database error during insert/update 375async fn store_job_result( 376 pool: &PgPool, 377 job_id: Uuid, 378 user_did: &str, 379 slice_uri: &str, 380 result: &SyncJobResult, 381 error_message: Option<&str>, 382) -> Result<(), sqlx::Error> { 383 info!( 384 "Storing job result: job_id={}, user_did={}, slice_uri={}, success={}", 385 job_id, user_did, slice_uri, result.success 386 ); 387 388 // Convert collections list to JSONB for storage 389 let collections_json = serde_json::to_value(&result.collections_synced) 390 .map_err(|e| sqlx::Error::Protocol(format!("Failed to serialize collections: {}", e)))?; 391 392 // UPSERT: insert new result or update existing on retry 393 sqlx::query!( 394 r#" 395 INSERT INTO job_results ( 396 job_id, user_did, slice_uri, status, success, total_records, 397 collections_synced, repos_processed, message, error_message 398 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 399 ON CONFLICT (job_id) 400 DO UPDATE SET 401 status = EXCLUDED.status, 402 success = EXCLUDED.success, 403 total_records = EXCLUDED.total_records, 404 collections_synced = 
EXCLUDED.collections_synced, 405 repos_processed = EXCLUDED.repos_processed, 406 message = EXCLUDED.message, 407 error_message = EXCLUDED.error_message, 408 completed_at = NOW() 409 WHERE job_results.status != 'cancelled' 410 "#, 411 job_id, 412 user_did, 413 slice_uri, 414 if result.success { 415 "completed" 416 } else { 417 "failed" 418 }, 419 result.success, 420 result.total_records, 421 collections_json, 422 result.repos_processed, 423 result.message, 424 error_message, 425 ) 426 .execute(pool) 427 .await?; 428 429 Ok(()) 430} 431 432/// Enqueues a new sync job with deduplication checks. 433/// 434/// This function ensures only one active sync job exists per user+slice combination 435/// by checking both the message queue and recent job results. This prevents: 436/// - Duplicate jobs competing for the same data 437/// - Wasted resources on redundant syncs 438/// - Race conditions in record persistence 439/// 440/// # Arguments 441/// * `pool` - PostgreSQL connection pool 442/// * `user_did` - Decentralized identifier of the user 443/// * `slice_uri` - AT-URI of the slice to synchronize 444/// * `params` - Sync parameters (collections, repos, validation settings) 445/// 446/// # Returns 447/// * `Ok(Uuid)` - Job ID of the newly enqueued job 448/// * `Err(...)` - Error if job already running or enqueue fails 449/// 450/// # Deduplication Strategy 451/// 1. Check for pending jobs in `mq_msgs` queue 452/// 2. Check for recent jobs (< 10 min) without results 453/// 3. 
Reject if either check finds an active job 454pub async fn enqueue_sync_job( 455 pool: &PgPool, 456 user_did: String, 457 slice_uri: String, 458 params: BulkSyncParams, 459) -> Result<Uuid, Box<dyn std::error::Error + Send + Sync>> { 460 // Deduplication check 1: Look for pending jobs in the message queue 461 // This catches jobs that haven't started executing yet 462 let existing_running_msg = sqlx::query!( 463 r#" 464 SELECT m.id 465 FROM mq_msgs m 466 JOIN mq_payloads p ON m.id = p.id 467 WHERE m.channel_name = 'sync_queue' 468 AND m.id != '00000000-0000-0000-0000-000000000000' 469 AND p.payload_json->>'user_did' = $1 470 AND p.payload_json->>'slice_uri' = $2 471 AND m.attempt_at <= NOW() 472 "#, 473 user_did, 474 slice_uri 475 ) 476 .fetch_optional(pool) 477 .await?; 478 479 // Deduplication check 2: Look for recently started jobs without results 480 // This catches jobs that started but haven't written results yet (< 10 min) 481 let recent_start = sqlx::query!( 482 r#" 483 SELECT m.id 484 FROM mq_msgs m 485 JOIN mq_payloads p ON m.id = p.id 486 LEFT JOIN job_results jr ON (p.payload_json->>'job_id')::uuid = jr.job_id 487 WHERE m.channel_name = 'sync_queue' 488 AND m.id != '00000000-0000-0000-0000-000000000000' 489 AND p.payload_json->>'user_did' = $1 490 AND p.payload_json->>'slice_uri' = $2 491 AND m.created_at > NOW() - INTERVAL '10 minutes' 492 AND jr.job_id IS NULL 493 "#, 494 user_did, 495 slice_uri 496 ) 497 .fetch_optional(pool) 498 .await?; 499 500 // Deduplication check 3: Look for running jobs in job_results 501 // This catches jobs that have started and written a "running" status 502 let running_job = sqlx::query!( 503 r#" 504 SELECT job_id 505 FROM job_results 506 WHERE user_did = $1 507 AND slice_uri = $2 508 AND status = 'running' 509 AND created_at > NOW() - INTERVAL '10 minutes' 510 "#, 511 user_did, 512 slice_uri 513 ) 514 .fetch_optional(pool) 515 .await?; 516 517 if existing_running_msg.is_some() || recent_start.is_some() || 
running_job.is_some() { 518 return Err("A sync job is already running for this slice. Please wait for it to complete before starting another.".into()); 519 } 520 521 // Generate unique job ID for tracking and result storage 522 let job_id = Uuid::new_v4(); 523 524 let payload = SyncJobPayload { 525 job_id, 526 user_did: user_did.clone(), 527 slice_uri: slice_uri.clone(), 528 params, 529 }; 530 531 // Enqueue the job using sqlxmq's builder pattern 532 let job_uuid = sync_job.builder().set_json(&payload)?.spawn(pool).await?; 533 534 info!( 535 "Enqueued sync job {} (queue id: {}) for user {}", 536 job_id, job_uuid, user_did 537 ); 538 539 // Publish job creation event to subscribers 540 let job_status = JobStatus { 541 job_id, 542 slice_uri: slice_uri.clone(), 543 status: "pending".to_string(), 544 created_at: chrono::Utc::now(), 545 started_at: None, 546 completed_at: None, 547 result: None, 548 error: None, 549 retry_count: 0, 550 }; 551 crate::graphql::schema_ext::publish_sync_job_update(job_status).await; 552 553 Ok(job_id) 554} 555 556/// Status information for a sync job, including progress and results. 557/// 558/// This combines data from both the message queue (for pending jobs) and 559/// the job_results table (for completed jobs). Field names are camelCased 560/// for JSON API responses. 
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JobStatus {
    /// Unique identifier for the job
    pub job_id: Uuid,
    /// Slice URI this job belongs to
    pub slice_uri: String,
    /// Current status: "pending", "running", "completed", "failed", or
    /// "cancelled" (the last is written by `cancel_job`)
    pub status: String,
    /// Timestamp when job was enqueued
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Timestamp when job execution started (None if still pending)
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Timestamp when job finished (None if still running)
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Detailed result data (None if still running)
    pub result: Option<SyncJobResult>,
    /// Error message if job failed
    pub error: Option<String>,
    /// Number of retry attempts remaining (5 max)
    pub retry_count: i32,
}

/// Retrieves the current status of a sync job.
///
/// This function checks both the job_results table (for completed jobs) and
/// the message queue (for pending/running jobs) to provide comprehensive status.
588/// 589/// # Arguments 590/// * `pool` - PostgreSQL connection pool 591/// * `job_id` - Unique identifier of the job to query 592/// 593/// # Returns 594/// * `Ok(Some(JobStatus))` - Job found with current status 595/// * `Ok(None)` - Job not found (may have been cleaned up) 596/// * `Err(sqlx::Error)` - Database query error 597pub async fn get_job_status(pool: &PgPool, job_id: Uuid) -> Result<Option<JobStatus>, sqlx::Error> { 598 // Priority 1: Check for completed job result (most common case) 599 let result_row = sqlx::query!( 600 r#" 601 SELECT 602 job_id, user_did, slice_uri, status, success, total_records, 603 collections_synced, repos_processed, message, error_message, 604 created_at, completed_at 605 FROM job_results 606 WHERE job_id = $1 607 "#, 608 job_id 609 ) 610 .fetch_optional(pool) 611 .await?; 612 613 if let Some(result) = result_row { 614 // Found completed job, construct status from result data 615 let collections_synced: Vec<String> = 616 serde_json::from_value(result.collections_synced).unwrap_or_default(); 617 618 return Ok(Some(JobStatus { 619 job_id, 620 slice_uri: result.slice_uri, 621 status: result.status, 622 created_at: result.created_at, 623 started_at: Some(result.created_at), 624 completed_at: Some(result.completed_at), 625 result: Some(SyncJobResult { 626 success: result.success, 627 total_records: result.total_records, 628 collections_synced, 629 repos_processed: result.repos_processed, 630 message: result.message, 631 }), 632 error: result.error_message, 633 retry_count: 0, 634 })); 635 } 636 637 // Priority 2: Check message queue for pending/running jobs 638 let queue_row = sqlx::query!( 639 r#" 640 SELECT 641 m.id, 642 m.created_at, 643 m.attempt_at, 644 m.attempts, 645 p.payload_json 646 FROM mq_msgs m 647 LEFT JOIN mq_payloads p ON m.id = p.id 648 WHERE p.payload_json::jsonb ->> 'job_id' = $1 649 "#, 650 job_id.to_string() 651 ) 652 .fetch_optional(pool) 653 .await?; 654 655 match queue_row { 656 Some(row) => { 657 // Extract 
slice_uri from payload JSON 658 let slice_uri = row.payload_json 659 .as_ref() 660 .and_then(|json| json.get("slice_uri")) 661 .and_then(|v| v.as_str()) 662 .unwrap_or_default() 663 .to_string(); 664 665 // Determine status based on attempt_at timestamp 666 let status = if row.attempt_at.is_none() { 667 "completed".to_string() 668 } else if let Some(attempt_at) = row.attempt_at { 669 if attempt_at <= chrono::Utc::now() { 670 "running".to_string() 671 } else { 672 "pending".to_string() 673 } 674 } else { 675 "pending".to_string() 676 }; 677 678 Ok(Some(JobStatus { 679 job_id, 680 slice_uri, 681 status: status.clone(), 682 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 683 started_at: if status == "running" || status == "completed" { 684 row.created_at 685 } else { 686 None 687 }, 688 completed_at: if status == "completed" { 689 row.attempt_at 690 } else { 691 None 692 }, 693 result: None, 694 error: None, 695 retry_count: 5 - row.attempts, 696 })) 697 } 698 None => { 699 // Job not found anywhere - either never existed or was cleaned up 700 Ok(None) 701 } 702 } 703} 704 705/// Retrieves job history for a specific user and slice combination. 706/// 707/// This returns both completed jobs (from job_results) and pending/running jobs 708/// (from the message queue), ordered by creation time (most recent first). 709/// Useful for displaying sync history in the UI. 
710/// 711/// # Arguments 712/// * `pool` - PostgreSQL connection pool 713/// * `user_did` - User's decentralized identifier 714/// * `slice_uri` - AT-URI of the slice 715/// * `limit` - Optional maximum number of results (default: 10) 716/// 717/// # Returns 718/// * `Ok(Vec<JobStatus>)` - List of job statuses ordered by recency 719/// * `Err(sqlx::Error)` - Database query error 720pub async fn get_slice_job_history( 721 pool: &PgPool, 722 user_did: &str, 723 slice_uri: &str, 724 limit: Option<i64>, 725) -> Result<Vec<JobStatus>, sqlx::Error> { 726 let limit = limit.unwrap_or(10); 727 728 info!( 729 "Querying job history: user_did={}, slice_uri={}, limit={}", 730 user_did, slice_uri, limit 731 ); 732 733 // Query combines completed jobs (job_results) and pending jobs (mq_msgs) via UNION 734 let rows = sqlx::query!( 735 r#" 736 -- Completed jobs from job_results 737 SELECT 738 job_id, user_did, slice_uri, status, success, total_records, 739 collections_synced, repos_processed, message, error_message, 740 created_at, completed_at, 741 'completed' as job_type 742 FROM job_results 743 WHERE user_did = $1 AND slice_uri = $2 744 745 UNION ALL 746 747 -- Pending jobs from message queue 748 SELECT 749 (p.payload_json->>'job_id')::uuid as job_id, 750 p.payload_json->>'user_did' as user_did, 751 p.payload_json->>'slice_uri' as slice_uri, 752 'running' as status, 753 NULL::boolean as success, 754 NULL::bigint as total_records, 755 '[]'::jsonb as collections_synced, 756 NULL::bigint as repos_processed, 757 'Job in progress...' 
as message, 758 NULL::text as error_message, 759 m.created_at, 760 NULL::timestamptz as completed_at, 761 'pending' as job_type 762 FROM mq_msgs m 763 JOIN mq_payloads p ON m.id = p.id 764 WHERE m.channel_name = 'sync_queue' 765 AND m.id != '00000000-0000-0000-0000-000000000000' 766 AND p.payload_json->>'user_did' = $1 767 AND p.payload_json->>'slice_uri' = $2 768 AND NOT EXISTS ( 769 SELECT 1 FROM job_results jr 770 WHERE jr.job_id = (p.payload_json->>'job_id')::uuid 771 ) 772 773 ORDER BY created_at DESC 774 LIMIT $3 775 "#, 776 user_did, 777 slice_uri, 778 limit 779 ) 780 .fetch_all(pool) 781 .await?; 782 783 // Transform database rows into JobStatus structs 784 let mut results = Vec::new(); 785 for row in rows { 786 let collections_synced: Vec<String> = serde_json::from_value( 787 row.collections_synced 788 .unwrap_or_else(|| serde_json::json!([])), 789 ) 790 .unwrap_or_default(); 791 792 // Differentiate between pending jobs (no result data) and completed jobs 793 let result = if row.job_type.as_deref() == Some("pending") || row.success.is_none() { 794 // This is a pending job - no result data available 795 None 796 } else { 797 // This is a completed job - include result data 798 Some(SyncJobResult { 799 success: row.success.unwrap_or(false), 800 total_records: row.total_records.unwrap_or(0), 801 collections_synced, 802 repos_processed: row.repos_processed.unwrap_or(0), 803 message: row.message.clone().unwrap_or_default(), 804 }) 805 }; 806 807 results.push(JobStatus { 808 job_id: row.job_id.unwrap_or_else(Uuid::new_v4), 809 slice_uri: row.slice_uri.clone().unwrap_or_default(), 810 status: row.status.unwrap_or_default(), 811 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 812 started_at: row.created_at, 813 completed_at: row.completed_at, 814 result, 815 error: row.error_message, 816 retry_count: 0, 817 }); 818 } 819 820 Ok(results) 821} 822 823/// Gets job history for a slice (without user filter). 
824/// 825/// This function is used by the GraphQL API to show all jobs for a slice, 826/// regardless of which user created them. It combines both completed jobs 827/// from job_results and pending jobs from the message queue. 828/// 829/// # Arguments 830/// 831/// * `pool` - Database connection pool 832/// * `slice_uri` - Optional slice URI to filter by (if None, returns all jobs) 833/// * `limit` - Optional maximum number of jobs to return (default: 20) 834/// 835/// # Returns 836/// 837/// * `Ok(Vec<JobStatus>)` - List of job statuses ordered by recency 838/// * `Err(sqlx::Error)` - Database query error 839pub async fn get_job_history_by_slice( 840 pool: &PgPool, 841 slice_uri: Option<&str>, 842 limit: Option<i64>, 843) -> Result<Vec<JobStatus>, sqlx::Error> { 844 let limit = limit.unwrap_or(20); 845 846 // Build WHERE clause conditionally 847 let slice_filter = slice_uri.unwrap_or("%"); // Match all if no filter 848 849 let rows = sqlx::query!( 850 r#" 851 -- Completed jobs from job_results 852 SELECT 853 job_id, user_did, slice_uri, status, success, total_records, 854 collections_synced, repos_processed, message, error_message, 855 created_at, completed_at, 856 'completed' as job_type 857 FROM job_results 858 WHERE slice_uri LIKE $1 859 860 UNION ALL 861 862 -- Pending jobs from message queue 863 SELECT 864 (p.payload_json->>'job_id')::uuid as job_id, 865 p.payload_json->>'user_did' as user_did, 866 p.payload_json->>'slice_uri' as slice_uri, 867 'running' as status, 868 NULL::boolean as success, 869 NULL::bigint as total_records, 870 '[]'::jsonb as collections_synced, 871 NULL::bigint as repos_processed, 872 'Job in progress...' 
as message, 873 NULL::text as error_message, 874 m.created_at, 875 NULL::timestamptz as completed_at, 876 'pending' as job_type 877 FROM mq_msgs m 878 JOIN mq_payloads p ON m.id = p.id 879 WHERE m.channel_name = 'sync_queue' 880 AND m.id != '00000000-0000-0000-0000-000000000000' 881 AND p.payload_json->>'slice_uri' LIKE $1 882 AND NOT EXISTS ( 883 SELECT 1 FROM job_results jr 884 WHERE jr.job_id = (p.payload_json->>'job_id')::uuid 885 ) 886 887 ORDER BY created_at DESC 888 LIMIT $2 889 "#, 890 slice_filter, 891 limit 892 ) 893 .fetch_all(pool) 894 .await?; 895 896 // Transform database rows into JobStatus structs 897 let mut results = Vec::new(); 898 for row in rows { 899 let collections_synced: Vec<String> = serde_json::from_value( 900 row.collections_synced 901 .unwrap_or_else(|| serde_json::json!([])), 902 ) 903 .unwrap_or_default(); 904 905 // Differentiate between pending jobs (no result data) and completed jobs 906 let result = if row.job_type.as_deref() == Some("pending") || row.success.is_none() { 907 // This is a pending job - no result data available 908 None 909 } else { 910 // This is a completed job - include result data 911 Some(SyncJobResult { 912 success: row.success.unwrap_or(false), 913 total_records: row.total_records.unwrap_or(0), 914 collections_synced, 915 repos_processed: row.repos_processed.unwrap_or(0), 916 message: row.message.clone().unwrap_or_default(), 917 }) 918 }; 919 920 results.push(JobStatus { 921 job_id: row.job_id.unwrap_or_else(Uuid::new_v4), 922 slice_uri: row.slice_uri.clone().unwrap_or_default(), 923 status: row.status.unwrap_or_default(), 924 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 925 started_at: row.created_at, 926 completed_at: row.completed_at, 927 result, 928 error: row.error_message, 929 retry_count: 0, 930 }); 931 } 932 933 Ok(results) 934} 935 936/// Cancels a sync job by marking it as cancelled in the database. 
///
/// This function handles both pending and running jobs:
/// - Pending jobs: Removed from queue and marked as cancelled
/// - Running jobs: Marked as cancelled (worker will check and exit)
/// - Completed jobs: Cannot be cancelled
///
/// NOTE(review): the status check, queue delete, and upsert below are separate
/// statements, not one transaction — a job finishing in between relies on the
/// `WHERE status NOT IN (...)` guards to avoid clobbering a terminal state.
/// Confirm this window is acceptable.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - UUID of the job to cancel
///
/// # Returns
/// * `Ok(true)` - Job was successfully cancelled
/// * `Ok(false)` - Job could not be cancelled (already completed)
/// * `Err(sqlx::Error)` - Database query error
pub async fn cancel_job(pool: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    // First, check if job is already completed (can't cancel)
    let existing_result = sqlx::query!(
        r#"
        SELECT status
        FROM job_results
        WHERE job_id = $1
        "#,
        job_id
    )
    .fetch_optional(pool)
    .await?;

    if let Some(result) = existing_result {
        // Job already has a result - can only cancel if it's not already completed/failed/cancelled
        if result.status == "completed" || result.status == "failed" || result.status == "cancelled" {
            tracing::warn!("Cannot cancel job {} - already in status: {}", job_id, result.status);
            return Ok(false);
        }
    }

    // Try to get job details from the queue
    let job_info = sqlx::query!(
        r#"
        SELECT p.payload_json
        FROM mq_msgs m
        JOIN mq_payloads p ON m.id = p.id
        WHERE m.channel_name = 'sync_queue'
        AND p.payload_json->>'job_id' = $1
        LIMIT 1
        "#,
        job_id.to_string()
    )
    .fetch_optional(pool)
    .await?;

    let (user_did, slice_uri) = if let Some(info) = job_info {
        // Extract user_did and slice_uri from the payload
        let payload = info.payload_json.ok_or_else(||
            sqlx::Error::Protocol("Job payload_json is null".into())
        )?;

        let user_did = payload["user_did"].as_str().unwrap_or_default().to_string();
        let slice_uri = payload["slice_uri"].as_str().unwrap_or_default().to_string();

        // Delete from the message queue (for pending jobs)
        sqlx::query!(
            r#"
            DELETE FROM mq_msgs
            WHERE id = (
                SELECT m.id
                FROM mq_msgs m
                JOIN mq_payloads p ON m.id = p.id
                WHERE m.channel_name = 'sync_queue'
                AND p.payload_json->>'job_id' = $1
                LIMIT 1
            )
            "#,
            job_id.to_string()
        )
        .execute(pool)
        .await?;

        (user_did, slice_uri)
    } else {
        // Job not in queue - might be running, try to find it in job_results to get user/slice info
        tracing::warn!("Job {} not found in queue, checking if it's running", job_id);

        // Since we don't have user_did/slice_uri and the job might be running, we need to mark it somehow
        // For running jobs, we'll insert a cancellation record without those fields
        // The worker will check for cancellation and update with proper details when it finds it
        ("".to_string(), "".to_string())
    };

    // Insert/update cancelled status into job_results
    let now = chrono::Utc::now();

    if !user_did.is_empty() && !slice_uri.is_empty() {
        // We have the full details - insert complete record
        sqlx::query!(
            r#"
            INSERT INTO job_results (job_id, user_did, slice_uri, status, success, message, created_at, completed_at)
            VALUES ($1, $2, $3, 'cancelled', false, 'Job cancelled by user', $4, $4)
            ON CONFLICT (job_id)
            DO UPDATE SET
                status = 'cancelled',
                message = 'Job cancelled by user',
                completed_at = $4
            WHERE job_results.status NOT IN ('completed', 'failed', 'cancelled')
            "#,
            job_id,
            user_did,
            slice_uri,
            now
        )
        .execute(pool)
        .await?;
    } else {
        // Job is running - we don't have user_did/slice_uri, but mark it as cancelled anyway
        // The UPDATE will only work if a record exists (which it should for a running job)
        let result = sqlx::query!(
            r#"
            UPDATE job_results
            SET status = 'cancelled',
                message = 'Job cancelled by user',
                completed_at = $2
            WHERE job_id = $1
            AND status NOT IN ('completed', 'failed', 'cancelled')
            RETURNING job_id
            "#,
            job_id,
            now
        )
        .fetch_optional(pool)
        .await?;

        if result.is_none() {
            tracing::warn!("Could not mark job {} as cancelled - may already be completed", job_id);
            return Ok(false);
        }
    }

    // Publish job status update for subscribers
    // (slice_uri may be empty here when the job was not found in the queue)
    let job_status = JobStatus {
        job_id,
        slice_uri,
        status: "cancelled".to_string(),
        created_at: now,
        started_at: None,
        completed_at: Some(now),
        result: None,
        error: Some("Job cancelled by user".to_string()),
        retry_count: 0,
    };

    crate::graphql::schema_ext::publish_sync_job_update(job_status).await;

    tracing::info!("Cancelled job: {}", job_id);
    Ok(true)
}

/// Deletes a sync job from the database.
///
/// This removes the job from the results table. Jobs in the queue should be cancelled first.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - UUID of the job to delete
///
/// # Returns
/// * `Ok(true)` - Job was successfully deleted
/// * `Ok(false)` - Job was not found
/// * `Err(sqlx::Error)` - Database query error
pub async fn delete_job(pool: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    // Delete from job_results table; RETURNING tells us whether a row existed
    let result_deleted = sqlx::query!(
        r#"
        DELETE FROM job_results
        WHERE job_id = $1
        RETURNING job_id
        "#,
        job_id
    )
    .fetch_optional(pool)
    .await?;

    if result_deleted.is_some() {
        tracing::info!("Deleted job: {}", job_id);
        Ok(true)
    } else {
        tracing::warn!("Job not found for deletion: {}", job_id);
        Ok(false)
    }
}

/// Checks if a job has been cancelled.
1127/// 1128/// Used by the sync worker to periodically check if a running job 1129/// has been cancelled by the user. 1130/// 1131/// # Arguments 1132/// * `pool` - PostgreSQL connection pool 1133/// * `job_id` - UUID of the job to check 1134/// 1135/// # Returns 1136/// * `Ok(true)` - Job has been cancelled 1137/// * `Ok(false)` - Job is not cancelled or doesn't exist 1138/// * `Err(sqlx::Error)` - Database query error 1139pub async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> { 1140 let result = sqlx::query!( 1141 r#" 1142 SELECT status 1143 FROM job_results 1144 WHERE job_id = $1 1145 AND status = 'cancelled' 1146 "#, 1147 job_id 1148 ) 1149 .fetch_optional(pool) 1150 .await?; 1151 1152 Ok(result.is_some()) 1153}