QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
//! Generic queue adapter system for work queue abstraction.
//!
//! This module provides a generic trait and implementations for queue adapters
//! that can be used with any work type for handle resolution and other tasks.

use async_trait::async_trait;
use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{Mutex, mpsc};
use tracing::{debug, error, warn};

/// Queue operation errors.
///
/// Each message carries a stable `error-quickdid-queue-N` code so operators
/// can grep logs for a specific failure class.
#[derive(Error, Debug)]
pub enum QueueError {
    /// The underlying queue rejected the item (e.g. the channel send failed).
    #[error("error-quickdid-queue-1 Failed to push to queue: {0}")]
    PushFailed(String),

    /// The queue is at capacity; produced by non-blocking `try_push`.
    #[error("error-quickdid-queue-2 Queue is full")]
    QueueFull,

    /// The queue has been closed and no longer accepts items.
    #[error("error-quickdid-queue-3 Queue is closed")]
    QueueClosed,

    /// Could not obtain a connection from the Redis pool.
    #[error("error-quickdid-queue-4 Redis connection failed: {0}")]
    RedisConnectionFailed(String),

    /// A specific Redis command failed; `operation` names the command (e.g. "LPUSH").
    #[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")]
    RedisOperationFailed { operation: String, details: String },

    /// A work item could not be serialized for storage.
    #[error("error-quickdid-queue-6 Serialization failed: {0}")]
    SerializationFailed(String),

    /// A stored payload could not be deserialized back into the work type.
    #[error("error-quickdid-queue-7 Deserialization failed: {0}")]
    DeserializationFailed(String),

    /// An acknowledged item was not present in the worker queue.
    // NOTE(review): no code path in this file currently returns this variant;
    // RedisQueueAdapter::ack only logs a warning when LREM removes nothing.
    // Confirm whether ack should surface this as an error instead.
    #[error("error-quickdid-queue-8 Item not found in worker queue during acknowledgment")]
    AckItemNotFound,
}

/// Module-local result alias defaulting the error type to [`QueueError`].
type Result<T> = std::result::Result<T, QueueError>;

/// Generic trait for queue adapters that can work with any work type.
///
/// This trait provides a common interface for different queue implementations
/// (MPSC, Redis, PostgreSQL, etc.) allowing them to be used interchangeably.
#[async_trait]
pub trait QueueAdapter<T>: Send + Sync
where
    T: Send + Sync + 'static,
{
    /// Pull the next work item from the queue.
    ///
    /// Returns None if the queue is closed or empty (depending on implementation).
    async fn pull(&self) -> Option<T>;

    /// Push a work item to the queue.
    ///
    /// Returns an error if the queue is full or closed.
    async fn push(&self, work: T) -> Result<()>;

    /// Acknowledge that a work item has been successfully processed.
    ///
    /// This is used by reliable queue implementations to remove the item
    /// from a temporary processing queue. Implementations that don't require
    /// acknowledgment (like MPSC) can use the default no-op implementation.
    async fn ack(&self, _item: &T) -> Result<()> {
        // Default no-op implementation for queues that don't need acknowledgment
        Ok(())
    }

    /// Try to push a work item without blocking.
    ///
    /// Returns an error if the queue is full or closed.
    ///
    /// The default simply delegates to the (possibly blocking) [`QueueAdapter::push`];
    /// implementations with a true non-blocking path should override it.
    async fn try_push(&self, work: T) -> Result<()> {
        // Default implementation uses regular push
        self.push(work).await
    }

    /// Get the current queue depth if available.
    ///
    /// Returns None if the implementation doesn't support queue depth.
    async fn depth(&self) -> Option<usize> {
        None
    }

    /// Check if the queue is healthy.
    ///
    /// Used for health checks and monitoring. Defaults to always-healthy for
    /// adapters that have no external dependency to probe.
    async fn is_healthy(&self) -> bool {
        true
    }
}

