use miette::{IntoDiagnostic, Result}; use redis::AsyncCommands; use tracing::{debug, info, warn}; use crate::config::Config; use crate::types::ImageJob; /// Redis queue names const PENDING_QUEUE: &str = "jobs:pending"; const PROCESSING_QUEUE: &str = "jobs:processing"; const DEAD_LETTER_QUEUE: &str = "jobs:dead"; /// Redis-based job queue for ImageJob processing #[derive(Clone)] pub struct JobQueue { redis: redis::aio::MultiplexedConnection, max_retries: u32, } impl JobQueue { /// Create a new job queue pub async fn new(config: &Config) -> Result { info!("Connecting to Redis for job queue: {}", config.redis.url); let client = redis::Client::open(config.redis.url.as_str()).into_diagnostic()?; let redis = client .get_multiplexed_async_connection() .await .into_diagnostic()?; info!("Job queue connected to Redis"); Ok(Self { redis, max_retries: config.processing.retry_attempts, }) } /// Push a job to the pending queue pub async fn push(&mut self, job: &ImageJob) -> Result<()> { let job_json = serde_json::to_string(job).into_diagnostic()?; let _: () = self .redis .rpush(PENDING_QUEUE, &job_json) .await .into_diagnostic()?; debug!("Pushed job to queue: {}", job.post_uri); Ok(()) } /// Pop a job from the pending queue (blocking with timeout) pub async fn pop(&mut self, timeout_secs: usize) -> Result> { let result: Option> = self .redis .blpop(PENDING_QUEUE, timeout_secs as f64) .await .into_diagnostic()?; match result { Some(items) => { // blpop returns [key, value] if items.len() >= 2 { let job_json = &items[1]; let job: ImageJob = serde_json::from_str(job_json).into_diagnostic()?; debug!("Popped job from queue: {}", job.post_uri); Ok(Some(job)) } else { Ok(None) } } None => Ok(None), } } /// Retry a failed job (increment attempts and re-queue) pub async fn retry(&mut self, mut job: ImageJob) -> Result<()> { job.attempts += 1; if job.attempts >= self.max_retries { warn!( "Job exceeded max retries ({}), moving to dead letter queue: {}", self.max_retries, job.post_uri ); self.move_to_dead_letter(&job).await?; } else { info!( "Retrying job (attempt {}/{}): {}", job.attempts, self.max_retries, job.post_uri ); self.push(&job).await?; } Ok(()) } /// Move a job to the dead letter queue async fn move_to_dead_letter(&mut self, job: &ImageJob) -> Result<()> { let job_json = serde_json::to_string(job).into_diagnostic()?; let _: () = self .redis .rpush(DEAD_LETTER_QUEUE, &job_json) .await .into_diagnostic()?; warn!("Moved job to dead letter queue: {}", job.post_uri); Ok(()) } /// Get queue statistics pub async fn stats(&mut self) -> Result { let pending: usize = self.redis.llen(PENDING_QUEUE).await.into_diagnostic()?; let processing: usize = self .redis .llen(PROCESSING_QUEUE) .await .into_diagnostic()?; let dead: usize = self.redis.llen(DEAD_LETTER_QUEUE).await.into_diagnostic()?; Ok(QueueStats { pending, processing, dead, }) } /// Clear all queues (for testing/maintenance) pub async fn clear_all(&mut self) -> Result<()> { let _: () = self.redis.del(PENDING_QUEUE).await.into_diagnostic()?; let _: () = self.redis.del(PROCESSING_QUEUE).await.into_diagnostic()?; let _: () = self .redis .del(DEAD_LETTER_QUEUE) .await .into_diagnostic()?; info!("Cleared all job queues"); Ok(()) } } #[derive(Debug, Clone)] pub struct QueueStats { pub pending: usize, pub processing: usize, pub dead: usize, } #[cfg(test)] mod tests { // Note: These are integration tests that require a running Redis instance // Run with: cargo test --test queue -- --ignored }