//! Highly ambitious ATProtocol AppView service and SDKs.
1//! Background job system for asynchronous collection synchronization.
2//!
3//! This module uses sqlxmq (a PostgreSQL-backed message queue) to handle
4//! background sync jobs. Jobs are:
5//! - Enqueued with deduplication checks (one active job per user+slice)
6//! - Executed asynchronously in background workers
7//! - Retried up to 5 times on failure
8//! - Tracked with detailed logging and result persistence
9//!
10//! The sync process fetches records from AT Protocol relays and validates them
11//! against Lexicon schemas before persisting to the database.
12
13use crate::cache;
14use crate::errors::SyncError;
15use crate::graphql::schema_ext::publish_sync_job_update;
16use crate::logging::LogLevel;
17use crate::models::BulkSyncParams;
18use crate::sync::SyncService;
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use sqlx::PgPool;
22use sqlxmq::{CurrentJob, JobRegistry, job};
23use std::sync::Arc;
24use tokio::sync::Mutex;
25use tracing::{error, info};
26use uuid::Uuid;
27
/// Job payload containing all parameters needed to execute a sync job.
///
/// Serialized to JSON when enqueued via sqlxmq and stored in `mq_payloads`.
/// The deduplication queries in this module read fields straight out of the
/// stored JSON (e.g. `payload_json->>'user_did'`), so the serialized field
/// names below must not be renamed without updating those queries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncJobPayload {
    /// Unique identifier for tracking this specific job execution
    pub job_id: Uuid,
    /// Decentralized identifier of the user requesting the sync
    pub user_did: String,
    /// AT-URI of the slice being synchronized
    pub slice_uri: String,
    /// Synchronization parameters (collections, repos, validation settings)
    pub params: BulkSyncParams,
}
40
/// Result data persisted after job completion or failure.
///
/// This is stored in the `job_results` table for historical tracking and
/// status queries. Field names are camelCased for JSON API responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncJobResult {
    /// Whether the sync finished without error (maps to `job_results.success`)
    pub success: bool,
    /// Total number of records synchronized across all collections
    pub total_records: i64,
    /// Names of the collections that were synchronized (internal + external)
    pub collections_synced: Vec<String>,
    /// Number of repositories processed during the sync
    pub repos_processed: i64,
    /// Human-readable summary ("Sync completed successfully" / "Sync failed: …")
    pub message: String,
}
54
55/// Initializes the sqlxmq job registry with all job handlers.
56///
57/// This must be called once at application startup to register job handlers
58/// before workers can process jobs from the queue.
59///
60/// # Returns
61/// A configured JobRegistry containing all registered job handlers
62pub fn registry() -> JobRegistry {
63 JobRegistry::new(&[sync_job])
64}
65
66/// Background job handler for collection synchronization.
67///
68/// This is the main worker function that executes sync jobs from the queue.
69/// It performs the following steps:
70/// 1. Extracts job payload and validates parameters
71/// 2. Initializes sync service with logging and caching
72/// 3. Fetches and validates records from AT Protocol relays
73/// 4. Persists results to the database
74/// 5. Logs detailed progress and completion status
75///
76/// # Job Behavior
77/// - Channel: `sync_queue`
78/// - Retries: Up to 5 attempts on failure
79/// - Concurrency: Multiple jobs can run in parallel
80/// - Deduplication: Enforced at enqueue time (one active job per user+slice)
81///
82/// # Arguments
83/// * `current_job` - The sqlxmq job context containing payload and database access
84///
85/// # Returns
86/// * `Ok(())` - Job completed successfully and marked complete
87/// * `Err(...)` - Job failed and will be retried (up to max retry limit)
88#[job(channel_name = "sync_queue")]
89async fn sync_job(
90 mut current_job: CurrentJob,
91) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
92 let payload: SyncJobPayload = current_job.json()?.expect("Invalid job payload");
93
94 info!(
95 "Starting sync job {} for user {} on slice {}",
96 payload.job_id, payload.user_did, payload.slice_uri
97 );
98
99 // Access database pool and global logger for this job execution
100 let pool = current_job.pool();
101 let logger = crate::logging::Logger::global();
102
103 // Log job start
104 logger.log_sync_job(
105 payload.job_id,
106 &payload.user_did,
107 &payload.slice_uri,
108 LogLevel::Info,
109 &format!(
110 "Starting sync job for {} collections",
111 payload
112 .params
113 .collections
114 .as_ref()
115 .map(|c| c.len())
116 .unwrap_or(0)
117 + payload
118 .params
119 .external_collections
120 .as_ref()
121 .map(|c| c.len())
122 .unwrap_or(0)
123 ),
124 Some(json!({
125 "collections": payload.params.collections,
126 "external_collections": payload.params.external_collections,
127 "repos": payload.params.repos,
128 "skip_validation": payload.params.skip_validation
129 })),
130 );
131
132 // Insert "running" status into job_results immediately (enables cancellation)
133 let now = chrono::Utc::now();
134 sqlx::query!(
135 r#"
136 INSERT INTO job_results (job_id, user_did, slice_uri, status, success, message, created_at)
137 VALUES ($1, $2, $3, 'running', false, 'Job is running...', $4)
138 ON CONFLICT (job_id) DO NOTHING
139 "#,
140 payload.job_id,
141 payload.user_did,
142 payload.slice_uri,
143 now
144 )
145 .execute(pool)
146 .await?;
147
148 // Publish job running status to subscribers
149 let running_status = JobStatus {
150 job_id: payload.job_id,
151 slice_uri: payload.slice_uri.clone(),
152 status: "running".to_string(),
153 created_at: now,
154 started_at: Some(now),
155 completed_at: None,
156 result: None,
157 error: None,
158 retry_count: 0,
159 };
160 crate::graphql::schema_ext::publish_sync_job_update(running_status).await;
161
162 // Check if job was cancelled before we even started
163 let cancelled_check = sqlx::query!(
164 r#"
165 SELECT status
166 FROM job_results
167 WHERE job_id = $1
168 AND status = 'cancelled'
169 "#,
170 payload.job_id
171 )
172 .fetch_optional(pool)
173 .await?;
174
175 if cancelled_check.is_some() {
176 info!("Job {} was cancelled before execution started", payload.job_id);
177 current_job.complete().await?;
178 return Ok(());
179 }
180
181 // Initialize sync service with database, relay endpoint, and caching
182 let database = crate::database::Database::from_pool(pool.clone());
183 let relay_endpoint = std::env::var("RELAY_ENDPOINT")
184 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
185
186 // Create in-memory cache for DID resolution with 24-hour TTL to reduce identity lookups
187 let cache = Arc::new(Mutex::new(
188 cache::CacheFactory::create_slice_cache(cache::CacheBackend::InMemory {
189 ttl_seconds: Some(24 * 60 * 60),
190 })
191 .await?,
192 ));
193
194 let sync_service = SyncService::with_logging_and_cache(
195 database.clone(),
196 relay_endpoint,
197 logger.clone(),
198 payload.job_id,
199 payload.user_did.clone(),
200 cache,
201 );
202
203 // Track execution time for performance monitoring
204 let start_time = std::time::Instant::now();
205
206 // Execute the synchronization process
207 match sync_service
208 .backfill_collections(
209 &payload.slice_uri,
210 payload.params.collections.as_deref(),
211 payload.params.external_collections.as_deref(),
212 payload.params.repos.as_deref(),
213 payload.params.skip_validation.unwrap_or(false),
214 payload.params.max_repos,
215 )
216 .await
217 {
218 Ok((repos_processed, records_synced)) => {
219 let elapsed = start_time.elapsed();
220 let result = SyncJobResult {
221 success: true,
222 total_records: records_synced,
223 collections_synced: [
224 payload.params.collections.unwrap_or_default(),
225 payload.params.external_collections.unwrap_or_default(),
226 ]
227 .concat(),
228 repos_processed,
229 message: "Sync completed successfully".to_string(),
230 };
231
232 // Log completion with detailed metrics for monitoring
233 logger.log_sync_job(
234 payload.job_id,
235 &payload.user_did,
236 &payload.slice_uri,
237 LogLevel::Info,
238 &format!(
239 "Sync completed successfully: {} repos, {} records in {:?}",
240 repos_processed, records_synced, elapsed
241 ),
242 Some(json!({
243 "repos_processed": repos_processed,
244 "records_synced": records_synced,
245 "duration_secs": elapsed.as_secs_f64(),
246 "collections_synced": result.collections_synced
247 })),
248 );
249
250 // Persist job result before marking complete (ensures result is queryable)
251 store_job_result(
252 pool,
253 payload.job_id,
254 &payload.user_did,
255 &payload.slice_uri,
256 &result,
257 None,
258 )
259 .await?;
260
261 // Publish job status update to GraphQL subscribers
262 let job_status = JobStatus {
263 job_id: payload.job_id,
264 slice_uri: payload.slice_uri.clone(),
265 status: "completed".to_string(),
266 created_at: chrono::Utc::now(),
267 started_at: Some(chrono::Utc::now()),
268 completed_at: Some(chrono::Utc::now()),
269 result: Some(result.clone()),
270 error: None,
271 retry_count: 0,
272 };
273 publish_sync_job_update(job_status).await;
274
275 info!(
276 "Sync job {} completed successfully: {} repos, {} records",
277 payload.job_id, repos_processed, records_synced
278 );
279
280 // CRITICAL: Explicitly mark job as complete to prevent automatic retry
281 // Without this, sqlxmq will treat the job as failed and retry it
282 current_job.complete().await?;
283
284 info!(
285 "Sync job {} marked as complete and will be cleaned up",
286 payload.job_id
287 );
288
289 Ok(())
290 }
291 Err(e) => {
292 // Special handling for cancelled jobs - don't overwrite the cancelled status
293 if matches!(e, SyncError::Cancelled) {
294 info!("Sync job {} was cancelled", payload.job_id);
295
296 // Mark job as complete (no retry needed for cancelled jobs)
297 current_job.complete().await?;
298
299 return Ok(());
300 }
301
302 error!("Sync job {} failed: {}", payload.job_id, e);
303
304 // Log error details for debugging and user visibility
305 logger.log_sync_job(
306 payload.job_id,
307 &payload.user_did,
308 &payload.slice_uri,
309 LogLevel::Error,
310 &format!("Sync job failed: {}", e),
311 Some(json!({
312 "error": e.to_string(),
313 "duration_secs": start_time.elapsed().as_secs_f64()
314 })),
315 );
316
317 let result = SyncJobResult {
318 success: false,
319 total_records: 0,
320 collections_synced: vec![],
321 repos_processed: 0,
322 message: format!("Sync failed: {}", e),
323 };
324
325 // Persist failure result even if job will retry (for status tracking)
326 if let Err(db_err) = store_job_result(
327 pool,
328 payload.job_id,
329 &payload.user_did,
330 &payload.slice_uri,
331 &result,
332 Some(&format!("{}", e)),
333 )
334 .await
335 {
336 error!("Failed to store job result: {}", db_err);
337 }
338
339 // Publish job status update to GraphQL subscribers
340 let job_status = JobStatus {
341 job_id: payload.job_id,
342 slice_uri: payload.slice_uri.clone(),
343 status: "failed".to_string(),
344 created_at: chrono::Utc::now(),
345 started_at: Some(chrono::Utc::now()),
346 completed_at: Some(chrono::Utc::now()),
347 result: Some(result.clone()),
348 error: Some(format!("{}", e)),
349 retry_count: 0,
350 };
351 publish_sync_job_update(job_status).await;
352
353 // Return error to trigger sqlxmq's automatic retry mechanism (up to 5 attempts)
354 Err(Box::new(e))
355 }
356 }
357}
358
359/// Persists job result to the database for status queries and historical tracking.
360///
361/// This is called both on success and failure to ensure result data is available
362/// via the job status API. Uses UPSERT to handle retries (updates existing result).
363///
364/// # Arguments
365/// * `pool` - PostgreSQL connection pool
366/// * `job_id` - Unique identifier for the job
367/// * `user_did` - User who initiated the job
368/// * `slice_uri` - Slice being synchronized
369/// * `result` - Job result data (success/failure, metrics)
370/// * `error_message` - Optional error details for failed jobs
371///
372/// # Returns
373/// * `Ok(())` - Result stored successfully
374/// * `Err(sqlx::Error)` - Database error during insert/update
375async fn store_job_result(
376 pool: &PgPool,
377 job_id: Uuid,
378 user_did: &str,
379 slice_uri: &str,
380 result: &SyncJobResult,
381 error_message: Option<&str>,
382) -> Result<(), sqlx::Error> {
383 info!(
384 "Storing job result: job_id={}, user_did={}, slice_uri={}, success={}",
385 job_id, user_did, slice_uri, result.success
386 );
387
388 // Convert collections list to JSONB for storage
389 let collections_json = serde_json::to_value(&result.collections_synced)
390 .map_err(|e| sqlx::Error::Protocol(format!("Failed to serialize collections: {}", e)))?;
391
392 // UPSERT: insert new result or update existing on retry
393 sqlx::query!(
394 r#"
395 INSERT INTO job_results (
396 job_id, user_did, slice_uri, status, success, total_records,
397 collections_synced, repos_processed, message, error_message
398 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
399 ON CONFLICT (job_id)
400 DO UPDATE SET
401 status = EXCLUDED.status,
402 success = EXCLUDED.success,
403 total_records = EXCLUDED.total_records,
404 collections_synced = EXCLUDED.collections_synced,
405 repos_processed = EXCLUDED.repos_processed,
406 message = EXCLUDED.message,
407 error_message = EXCLUDED.error_message,
408 completed_at = NOW()
409 WHERE job_results.status != 'cancelled'
410 "#,
411 job_id,
412 user_did,
413 slice_uri,
414 if result.success {
415 "completed"
416 } else {
417 "failed"
418 },
419 result.success,
420 result.total_records,
421 collections_json,
422 result.repos_processed,
423 result.message,
424 error_message,
425 )
426 .execute(pool)
427 .await?;
428
429 Ok(())
430}
431
432/// Enqueues a new sync job with deduplication checks.
433///
434/// This function ensures only one active sync job exists per user+slice combination
435/// by checking both the message queue and recent job results. This prevents:
436/// - Duplicate jobs competing for the same data
437/// - Wasted resources on redundant syncs
438/// - Race conditions in record persistence
439///
440/// # Arguments
441/// * `pool` - PostgreSQL connection pool
442/// * `user_did` - Decentralized identifier of the user
443/// * `slice_uri` - AT-URI of the slice to synchronize
444/// * `params` - Sync parameters (collections, repos, validation settings)
445///
446/// # Returns
447/// * `Ok(Uuid)` - Job ID of the newly enqueued job
448/// * `Err(...)` - Error if job already running or enqueue fails
449///
450/// # Deduplication Strategy
451/// 1. Check for pending jobs in `mq_msgs` queue
452/// 2. Check for recent jobs (< 10 min) without results
453/// 3. Reject if either check finds an active job
454pub async fn enqueue_sync_job(
455 pool: &PgPool,
456 user_did: String,
457 slice_uri: String,
458 params: BulkSyncParams,
459) -> Result<Uuid, Box<dyn std::error::Error + Send + Sync>> {
460 // Deduplication check 1: Look for pending jobs in the message queue
461 // This catches jobs that haven't started executing yet
462 let existing_running_msg = sqlx::query!(
463 r#"
464 SELECT m.id
465 FROM mq_msgs m
466 JOIN mq_payloads p ON m.id = p.id
467 WHERE m.channel_name = 'sync_queue'
468 AND m.id != '00000000-0000-0000-0000-000000000000'
469 AND p.payload_json->>'user_did' = $1
470 AND p.payload_json->>'slice_uri' = $2
471 AND m.attempt_at <= NOW()
472 "#,
473 user_did,
474 slice_uri
475 )
476 .fetch_optional(pool)
477 .await?;
478
479 // Deduplication check 2: Look for recently started jobs without results
480 // This catches jobs that started but haven't written results yet (< 10 min)
481 let recent_start = sqlx::query!(
482 r#"
483 SELECT m.id
484 FROM mq_msgs m
485 JOIN mq_payloads p ON m.id = p.id
486 LEFT JOIN job_results jr ON (p.payload_json->>'job_id')::uuid = jr.job_id
487 WHERE m.channel_name = 'sync_queue'
488 AND m.id != '00000000-0000-0000-0000-000000000000'
489 AND p.payload_json->>'user_did' = $1
490 AND p.payload_json->>'slice_uri' = $2
491 AND m.created_at > NOW() - INTERVAL '10 minutes'
492 AND jr.job_id IS NULL
493 "#,
494 user_did,
495 slice_uri
496 )
497 .fetch_optional(pool)
498 .await?;
499
500 // Deduplication check 3: Look for running jobs in job_results
501 // This catches jobs that have started and written a "running" status
502 let running_job = sqlx::query!(
503 r#"
504 SELECT job_id
505 FROM job_results
506 WHERE user_did = $1
507 AND slice_uri = $2
508 AND status = 'running'
509 AND created_at > NOW() - INTERVAL '10 minutes'
510 "#,
511 user_did,
512 slice_uri
513 )
514 .fetch_optional(pool)
515 .await?;
516
517 if existing_running_msg.is_some() || recent_start.is_some() || running_job.is_some() {
518 return Err("A sync job is already running for this slice. Please wait for it to complete before starting another.".into());
519 }
520
521 // Generate unique job ID for tracking and result storage
522 let job_id = Uuid::new_v4();
523
524 let payload = SyncJobPayload {
525 job_id,
526 user_did: user_did.clone(),
527 slice_uri: slice_uri.clone(),
528 params,
529 };
530
531 // Enqueue the job using sqlxmq's builder pattern
532 let job_uuid = sync_job.builder().set_json(&payload)?.spawn(pool).await?;
533
534 info!(
535 "Enqueued sync job {} (queue id: {}) for user {}",
536 job_id, job_uuid, user_did
537 );
538
539 // Publish job creation event to subscribers
540 let job_status = JobStatus {
541 job_id,
542 slice_uri: slice_uri.clone(),
543 status: "pending".to_string(),
544 created_at: chrono::Utc::now(),
545 started_at: None,
546 completed_at: None,
547 result: None,
548 error: None,
549 retry_count: 0,
550 };
551 crate::graphql::schema_ext::publish_sync_job_update(job_status).await;
552
553 Ok(job_id)
554}
555
/// Status information for a sync job, including progress and results.
///
/// This combines data from both the message queue (for pending jobs) and
/// the job_results table (for completed jobs). Field names are camelCased
/// for JSON API responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JobStatus {
    /// Unique identifier for the job
    pub job_id: Uuid,
    /// Slice URI this job belongs to
    pub slice_uri: String,
    /// Current status: "pending", "running", "completed", "failed",
    /// or "cancelled" (set by `cancel_job`)
    pub status: String,
    /// Timestamp when job was enqueued
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Timestamp when job execution started (None if still pending)
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Timestamp when job finished (None if still running)
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Detailed result data (None if still running)
    pub result: Option<SyncJobResult>,
    /// Error message if job failed
    pub error: Option<String>,
    /// Number of retry attempts remaining (5 max)
    ///
    /// NOTE(review): most construction sites in this module hardcode this to
    /// 0; only `get_job_status` derives it (`5 - attempts`) from the queue —
    /// confirm consumers actually rely on this value before trusting it.
    pub retry_count: i32,
}
583
584/// Retrieves the current status of a sync job.
585///
586/// This function checks both the job_results table (for completed jobs) and
587/// the message queue (for pending/running jobs) to provide comprehensive status.
588///
589/// # Arguments
590/// * `pool` - PostgreSQL connection pool
591/// * `job_id` - Unique identifier of the job to query
592///
593/// # Returns
594/// * `Ok(Some(JobStatus))` - Job found with current status
595/// * `Ok(None)` - Job not found (may have been cleaned up)
596/// * `Err(sqlx::Error)` - Database query error
597pub async fn get_job_status(pool: &PgPool, job_id: Uuid) -> Result<Option<JobStatus>, sqlx::Error> {
598 // Priority 1: Check for completed job result (most common case)
599 let result_row = sqlx::query!(
600 r#"
601 SELECT
602 job_id, user_did, slice_uri, status, success, total_records,
603 collections_synced, repos_processed, message, error_message,
604 created_at, completed_at
605 FROM job_results
606 WHERE job_id = $1
607 "#,
608 job_id
609 )
610 .fetch_optional(pool)
611 .await?;
612
613 if let Some(result) = result_row {
614 // Found completed job, construct status from result data
615 let collections_synced: Vec<String> =
616 serde_json::from_value(result.collections_synced).unwrap_or_default();
617
618 return Ok(Some(JobStatus {
619 job_id,
620 slice_uri: result.slice_uri,
621 status: result.status,
622 created_at: result.created_at,
623 started_at: Some(result.created_at),
624 completed_at: Some(result.completed_at),
625 result: Some(SyncJobResult {
626 success: result.success,
627 total_records: result.total_records,
628 collections_synced,
629 repos_processed: result.repos_processed,
630 message: result.message,
631 }),
632 error: result.error_message,
633 retry_count: 0,
634 }));
635 }
636
637 // Priority 2: Check message queue for pending/running jobs
638 let queue_row = sqlx::query!(
639 r#"
640 SELECT
641 m.id,
642 m.created_at,
643 m.attempt_at,
644 m.attempts,
645 p.payload_json
646 FROM mq_msgs m
647 LEFT JOIN mq_payloads p ON m.id = p.id
648 WHERE p.payload_json::jsonb ->> 'job_id' = $1
649 "#,
650 job_id.to_string()
651 )
652 .fetch_optional(pool)
653 .await?;
654
655 match queue_row {
656 Some(row) => {
657 // Extract slice_uri from payload JSON
658 let slice_uri = row.payload_json
659 .as_ref()
660 .and_then(|json| json.get("slice_uri"))
661 .and_then(|v| v.as_str())
662 .unwrap_or_default()
663 .to_string();
664
665 // Determine status based on attempt_at timestamp
666 let status = if row.attempt_at.is_none() {
667 "completed".to_string()
668 } else if let Some(attempt_at) = row.attempt_at {
669 if attempt_at <= chrono::Utc::now() {
670 "running".to_string()
671 } else {
672 "pending".to_string()
673 }
674 } else {
675 "pending".to_string()
676 };
677
678 Ok(Some(JobStatus {
679 job_id,
680 slice_uri,
681 status: status.clone(),
682 created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
683 started_at: if status == "running" || status == "completed" {
684 row.created_at
685 } else {
686 None
687 },
688 completed_at: if status == "completed" {
689 row.attempt_at
690 } else {
691 None
692 },
693 result: None,
694 error: None,
695 retry_count: 5 - row.attempts,
696 }))
697 }
698 None => {
699 // Job not found anywhere - either never existed or was cleaned up
700 Ok(None)
701 }
702 }
703}
704
705/// Retrieves job history for a specific user and slice combination.
706///
707/// This returns both completed jobs (from job_results) and pending/running jobs
708/// (from the message queue), ordered by creation time (most recent first).
709/// Useful for displaying sync history in the UI.
710///
711/// # Arguments
712/// * `pool` - PostgreSQL connection pool
713/// * `user_did` - User's decentralized identifier
714/// * `slice_uri` - AT-URI of the slice
715/// * `limit` - Optional maximum number of results (default: 10)
716///
717/// # Returns
718/// * `Ok(Vec<JobStatus>)` - List of job statuses ordered by recency
719/// * `Err(sqlx::Error)` - Database query error
720pub async fn get_slice_job_history(
721 pool: &PgPool,
722 user_did: &str,
723 slice_uri: &str,
724 limit: Option<i64>,
725) -> Result<Vec<JobStatus>, sqlx::Error> {
726 let limit = limit.unwrap_or(10);
727
728 info!(
729 "Querying job history: user_did={}, slice_uri={}, limit={}",
730 user_did, slice_uri, limit
731 );
732
733 // Query combines completed jobs (job_results) and pending jobs (mq_msgs) via UNION
734 let rows = sqlx::query!(
735 r#"
736 -- Completed jobs from job_results
737 SELECT
738 job_id, user_did, slice_uri, status, success, total_records,
739 collections_synced, repos_processed, message, error_message,
740 created_at, completed_at,
741 'completed' as job_type
742 FROM job_results
743 WHERE user_did = $1 AND slice_uri = $2
744
745 UNION ALL
746
747 -- Pending jobs from message queue
748 SELECT
749 (p.payload_json->>'job_id')::uuid as job_id,
750 p.payload_json->>'user_did' as user_did,
751 p.payload_json->>'slice_uri' as slice_uri,
752 'running' as status,
753 NULL::boolean as success,
754 NULL::bigint as total_records,
755 '[]'::jsonb as collections_synced,
756 NULL::bigint as repos_processed,
757 'Job in progress...' as message,
758 NULL::text as error_message,
759 m.created_at,
760 NULL::timestamptz as completed_at,
761 'pending' as job_type
762 FROM mq_msgs m
763 JOIN mq_payloads p ON m.id = p.id
764 WHERE m.channel_name = 'sync_queue'
765 AND m.id != '00000000-0000-0000-0000-000000000000'
766 AND p.payload_json->>'user_did' = $1
767 AND p.payload_json->>'slice_uri' = $2
768 AND NOT EXISTS (
769 SELECT 1 FROM job_results jr
770 WHERE jr.job_id = (p.payload_json->>'job_id')::uuid
771 )
772
773 ORDER BY created_at DESC
774 LIMIT $3
775 "#,
776 user_did,
777 slice_uri,
778 limit
779 )
780 .fetch_all(pool)
781 .await?;
782
783 // Transform database rows into JobStatus structs
784 let mut results = Vec::new();
785 for row in rows {
786 let collections_synced: Vec<String> = serde_json::from_value(
787 row.collections_synced
788 .unwrap_or_else(|| serde_json::json!([])),
789 )
790 .unwrap_or_default();
791
792 // Differentiate between pending jobs (no result data) and completed jobs
793 let result = if row.job_type.as_deref() == Some("pending") || row.success.is_none() {
794 // This is a pending job - no result data available
795 None
796 } else {
797 // This is a completed job - include result data
798 Some(SyncJobResult {
799 success: row.success.unwrap_or(false),
800 total_records: row.total_records.unwrap_or(0),
801 collections_synced,
802 repos_processed: row.repos_processed.unwrap_or(0),
803 message: row.message.clone().unwrap_or_default(),
804 })
805 };
806
807 results.push(JobStatus {
808 job_id: row.job_id.unwrap_or_else(Uuid::new_v4),
809 slice_uri: row.slice_uri.clone().unwrap_or_default(),
810 status: row.status.unwrap_or_default(),
811 created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
812 started_at: row.created_at,
813 completed_at: row.completed_at,
814 result,
815 error: row.error_message,
816 retry_count: 0,
817 });
818 }
819
820 Ok(results)
821}
822
/// Gets job history for a slice (without user filter).
///
/// This function is used by the GraphQL API to show all jobs for a slice,
/// regardless of which user created them. It combines both completed jobs
/// from job_results and pending jobs from the message queue.
///
/// # Arguments
///
/// * `pool` - Database connection pool
/// * `slice_uri` - Optional slice URI to filter by (if None, returns all jobs)
/// * `limit` - Optional maximum number of jobs to return (default: 20)
///
/// # Returns
///
/// * `Ok(Vec<JobStatus>)` - List of job statuses ordered by recency
/// * `Err(sqlx::Error)` - Database query error
pub async fn get_job_history_by_slice(
    pool: &PgPool,
    slice_uri: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<JobStatus>, sqlx::Error> {
    let limit = limit.unwrap_or(20);

    // Build WHERE clause conditionally
    // NOTE(review): the filter is applied with LIKE below, so a slice_uri
    // containing '%' or '_' is treated as a wildcard pattern rather than a
    // literal URI — confirm callers never pass those characters, or switch
    // to an equality/NULL-aware comparison.
    let slice_filter = slice_uri.unwrap_or("%"); // Match all if no filter

    // UNION of finished jobs (job_results) and in-flight jobs (mq_msgs that
    // have no result row yet), newest first, capped at `limit`.
    let rows = sqlx::query!(
        r#"
        -- Completed jobs from job_results
        SELECT
            job_id, user_did, slice_uri, status, success, total_records,
            collections_synced, repos_processed, message, error_message,
            created_at, completed_at,
            'completed' as job_type
        FROM job_results
        WHERE slice_uri LIKE $1

        UNION ALL

        -- Pending jobs from message queue
        SELECT
            (p.payload_json->>'job_id')::uuid as job_id,
            p.payload_json->>'user_did' as user_did,
            p.payload_json->>'slice_uri' as slice_uri,
            'running' as status,
            NULL::boolean as success,
            NULL::bigint as total_records,
            '[]'::jsonb as collections_synced,
            NULL::bigint as repos_processed,
            'Job in progress...' as message,
            NULL::text as error_message,
            m.created_at,
            NULL::timestamptz as completed_at,
            'pending' as job_type
        FROM mq_msgs m
        JOIN mq_payloads p ON m.id = p.id
        WHERE m.channel_name = 'sync_queue'
        AND m.id != '00000000-0000-0000-0000-000000000000'
        AND p.payload_json->>'slice_uri' LIKE $1
        AND NOT EXISTS (
            SELECT 1 FROM job_results jr
            WHERE jr.job_id = (p.payload_json->>'job_id')::uuid
        )

        ORDER BY created_at DESC
        LIMIT $2
        "#,
        slice_filter,
        limit
    )
    .fetch_all(pool)
    .await?;

    // Transform database rows into JobStatus structs
    let mut results = Vec::new();
    for row in rows {
        // Columns from the UNION are all nullable; fall back to an empty list.
        let collections_synced: Vec<String> = serde_json::from_value(
            row.collections_synced
                .unwrap_or_else(|| serde_json::json!([])),
        )
        .unwrap_or_default();

        // Differentiate between pending jobs (no result data) and completed jobs
        let result = if row.job_type.as_deref() == Some("pending") || row.success.is_none() {
            // This is a pending job - no result data available
            None
        } else {
            // This is a completed job - include result data
            Some(SyncJobResult {
                success: row.success.unwrap_or(false),
                total_records: row.total_records.unwrap_or(0),
                collections_synced,
                repos_processed: row.repos_processed.unwrap_or(0),
                message: row.message.clone().unwrap_or_default(),
            })
        };

        // NOTE(review): a missing job_id is replaced with a freshly generated
        // random UUID — callers cannot correlate such an entry with anything;
        // confirm this fallback is intentional.
        results.push(JobStatus {
            job_id: row.job_id.unwrap_or_else(Uuid::new_v4),
            slice_uri: row.slice_uri.clone().unwrap_or_default(),
            status: row.status.unwrap_or_default(),
            created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
            started_at: row.created_at,
            completed_at: row.completed_at,
            result,
            error: row.error_message,
            retry_count: 0,
        });
    }

    Ok(results)
}
935
/// Cancels a sync job by marking it as cancelled in the database.
///
/// This function handles both pending and running jobs:
/// - Pending jobs: Removed from queue and marked as cancelled
/// - Running jobs: Marked as cancelled (worker will check and exit)
/// - Completed jobs: Cannot be cancelled
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - UUID of the job to cancel
///
/// # Returns
/// * `Ok(true)` - Job was successfully cancelled
/// * `Ok(false)` - Job could not be cancelled (already completed)
/// * `Err(sqlx::Error)` - Database query error
pub async fn cancel_job(pool: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    // First, check if job is already completed (can't cancel)
    let existing_result = sqlx::query!(
        r#"
        SELECT status
        FROM job_results
        WHERE job_id = $1
        "#,
        job_id
    )
    .fetch_optional(pool)
    .await?;

    if let Some(result) = existing_result {
        // Job already has a result - can only cancel if it's not already completed/failed/cancelled
        if result.status == "completed" || result.status == "failed" || result.status == "cancelled" {
            tracing::warn!("Cannot cancel job {} - already in status: {}", job_id, result.status);
            return Ok(false);
        }
    }

    // Try to get job details from the queue
    // (present only while the message has not been consumed and deleted)
    let job_info = sqlx::query!(
        r#"
        SELECT p.payload_json
        FROM mq_msgs m
        JOIN mq_payloads p ON m.id = p.id
        WHERE m.channel_name = 'sync_queue'
        AND p.payload_json->>'job_id' = $1
        LIMIT 1
        "#,
        job_id.to_string()
    )
    .fetch_optional(pool)
    .await?;

    let (user_did, slice_uri) = if let Some(info) = job_info {
        // Extract user_did and slice_uri from the payload
        let payload = info.payload_json.ok_or_else(||
            sqlx::Error::Protocol("Job payload_json is null".into())
        )?;

        let user_did = payload["user_did"].as_str().unwrap_or_default().to_string();
        let slice_uri = payload["slice_uri"].as_str().unwrap_or_default().to_string();

        // Delete from the message queue (for pending jobs)
        // The subquery re-locates the message id; deleting by id avoids
        // touching any other messages on the channel.
        sqlx::query!(
            r#"
            DELETE FROM mq_msgs
            WHERE id = (
                SELECT m.id
                FROM mq_msgs m
                JOIN mq_payloads p ON m.id = p.id
                WHERE m.channel_name = 'sync_queue'
                AND p.payload_json->>'job_id' = $1
                LIMIT 1
            )
            "#,
            job_id.to_string()
        )
        .execute(pool)
        .await?;

        (user_did, slice_uri)
    } else {
        // Job not in queue - might be running, try to find it in job_results to get user/slice info
        tracing::warn!("Job {} not found in queue, checking if it's running", job_id);

        // Since we don't have user_did/slice_uri and the job might be running, we need to mark it somehow
        // For running jobs, we'll insert a cancellation record without those fields
        // The worker will check for cancellation and update with proper details when it finds it
        ("".to_string(), "".to_string())
    };

    // Insert/update cancelled status into job_results
    let now = chrono::Utc::now();

    if !user_did.is_empty() && !slice_uri.is_empty() {
        // We have the full details - insert complete record
        // (the conditional WHERE keeps terminal states from being overwritten)
        sqlx::query!(
            r#"
            INSERT INTO job_results (job_id, user_did, slice_uri, status, success, message, created_at, completed_at)
            VALUES ($1, $2, $3, 'cancelled', false, 'Job cancelled by user', $4, $4)
            ON CONFLICT (job_id)
            DO UPDATE SET
                status = 'cancelled',
                message = 'Job cancelled by user',
                completed_at = $4
            WHERE job_results.status NOT IN ('completed', 'failed', 'cancelled')
            "#,
            job_id,
            user_did,
            slice_uri,
            now
        )
        .execute(pool)
        .await?;
    } else {
        // Job is running - we don't have user_did/slice_uri, but mark it as cancelled anyway
        // The UPDATE will only work if a record exists (which it should for a running job)
        let result = sqlx::query!(
            r#"
            UPDATE job_results
            SET status = 'cancelled',
                message = 'Job cancelled by user',
                completed_at = $2
            WHERE job_id = $1
            AND status NOT IN ('completed', 'failed', 'cancelled')
            RETURNING job_id
            "#,
            job_id,
            now
        )
        .fetch_optional(pool)
        .await?;

        if result.is_none() {
            tracing::warn!("Could not mark job {} as cancelled - may already be completed", job_id);
            return Ok(false);
        }
    }

    // Publish job status update for subscribers
    // NOTE(review): when the job was not found in the queue, slice_uri is the
    // empty string here, so subscribers receive a cancelled event with no
    // slice — confirm downstream consumers tolerate that.
    let job_status = JobStatus {
        job_id,
        slice_uri,
        status: "cancelled".to_string(),
        created_at: now,
        started_at: None,
        completed_at: Some(now),
        result: None,
        error: Some("Job cancelled by user".to_string()),
        retry_count: 0,
    };

    crate::graphql::schema_ext::publish_sync_job_update(job_status).await;

    tracing::info!("Cancelled job: {}", job_id);
    Ok(true)
}
1091
1092/// Deletes a sync job from the database.
1093///
1094/// This removes the job from the results table. Jobs in the queue should be cancelled first.
1095///
1096/// # Arguments
1097/// * `pool` - PostgreSQL connection pool
1098/// * `job_id` - UUID of the job to delete
1099///
1100/// # Returns
1101/// * `Ok(true)` - Job was successfully deleted
1102/// * `Ok(false)` - Job was not found
1103/// * `Err(sqlx::Error)` - Database query error
1104pub async fn delete_job(pool: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
1105 // Delete from job_results table
1106 let result_deleted = sqlx::query!(
1107 r#"
1108 DELETE FROM job_results
1109 WHERE job_id = $1
1110 RETURNING job_id
1111 "#,
1112 job_id
1113 )
1114 .fetch_optional(pool)
1115 .await?;
1116
1117 if result_deleted.is_some() {
1118 tracing::info!("Deleted job: {}", job_id);
1119 Ok(true)
1120 } else {
1121 tracing::warn!("Job not found for deletion: {}", job_id);
1122 Ok(false)
1123 }
1124}
1125
1126/// Checks if a job has been cancelled.
1127///
1128/// Used by the sync worker to periodically check if a running job
1129/// has been cancelled by the user.
1130///
1131/// # Arguments
1132/// * `pool` - PostgreSQL connection pool
1133/// * `job_id` - UUID of the job to check
1134///
1135/// # Returns
1136/// * `Ok(true)` - Job has been cancelled
1137/// * `Ok(false)` - Job is not cancelled or doesn't exist
1138/// * `Err(sqlx::Error)` - Database query error
1139pub async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
1140 let result = sqlx::query!(
1141 r#"
1142 SELECT status
1143 FROM job_results
1144 WHERE job_id = $1
1145 AND status = 'cancelled'
1146 "#,
1147 job_id
1148 )
1149 .fetch_optional(pool)
1150 .await?;
1151
1152 Ok(result.is_some())
1153}