/// MPSC channel-based queue adapter implementation.
///
/// This adapter uses tokio's multi-producer, single-consumer channel
/// for in-memory queuing of work items. It's suitable for single-instance
/// deployments with moderate throughput requirements.
pub(crate) struct MpscQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    // The receiver sits behind a Mutex so `pull(&self)` can borrow it mutably;
    // the Arc lets the adapter itself be cloned/shared across tasks.
    receiver: Arc<Mutex<mpsc::Receiver<T>>>,
    sender: mpsc::Sender<T>,
}

109impl<T> MpscQueueAdapter<T>
110where
111 T: Send + Sync + 'static,
112{
113 /// Create a new MPSC queue adapter with the specified buffer size.
114 pub(crate) fn new(buffer: usize) -> Self {
115 let (sender, receiver) = mpsc::channel(buffer);
116 Self {
117 receiver: Arc::new(Mutex::new(receiver)),
118 sender,
119 }
120 }
121
122 /// Create an adapter from existing MPSC channels (for backward compatibility).
123 pub(crate) fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self {
124 Self {
125 receiver: Arc::new(Mutex::new(receiver)),
126 sender,
127 }
128 }
129}
130
131#[async_trait]
132impl<T> QueueAdapter<T> for MpscQueueAdapter<T>
133where
134 T: Send + Sync + 'static,
135{
136 async fn pull(&self) -> Option<T> {
137 let mut receiver = self.receiver.lock().await;
138 receiver.recv().await
139 }
140
141 async fn push(&self, work: T) -> Result<()> {
142 self.sender
143 .send(work)
144 .await
145 .map_err(|e| QueueError::PushFailed(e.to_string()))
146 }
147
148 async fn try_push(&self, work: T) -> Result<()> {
149 self.sender.try_send(work).map_err(|e| match e {
150 mpsc::error::TrySendError::Full(_) => QueueError::QueueFull,
151 mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed,
152 })
153 }
154
155 async fn depth(&self) -> Option<usize> {
156 // Note: This is an approximation as mpsc doesn't provide exact depth
157 Some(self.sender.max_capacity() - self.sender.capacity())
158 }
159
160 async fn is_healthy(&self) -> bool {
161 !self.sender.is_closed()
162 }
163}
164
165/// Work item for handle resolution tasks
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct HandleResolutionWork {
168 /// The handle to resolve
169 pub handle: String,
170}
171
172impl HandleResolutionWork {
173 /// Create a new handle resolution work item
174 pub fn new(handle: String) -> Self {
175 Self { handle }
176 }
177}
178
/// Redis-backed queue adapter implementation.
///
/// This adapter uses Redis lists with a reliable queue pattern:
/// - LPUSH to push items to the primary queue
/// - RPOPLPUSH to atomically move items from primary to worker queue
/// - LREM to acknowledge processed items from worker queue
///
/// This ensures at-least-once delivery semantics and allows for recovery
/// of in-flight items if a worker crashes.
pub(crate) struct RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Redis connection pool
    pool: RedisPool,
    /// Unique worker ID for this adapter instance; also names the worker's
    /// in-flight queue key.
    worker_id: String,
    /// Key prefix for all queues (default: "queue:handleresolver:")
    key_prefix: String,
    /// Timeout for blocking RPOPLPUSH operations
    timeout_seconds: u64,
    /// Type marker for generic parameter (no T is stored directly)
    _phantom: std::marker::PhantomData<T>,
}

