A Rust implementation of skywatch-phash
1use miette::{IntoDiagnostic, Result};
2use redis::AsyncCommands;
3use tracing::{debug, info, warn};
4
5use crate::config::Config;
6use crate::types::ImageJob;
7
// Redis list keys used by the job queue.

/// Jobs waiting to be processed: `push` appends with RPUSH and `pop` takes with BLPOP.
const PENDING_QUEUE: &str = "jobs:pending";
/// In this file this key is only measured by `stats` and deleted by `clear_all`;
/// nothing here adds entries to it.
const PROCESSING_QUEUE: &str = "jobs:processing";
/// Jobs that `retry` stopped re-queueing because they reached the retry limit.
const DEAD_LETTER_QUEUE: &str = "jobs:dead";
12
/// Redis-based job queue for ImageJob processing
///
/// Jobs are stored as JSON strings in Redis lists. The type derives `Clone`,
/// so a clone carries a clone of the connection handle and the same retry limit.
#[derive(Clone)]
pub struct JobQueue {
    /// Multiplexed async Redis connection that every queue command is issued on.
    redis: redis::aio::MultiplexedConnection,
    /// Attempt count at which `retry` moves a job to the dead letter queue
    /// instead of re-queueing it; set from `config.processing.retry_attempts`.
    max_retries: u32,
}
19
20impl JobQueue {
21 /// Create a new job queue
22 pub async fn new(config: &Config) -> Result<Self> {
23 info!("Connecting to Redis for job queue: {}", config.redis.url);
24
25 let client = redis::Client::open(config.redis.url.as_str()).into_diagnostic()?;
26 let redis = client
27 .get_multiplexed_async_connection()
28 .await
29 .into_diagnostic()?;
30
31 info!("Job queue connected to Redis");
32
33 Ok(Self {
34 redis,
35 max_retries: config.processing.retry_attempts,
36 })
37 }
38
39 /// Push a job to the pending queue
40 pub async fn push(&mut self, job: &ImageJob) -> Result<()> {
41 let job_json = serde_json::to_string(job).into_diagnostic()?;
42
43 let _: () = self
44 .redis
45 .rpush(PENDING_QUEUE, &job_json)
46 .await
47 .into_diagnostic()?;
48
49 debug!("Pushed job to queue: {}", job.post_uri);
50
51 Ok(())
52 }
53
54 /// Pop a job from the pending queue (blocking with timeout)
55 pub async fn pop(&mut self, timeout_secs: usize) -> Result<Option<ImageJob>> {
56 let result: Option<Vec<String>> = self
57 .redis
58 .blpop(PENDING_QUEUE, timeout_secs as f64)
59 .await
60 .into_diagnostic()?;
61
62 match result {
63 Some(items) => {
64 // blpop returns [key, value]
65 if items.len() >= 2 {
66 let job_json = &items[1];
67 let job: ImageJob = serde_json::from_str(job_json).into_diagnostic()?;
68 debug!("Popped job from queue: {}", job.post_uri);
69 Ok(Some(job))
70 } else {
71 Ok(None)
72 }
73 }
74 None => Ok(None),
75 }
76 }
77
78 /// Retry a failed job (increment attempts and re-queue)
79 pub async fn retry(&mut self, mut job: ImageJob) -> Result<()> {
80 job.attempts += 1;
81
82 if job.attempts >= self.max_retries {
83 warn!(
84 "Job exceeded max retries ({}), moving to dead letter queue: {}",
85 self.max_retries, job.post_uri
86 );
87 self.move_to_dead_letter(&job).await?;
88 } else {
89 info!(
90 "Retrying job (attempt {}/{}): {}",
91 job.attempts, self.max_retries, job.post_uri
92 );
93 self.push(&job).await?;
94 }
95
96 Ok(())
97 }
98
99 /// Move a job to the dead letter queue
100 async fn move_to_dead_letter(&mut self, job: &ImageJob) -> Result<()> {
101 let job_json = serde_json::to_string(job).into_diagnostic()?;
102
103 let _: () = self
104 .redis
105 .rpush(DEAD_LETTER_QUEUE, &job_json)
106 .await
107 .into_diagnostic()?;
108
109 warn!("Moved job to dead letter queue: {}", job.post_uri);
110
111 Ok(())
112 }
113
114 /// Get queue statistics
115 pub async fn stats(&mut self) -> Result<QueueStats> {
116 let pending: usize = self.redis.llen(PENDING_QUEUE).await.into_diagnostic()?;
117 let processing: usize = self
118 .redis
119 .llen(PROCESSING_QUEUE)
120 .await
121 .into_diagnostic()?;
122 let dead: usize = self.redis.llen(DEAD_LETTER_QUEUE).await.into_diagnostic()?;
123
124 Ok(QueueStats {
125 pending,
126 processing,
127 dead,
128 })
129 }
130
131 /// Clear all queues (for testing/maintenance)
132 pub async fn clear_all(&mut self) -> Result<()> {
133 let _: () = self.redis.del(PENDING_QUEUE).await.into_diagnostic()?;
134 let _: () = self.redis.del(PROCESSING_QUEUE).await.into_diagnostic()?;
135 let _: () = self
136 .redis
137 .del(DEAD_LETTER_QUEUE)
138 .await
139 .into_diagnostic()?;
140
141 info!("Cleared all job queues");
142
143 Ok(())
144 }
145}
146
/// Lengths of the three job lists, as reported by `JobQueue::stats`.
///
/// `Default` yields all-zero counts; `PartialEq`/`Eq` allow snapshots to be
/// compared directly (for example in assertions).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of entries in the pending list.
    pub pending: usize,
    /// Number of entries in the processing list.
    pub processing: usize,
    /// Number of entries in the dead letter list.
    pub dead: usize,
}
153
#[cfg(test)]
mod tests {
    // Placeholder: no test functions are defined in this module yet.
    //
    // Note: These are integration tests that require a running Redis instance
    // Run with: cargo test --test queue -- --ignored
}