A rust implementation of skywatch-phash
at main 160 lines 4.6 kB view raw
1use miette::{IntoDiagnostic, Result}; 2use redis::AsyncCommands; 3use tracing::{debug, info, warn}; 4 5use crate::config::Config; 6use crate::types::ImageJob; 7 8/// Redis queue names 9const PENDING_QUEUE: &str = "jobs:pending"; 10const PROCESSING_QUEUE: &str = "jobs:processing"; 11const DEAD_LETTER_QUEUE: &str = "jobs:dead"; 12 13/// Redis-based job queue for ImageJob processing 14#[derive(Clone)] 15pub struct JobQueue { 16 redis: redis::aio::MultiplexedConnection, 17 max_retries: u32, 18} 19 20impl JobQueue { 21 /// Create a new job queue 22 pub async fn new(config: &Config) -> Result<Self> { 23 info!("Connecting to Redis for job queue: {}", config.redis.url); 24 25 let client = redis::Client::open(config.redis.url.as_str()).into_diagnostic()?; 26 let redis = client 27 .get_multiplexed_async_connection() 28 .await 29 .into_diagnostic()?; 30 31 info!("Job queue connected to Redis"); 32 33 Ok(Self { 34 redis, 35 max_retries: config.processing.retry_attempts, 36 }) 37 } 38 39 /// Push a job to the pending queue 40 pub async fn push(&mut self, job: &ImageJob) -> Result<()> { 41 let job_json = serde_json::to_string(job).into_diagnostic()?; 42 43 let _: () = self 44 .redis 45 .rpush(PENDING_QUEUE, &job_json) 46 .await 47 .into_diagnostic()?; 48 49 debug!("Pushed job to queue: {}", job.post_uri); 50 51 Ok(()) 52 } 53 54 /// Pop a job from the pending queue (blocking with timeout) 55 pub async fn pop(&mut self, timeout_secs: usize) -> Result<Option<ImageJob>> { 56 let result: Option<Vec<String>> = self 57 .redis 58 .blpop(PENDING_QUEUE, timeout_secs as f64) 59 .await 60 .into_diagnostic()?; 61 62 match result { 63 Some(items) => { 64 // blpop returns [key, value] 65 if items.len() >= 2 { 66 let job_json = &items[1]; 67 let job: ImageJob = serde_json::from_str(job_json).into_diagnostic()?; 68 debug!("Popped job from queue: {}", job.post_uri); 69 Ok(Some(job)) 70 } else { 71 Ok(None) 72 } 73 } 74 None => Ok(None), 75 } 76 } 77 78 /// Retry a failed job (increment attempts and re-queue) 79 pub async fn retry(&mut self, mut job: ImageJob) -> Result<()> { 80 job.attempts += 1; 81 82 if job.attempts >= self.max_retries { 83 warn!( 84 "Job exceeded max retries ({}), moving to dead letter queue: {}", 85 self.max_retries, job.post_uri 86 ); 87 self.move_to_dead_letter(&job).await?; 88 } else { 89 info!( 90 "Retrying job (attempt {}/{}): {}", 91 job.attempts, self.max_retries, job.post_uri 92 ); 93 self.push(&job).await?; 94 } 95 96 Ok(()) 97 } 98 99 /// Move a job to the dead letter queue 100 async fn move_to_dead_letter(&mut self, job: &ImageJob) -> Result<()> { 101 let job_json = serde_json::to_string(job).into_diagnostic()?; 102 103 let _: () = self 104 .redis 105 .rpush(DEAD_LETTER_QUEUE, &job_json) 106 .await 107 .into_diagnostic()?; 108 109 warn!("Moved job to dead letter queue: {}", job.post_uri); 110 111 Ok(()) 112 } 113 114 /// Get queue statistics 115 pub async fn stats(&mut self) -> Result<QueueStats> { 116 let pending: usize = self.redis.llen(PENDING_QUEUE).await.into_diagnostic()?; 117 let processing: usize = self 118 .redis 119 .llen(PROCESSING_QUEUE) 120 .await 121 .into_diagnostic()?; 122 let dead: usize = self.redis.llen(DEAD_LETTER_QUEUE).await.into_diagnostic()?; 123 124 Ok(QueueStats { 125 pending, 126 processing, 127 dead, 128 }) 129 } 130 131 /// Clear all queues (for testing/maintenance) 132 pub async fn clear_all(&mut self) -> Result<()> { 133 let _: () = self.redis.del(PENDING_QUEUE).await.into_diagnostic()?; 134 let _: () = self.redis.del(PROCESSING_QUEUE).await.into_diagnostic()?; 135 let _: () = self 136 .redis 137 .del(DEAD_LETTER_QUEUE) 138 .await 139 .into_diagnostic()?; 140 141 info!("Cleared all job queues"); 142 143 Ok(()) 144 } 145} 146 147#[derive(Debug, Clone)] 148pub struct QueueStats { 149 pub pending: usize, 150 pub processing: usize, 151 pub dead: usize, 152} 153 154#[cfg(test)] 155mod tests { 156 157 158 // Note: These are integration tests that require a running Redis instance 159 // Run with: cargo test --test queue -- --ignored 160}