204impl<T> RedisQueueAdapter<T>
205where
206 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
207{
208 /// Create a new Redis queue adapter with custom configuration
209 fn with_config(
210 pool: RedisPool,
211 worker_id: Option<String>,
212 key_prefix: String,
213 timeout_seconds: u64,
214 ) -> Self {
215 let worker_id = worker_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
216 Self {
217 pool,
218 worker_id,
219 key_prefix,
220 timeout_seconds,
221 _phantom: std::marker::PhantomData,
222 }
223 }
224
225 /// Get the primary queue key
226 fn primary_queue_key(&self) -> String {
227 format!("{}primary", self.key_prefix)
228 }
229
230 /// Get the worker-specific temporary queue key
231 fn worker_queue_key(&self) -> String {
232 format!("{}{}", self.key_prefix, self.worker_id)
233 }
234}
235
#[async_trait]
impl<T> QueueAdapter<T> for RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Pull one item, atomically moving it from the primary queue into this
    /// worker's in-flight queue (reliable-queue pattern).
    ///
    /// Returns `None` on timeout, connection failure, Redis error, or an
    /// undecodable payload — callers are expected to call `pull` in a loop.
    async fn pull(&self) -> Option<T> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                let worker_key = self.worker_queue_key();

                // Use blocking RPOPLPUSH to atomically move item from primary to worker queue
                let data: Option<Vec<u8>> = match conn
                    .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64)
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        error!("Failed to pull from queue: {}", e);
                        return None;
                    }
                };

                if let Some(data) = data {
                    // Deserialize the item
                    match serde_json::from_slice(&data) {
                        Ok(item) => {
                            debug!(
                                worker_id = %self.worker_id,
                                "Pulled item from queue"
                            );
                            Some(item)
                        }
                        Err(e) => {
                            error!("Failed to deserialize item: {}", e);
                            // Remove the corrupted item from worker queue.
                            // Best effort: the LREM result is deliberately ignored;
                            // a leftover corrupt entry only wastes space.
                            let _: std::result::Result<(), _> =
                                conn.lrem(&worker_key, 1, &data).await;
                            None
                        }
                    }
                } else {
                    // Timed out with no item available.
                    None
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    /// Serialize the item as JSON and LPUSH it onto the primary queue.
    ///
    /// # Errors
    /// `RedisConnectionFailed`, `SerializationFailed`, or
    /// `RedisOperationFailed { operation: "LPUSH" }`.
    async fn push(&self, work: T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        let data = serde_json::to_vec(&work)
            .map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let primary_key = self.primary_queue_key();

        conn.lpush::<_, _, ()>(&primary_key, data)
            .await
            .map_err(|e| QueueError::RedisOperationFailed {
                operation: "LPUSH".to_string(),
                details: e.to_string(),
            })?;

        debug!("Pushed item to queue");
        Ok(())
    }

    /// Acknowledge a processed item by LREM-ing its JSON encoding from this
    /// worker's in-flight queue.
    ///
    /// A missing item is logged as a warning but still returns `Ok(())`
    /// (acknowledgment is treated as idempotent).
    async fn ack(&self, item: &T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        // The item must serialize to the exact bytes stored at pull time for
        // LREM to match it.
        let data =
            serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let worker_key = self.worker_queue_key();

        // Remove exactly one occurrence of this item from the worker queue
        let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| {
            QueueError::RedisOperationFailed {
                operation: "LREM".to_string(),
                details: e.to_string(),
            }
        })?;

        if removed == 0 {
            warn!(
                worker_id = %self.worker_id,
                "Item not found in worker queue during acknowledgment"
            );
        } else {
            debug!(
                worker_id = %self.worker_id,
                "Acknowledged item"
            );
        }

        Ok(())
    }

    /// Length (LLEN) of the primary queue only; in-flight worker queues are
    /// not counted. Returns `None` on any Redis failure.
    async fn depth(&self) -> Option<usize> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                match conn.llen::<_, usize>(&primary_key).await {
                    Ok(len) => Some(len),
                    Err(e) => {
                        error!("Failed to get queue depth: {}", e);
                        None
                    }
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    /// Healthy when a pooled connection answers PING with PONG.
    async fn is_healthy(&self) -> bool {
        match self.pool.get().await {
            Ok(mut conn) => {
                // Ping Redis to check health
                match deadpool_redis::redis::cmd("PING")
                    .query_async::<String>(&mut conn)
                    .await
                {
                    Ok(response) => response == "PONG",
                    Err(_) => false,
                }
            }
            Err(_) => false,
        }
    }
}

/// No-operation queue adapter that discards all work items.
///
/// This adapter is useful for configurations where queuing is disabled
/// or as a fallback when other queue adapters fail to initialize.
pub(crate) struct NoopQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    // Zero-sized marker tying the generic parameter to the type.
    _phantom: std::marker::PhantomData<T>,
}

393impl<T> NoopQueueAdapter<T>
394where
395 T: Send + Sync + 'static,
396{
397 /// Create a new no-op queue adapter
398 pub(crate) fn new() -> Self {
399 Self {
400 _phantom: std::marker::PhantomData,
401 }
402 }
403}
404
405impl<T> Default for NoopQueueAdapter<T>
406where
407 T: Send + Sync + 'static,
408{
409 fn default() -> Self {
410 Self::new()
411 }
412}
413
414#[async_trait]
415impl<T> QueueAdapter<T> for NoopQueueAdapter<T>
416where
417 T: Send + Sync + 'static,
418{
419 async fn pull(&self) -> Option<T> {
420 // Never returns any work
421 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
422 None
423 }
424
425 async fn push(&self, _work: T) -> Result<()> {
426 // Silently discard the work
427 Ok(())
428 }
429
430 async fn ack(&self, _item: &T) -> Result<()> {
431 // No-op
432 Ok(())
433 }
434
435 async fn try_push(&self, _work: T) -> Result<()> {
436 // Silently discard the work
437 Ok(())
438 }
439
440 async fn depth(&self) -> Option<usize> {
441 // Always empty
442 Some(0)
443 }
444
445 async fn is_healthy(&self) -> bool {
446 // Always healthy
447 true
448 }
449}
450
// ========= Factory Functions for Queue Adapters =========

453/// Create a new MPSC queue adapter with the specified buffer size.
454///
455/// This creates an in-memory queue suitable for single-instance deployments.
456///
457/// # Arguments
458///
459/// * `buffer` - The buffer size for the channel
460pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>>
461where
462 T: Send + Sync + 'static,
463{
464 Arc::new(MpscQueueAdapter::new(buffer))
465}
466
467/// Create an MPSC queue adapter from existing channels.
468///
469/// This allows integration with existing channel-based architectures.
470///
471/// # Arguments
472///
473/// * `sender` - The sender half of the channel
474/// * `receiver` - The receiver half of the channel
475pub fn create_mpsc_queue_from_channel<T>(
476 sender: mpsc::Sender<T>,
477 receiver: mpsc::Receiver<T>,
478) -> Arc<dyn QueueAdapter<T>>
479where
480 T: Send + Sync + 'static,
481{
482 Arc::new(MpscQueueAdapter::from_channel(sender, receiver))
483}
484
485/// Create a new Redis-backed queue adapter.
486///
487/// This creates a distributed queue suitable for multi-instance deployments.
488///
489/// # Arguments
490///
491/// * `pool` - Redis connection pool
492/// * `worker_id` - Optional worker identifier (auto-generated if None)
493/// * `key_prefix` - Redis key prefix for queue operations
494/// * `timeout_seconds` - Timeout for blocking operations
495pub fn create_redis_queue<T>(
496 pool: RedisPool,
497 worker_id: Option<String>,
498 key_prefix: String,
499 timeout_seconds: u64,
500) -> Arc<dyn QueueAdapter<T>>
501where
502 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
503{
504 Arc::new(RedisQueueAdapter::with_config(
505 pool,
506 worker_id,
507 key_prefix,
508 timeout_seconds,
509 ))
510}
511
512/// Create a no-operation queue adapter.
513///
514/// This creates a queue that discards all work items, useful for testing
515/// or when queue processing is disabled.
516pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>>
517where
518 T: Send + Sync + 'static,
519{
520 Arc::new(NoopQueueAdapter::new())
521}
522
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a single item through the in-memory adapter.
    #[tokio::test]
    async fn test_mpsc_queue_adapter_push_pull() {
        let adapter = Arc::new(MpscQueueAdapter::<String>::new(10));

        // Test push
        adapter.push("test".to_string()).await.unwrap();

        // Test pull
        let pulled = adapter.pull().await;
        assert!(pulled.is_some());
        assert_eq!(pulled.unwrap(), "test");
    }

    /// Constructor stores the handle verbatim.
    #[tokio::test]
    async fn test_handle_resolution_work() {
        let work = HandleResolutionWork::new("alice.example.com".to_string());

        assert_eq!(work.handle, "alice.example.com");
    }

    // NOTE: all Redis tests below skip silently when no test Redis pool is
    // configured (get_test_redis_pool returns None).

    /// Push/pull/ack round trip against a real Redis instance.
    #[tokio::test]
    async fn test_redis_queue_adapter_push_pull() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        // Create adapter with unique prefix for testing
        let test_prefix = format!("test:queue:{}:", uuid::Uuid::new_v4());
        let adapter = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            Some("test-worker".to_string()),
            test_prefix.clone(),
            1, // 1 second timeout for tests
        ));

        // Test push
        adapter.push("test-item".to_string()).await.unwrap();

        // Test pull
        let pulled = adapter.pull().await;
        assert!(pulled.is_some());
        assert_eq!(pulled.unwrap(), "test-item");

        // Test ack
        adapter
            .ack(&"test-item".to_string())
            .await
            .expect("Ack should succeed");

        // Clean up test data - manually clean worker queue since cleanup was removed
        // In production, items would timeout or be processed
    }

    /// Verifies the reliable-queue pattern: an item pulled but never acked
    /// stays recoverable under the same worker_id.
    #[tokio::test]
    async fn test_redis_queue_adapter_reliable_queue() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        let test_prefix = format!("test:queue:{}:", uuid::Uuid::new_v4());
        let worker_id = "test-worker-reliable";

        // Create first adapter
        let adapter1 = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            Some(worker_id.to_string()),
            test_prefix.clone(),
            1,
        ));

        // Push multiple items
        adapter1.push("item1".to_string()).await.unwrap();
        adapter1.push("item2".to_string()).await.unwrap();
        adapter1.push("item3".to_string()).await.unwrap();

        // Pull but don't ack (simulating worker crash)
        let item1 = adapter1.pull().await;
        assert!(item1.is_some());
        assert_eq!(item1.unwrap(), "item1");

        // Create second adapter with same worker_id (simulating restart)
        let adapter2 = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            Some(worker_id.to_string()),
            test_prefix.clone(),
            1,
        ));

        // In a real scenario, unacked items would be handled by timeout or manual recovery
        // For this test, we just verify the item is in the worker queue
        let recovered = adapter2.pull().await;
        assert!(recovered.is_some());
    }

    /// depth() reflects only the primary queue, shrinking as items move to
    /// the worker queue.
    #[tokio::test]
    async fn test_redis_queue_adapter_depth() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        let test_prefix = format!("test:queue:{}:", uuid::Uuid::new_v4());
        let adapter = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            None,
            test_prefix.clone(),
            1,
        ));

        // Initially empty
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(0));

        // Push items and check depth
        adapter.push("item1".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(1));

        adapter.push("item2".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(2));

        // Pull and check depth decreases
        let _ = adapter.pull().await;
        // Note: depth checks primary queue, not worker queue
        assert_eq!(adapter.depth().await, Some(1));

        // Test cleanup is automatic when adapter is dropped
    }

    /// is_healthy() should be true whenever the pooled Redis answers PING.
    #[tokio::test]
    async fn test_redis_queue_adapter_health() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        let adapter = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool,
            None,
            "test:queue:health:".to_string(),
            1,
        ));

        // Should be healthy if Redis is running
        assert!(adapter.is_healthy().await);
    }
}
674}