QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1//! Generic queue adapter system for work queue abstraction.
2//!
3//! This module provides a generic trait and implementations for queue adapters
4//! that can be used with any work type for handle resolution and other tasks.
5
6use async_trait::async_trait;
7use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10use thiserror::Error;
11use tokio::sync::{Mutex, mpsc};
12use tracing::{debug, error, warn};
13
/// Queue operation errors returned by [`QueueAdapter`] implementations.
///
/// Every message carries a stable `error-quickdid-queue-N` code so log
/// aggregation and alerting can match on the code independently of the
/// free-text details.
#[derive(Error, Debug)]
pub enum QueueError {
    /// Sending the work item into the queue failed; carries the underlying
    /// error text (e.g. from an MPSC channel send).
    #[error("error-quickdid-queue-1 Failed to push to queue: {0}")]
    PushFailed(String),

    /// The queue is at capacity; returned by non-blocking `try_push`.
    #[error("error-quickdid-queue-2 Queue is full")]
    QueueFull,

    /// The queue has been closed and accepts no further items.
    #[error("error-quickdid-queue-3 Queue is closed")]
    QueueClosed,

    /// Obtaining a connection from the Redis pool failed.
    #[error("error-quickdid-queue-4 Redis connection failed: {0}")]
    RedisConnectionFailed(String),

    /// A specific Redis command (e.g. LPUSH, LREM) failed; `operation`
    /// names the command, `details` carries the driver error text.
    #[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")]
    RedisOperationFailed { operation: String, details: String },

    /// Serializing a work item to JSON failed.
    #[error("error-quickdid-queue-6 Serialization failed: {0}")]
    SerializationFailed(String),

    /// Deserializing a work item from JSON failed.
    #[error("error-quickdid-queue-7 Deserialization failed: {0}")]
    DeserializationFailed(String),

    /// The acknowledged item was not present in the worker queue.
    /// NOTE(review): no code path in this module currently returns this
    /// variant — `RedisQueueAdapter::ack` only logs a warning when the item
    /// is missing. Confirm whether callers should receive this error instead.
    #[error("error-quickdid-queue-8 Item not found in worker queue during acknowledgment")]
    AckItemNotFound,
}
41
42type Result<T> = std::result::Result<T, QueueError>;
43
/// Generic trait for queue adapters that can work with any work type.
///
/// This trait provides a common interface for different queue implementations
/// (MPSC, Redis, PostgreSQL, etc.) allowing them to be used interchangeably.
///
/// Implementations in this module: [`MpscQueueAdapter`] (in-memory),
/// [`RedisQueueAdapter`] (distributed, reliable-queue pattern), and
/// [`NoopQueueAdapter`] (discards everything).
#[async_trait]
pub trait QueueAdapter<T>: Send + Sync
where
    T: Send + Sync + 'static,
{
    /// Pull the next work item from the queue.
    ///
    /// Returns None if the queue is closed or empty (depending on implementation).
    async fn pull(&self) -> Option<T>;

    /// Push a work item to the queue.
    ///
    /// Returns an error if the queue is full or closed.
    async fn push(&self, work: T) -> Result<()>;

    /// Acknowledge that a work item has been successfully processed.
    ///
    /// This is used by reliable queue implementations to remove the item
    /// from a temporary processing queue. Implementations that don't require
    /// acknowledgment (like MPSC) can use the default no-op implementation.
    async fn ack(&self, _item: &T) -> Result<()> {
        // Default no-op implementation for queues that don't need acknowledgment
        Ok(())
    }

    /// Try to push a work item without blocking.
    ///
    /// Returns an error if the queue is full or closed.
    /// Default simply delegates to the (possibly blocking) `push`.
    async fn try_push(&self, work: T) -> Result<()> {
        // Default implementation uses regular push
        self.push(work).await
    }

    /// Get the current queue depth if available.
    ///
    /// Returns None if the implementation doesn't support queue depth.
    async fn depth(&self) -> Option<usize> {
        None
    }

    /// Check if the queue is healthy.
    ///
    /// Used for health checks and monitoring. Defaults to always-healthy
    /// for implementations with no failure mode to probe.
    async fn is_healthy(&self) -> bool {
        true
    }
}
95
/// MPSC channel-based queue adapter implementation.
///
/// This adapter uses tokio's multi-producer, single-consumer channel
/// for in-memory queuing of work items. It's suitable for single-instance
/// deployments with moderate throughput requirements.
pub(crate) struct MpscQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    // Receiver half wrapped in Arc<Mutex<…>> so `pull(&self)` can take it
    // mutably; only one puller makes progress at a time.
    receiver: Arc<Mutex<mpsc::Receiver<T>>>,
    // Sender half used by `push`/`try_push`; also drives `depth` and
    // `is_healthy` via its capacity/closed state.
    sender: mpsc::Sender<T>,
}
108
109impl<T> MpscQueueAdapter<T>
110where
111 T: Send + Sync + 'static,
112{
113 /// Create a new MPSC queue adapter with the specified buffer size.
114 pub(crate) fn new(buffer: usize) -> Self {
115 let (sender, receiver) = mpsc::channel(buffer);
116 Self {
117 receiver: Arc::new(Mutex::new(receiver)),
118 sender,
119 }
120 }
121
122 /// Create an adapter from existing MPSC channels (for backward compatibility).
123 pub(crate) fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self {
124 Self {
125 receiver: Arc::new(Mutex::new(receiver)),
126 sender,
127 }
128 }
129}
130
131#[async_trait]
132impl<T> QueueAdapter<T> for MpscQueueAdapter<T>
133where
134 T: Send + Sync + 'static,
135{
136 async fn pull(&self) -> Option<T> {
137 let mut receiver = self.receiver.lock().await;
138 receiver.recv().await
139 }
140
141 async fn push(&self, work: T) -> Result<()> {
142 self.sender
143 .send(work)
144 .await
145 .map_err(|e| QueueError::PushFailed(e.to_string()))
146 }
147
148 async fn try_push(&self, work: T) -> Result<()> {
149 self.sender.try_send(work).map_err(|e| match e {
150 mpsc::error::TrySendError::Full(_) => QueueError::QueueFull,
151 mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed,
152 })
153 }
154
155 async fn depth(&self) -> Option<usize> {
156 // Note: This is an approximation as mpsc doesn't provide exact depth
157 Some(self.sender.max_capacity() - self.sender.capacity())
158 }
159
160 async fn is_healthy(&self) -> bool {
161 !self.sender.is_closed()
162 }
163}
164
/// Work item for handle resolution tasks.
///
/// Serializable so it can travel through the Redis-backed queue as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandleResolutionWork {
    /// The handle to resolve
    pub handle: String,
}
171
172impl HandleResolutionWork {
173 /// Create a new handle resolution work item
174 pub fn new(handle: String) -> Self {
175 Self { handle }
176 }
177}
178
/// Redis-backed queue adapter implementation.
///
/// This adapter uses Redis lists with a reliable queue pattern:
/// - LPUSH to push items to the primary queue
/// - RPOPLPUSH to atomically move items from primary to worker queue
/// - LREM to acknowledge processed items from worker queue
///
/// This ensures at-least-once delivery semantics and allows for recovery
/// of in-flight items if a worker crashes.
pub(crate) struct RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Redis connection pool
    pool: RedisPool,
    /// Unique worker ID for this adapter instance; forms the suffix of the
    /// per-worker in-flight queue key.
    worker_id: String,
    /// Key prefix for all queues (default: "queue:handleresolver:")
    key_prefix: String,
    /// Timeout for blocking RPOPLPUSH operations (seconds; 0 would block forever)
    timeout_seconds: u64,
    /// Type marker for generic parameter
    _phantom: std::marker::PhantomData<T>,
}
203
204impl<T> RedisQueueAdapter<T>
205where
206 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
207{
208 /// Create a new Redis queue adapter with custom configuration
209 fn with_config(
210 pool: RedisPool,
211 worker_id: String,
212 key_prefix: String,
213 timeout_seconds: u64,
214 ) -> Self {
215 Self {
216 pool,
217 worker_id,
218 key_prefix,
219 timeout_seconds,
220 _phantom: std::marker::PhantomData,
221 }
222 }
223
224 /// Get the primary queue key
225 fn primary_queue_key(&self) -> String {
226 format!("{}primary", self.key_prefix)
227 }
228
229 /// Get the worker-specific temporary queue key
230 fn worker_queue_key(&self) -> String {
231 format!("{}{}", self.key_prefix, self.worker_id)
232 }
233}
234
#[async_trait]
impl<T> QueueAdapter<T> for RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Pull the next item, atomically moving it from the primary queue into
    /// this worker's in-flight queue via blocking RPOPLPUSH.
    ///
    /// Returns `None` on timeout, connection failure, Redis error, or when
    /// the popped payload fails to deserialize (the corrupt payload is
    /// LREM'd from the worker queue so it is not stuck there forever).
    /// Errors are logged rather than propagated because the trait's `pull`
    /// returns `Option<T>`.
    async fn pull(&self) -> Option<T> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                let worker_key = self.worker_queue_key();

                // Use blocking RPOPLPUSH to atomically move item from primary to worker queue
                let data: Option<Vec<u8>> = match conn
                    .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64)
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        error!("Failed to pull from queue: {}", e);
                        return None;
                    }
                };

                if let Some(data) = data {
                    // Deserialize the item
                    match serde_json::from_slice(&data) {
                        Ok(item) => {
                            debug!(
                                worker_id = %self.worker_id,
                                "Pulled item from queue"
                            );
                            Some(item)
                        }
                        Err(e) => {
                            error!("Failed to deserialize item: {}", e);
                            // Remove the corrupted item from worker queue
                            // (result deliberately ignored: best-effort cleanup)
                            let _: std::result::Result<(), _> =
                                conn.lrem(&worker_key, 1, &data).await;
                            None
                        }
                    }
                } else {
                    // Blocking pop timed out with no item available.
                    None
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    /// Serialize the item to JSON and LPUSH it onto the primary queue.
    ///
    /// # Errors
    /// `RedisConnectionFailed` if no pool connection is available,
    /// `SerializationFailed` if JSON encoding fails, or
    /// `RedisOperationFailed` if the LPUSH itself errors.
    async fn push(&self, work: T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        let data = serde_json::to_vec(&work)
            .map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let primary_key = self.primary_queue_key();

        conn.lpush::<_, _, ()>(&primary_key, data)
            .await
            .map_err(|e| QueueError::RedisOperationFailed {
                operation: "LPUSH".to_string(),
                details: e.to_string(),
            })?;

        debug!("Pushed item to queue");
        Ok(())
    }

    /// Acknowledge a processed item by LREM'ing its JSON encoding from this
    /// worker's in-flight queue.
    ///
    /// Relies on the re-serialized bytes matching the bytes that were
    /// pulled; this holds because `push` produced them with the same
    /// `serde_json::to_vec`.
    ///
    /// NOTE(review): when the item is absent only a warning is logged and
    /// `Ok(())` is returned — the `QueueError::AckItemNotFound` variant is
    /// never used. Confirm whether lenient acks are intended.
    async fn ack(&self, item: &T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        let data =
            serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let worker_key = self.worker_queue_key();

        // Remove exactly one occurrence of this item from the worker queue
        let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| {
            QueueError::RedisOperationFailed {
                operation: "LREM".to_string(),
                details: e.to_string(),
            }
        })?;

        if removed == 0 {
            warn!(
                worker_id = %self.worker_id,
                "Item not found in worker queue during acknowledgment"
            );
        } else {
            debug!(
                worker_id = %self.worker_id,
                "Acknowledged item"
            );
        }

        Ok(())
    }

    /// LLEN of the primary queue only — items sitting in per-worker
    /// in-flight queues are not counted. `None` on any Redis failure.
    async fn depth(&self) -> Option<usize> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                match conn.llen::<_, usize>(&primary_key).await {
                    Ok(len) => Some(len),
                    Err(e) => {
                        error!("Failed to get queue depth: {}", e);
                        None
                    }
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    /// Healthy iff a connection can be obtained and PING answers PONG.
    async fn is_healthy(&self) -> bool {
        match self.pool.get().await {
            Ok(mut conn) => {
                // Ping Redis to check health
                match deadpool_redis::redis::cmd("PING")
                    .query_async::<String>(&mut conn)
                    .await
                {
                    Ok(response) => response == "PONG",
                    Err(_) => false,
                }
            }
            Err(_) => false,
        }
    }
}
380
/// No-operation queue adapter that discards all work items.
///
/// This adapter is useful for configurations where queuing is disabled
/// or as a fallback when other queue adapters fail to initialize.
pub(crate) struct NoopQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    // Zero-sized marker tying the generic parameter to the struct;
    // the adapter holds no state.
    _phantom: std::marker::PhantomData<T>,
}
391
392impl<T> NoopQueueAdapter<T>
393where
394 T: Send + Sync + 'static,
395{
396 /// Create a new no-op queue adapter
397 pub(crate) fn new() -> Self {
398 Self {
399 _phantom: std::marker::PhantomData,
400 }
401 }
402}
403
404impl<T> Default for NoopQueueAdapter<T>
405where
406 T: Send + Sync + 'static,
407{
408 fn default() -> Self {
409 Self::new()
410 }
411}
412
413#[async_trait]
414impl<T> QueueAdapter<T> for NoopQueueAdapter<T>
415where
416 T: Send + Sync + 'static,
417{
418 async fn pull(&self) -> Option<T> {
419 // Never returns any work
420 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
421 None
422 }
423
424 async fn push(&self, _work: T) -> Result<()> {
425 // Silently discard the work
426 Ok(())
427 }
428
429 async fn ack(&self, _item: &T) -> Result<()> {
430 // No-op
431 Ok(())
432 }
433
434 async fn try_push(&self, _work: T) -> Result<()> {
435 // Silently discard the work
436 Ok(())
437 }
438
439 async fn depth(&self) -> Option<usize> {
440 // Always empty
441 Some(0)
442 }
443
444 async fn is_healthy(&self) -> bool {
445 // Always healthy
446 true
447 }
448}
449
450// ========= Factory Functions for Queue Adapters =========
451
452/// Create a new MPSC queue adapter with the specified buffer size.
453///
454/// This creates an in-memory queue suitable for single-instance deployments.
455///
456/// # Arguments
457///
458/// * `buffer` - The buffer size for the channel
459pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>>
460where
461 T: Send + Sync + 'static,
462{
463 Arc::new(MpscQueueAdapter::new(buffer))
464}
465
466/// Create an MPSC queue adapter from existing channels.
467///
468/// This allows integration with existing channel-based architectures.
469///
470/// # Arguments
471///
472/// * `sender` - The sender half of the channel
473/// * `receiver` - The receiver half of the channel
474pub fn create_mpsc_queue_from_channel<T>(
475 sender: mpsc::Sender<T>,
476 receiver: mpsc::Receiver<T>,
477) -> Arc<dyn QueueAdapter<T>>
478where
479 T: Send + Sync + 'static,
480{
481 Arc::new(MpscQueueAdapter::from_channel(sender, receiver))
482}
483
484/// Create a new Redis-backed queue adapter.
485///
486/// This creates a distributed queue suitable for multi-instance deployments.
487///
488/// # Arguments
489///
490/// * `pool` - Redis connection pool
491/// * `worker_id` - Worker identifier for this queue instance
492/// * `key_prefix` - Redis key prefix for queue operations
493/// * `timeout_seconds` - Timeout for blocking operations
494pub fn create_redis_queue<T>(
495 pool: RedisPool,
496 worker_id: String,
497 key_prefix: String,
498 timeout_seconds: u64,
499) -> Arc<dyn QueueAdapter<T>>
500where
501 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
502{
503 Arc::new(RedisQueueAdapter::with_config(
504 pool,
505 worker_id,
506 key_prefix,
507 timeout_seconds,
508 ))
509}
510
511/// Create a no-operation queue adapter.
512///
513/// This creates a queue that discards all work items, useful for testing
514/// or when queue processing is disabled.
515pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>>
516where
517 T: Send + Sync + 'static,
518{
519 Arc::new(NoopQueueAdapter::new())
520}
521
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory adapter round-trip: one push must come back from pull.
    #[tokio::test]
    async fn test_mpsc_queue_adapter_push_pull() {
        let adapter = Arc::new(MpscQueueAdapter::<String>::new(10));

        // Test push
        adapter.push("test".to_string()).await.unwrap();

        // Test pull
        let pulled = adapter.pull().await;
        assert!(pulled.is_some());
        assert_eq!(pulled.unwrap(), "test");
    }

    /// Constructor stores the handle verbatim.
    #[tokio::test]
    async fn test_handle_resolution_work() {
        let work = HandleResolutionWork::new("alice.example.com".to_string());

        assert_eq!(work.handle, "alice.example.com");
    }

    /// Redis round-trip: push, pull, ack. Skipped silently when no test
    /// Redis pool is available (early return on None).
    #[tokio::test]
    async fn test_redis_queue_adapter_push_pull() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        // Create adapter with unique prefix for testing
        // (nanosecond timestamp keeps concurrent test runs isolated)
        let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
        let adapter = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            "test-worker".to_string(),
            test_prefix.clone(),
            1, // 1 second timeout for tests
        ));

        // Test push
        adapter.push("test-item".to_string()).await.unwrap();

        // Test pull
        let pulled = adapter.pull().await;
        assert!(pulled.is_some());
        assert_eq!(pulled.unwrap(), "test-item");

        // Test ack
        adapter
            .ack(&"test-item".to_string())
            .await
            .expect("Ack should succeed");

        // NOTE(review): test keys are left behind in Redis — nothing here
        // deletes them; they persist until Redis eviction or manual cleanup.
    }

    /// Reliable-queue semantics: an item pulled but never acked remains in
    /// the worker queue and is visible to a new adapter with the same
    /// worker_id (simulated crash/restart).
    #[tokio::test]
    async fn test_redis_queue_adapter_reliable_queue() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
        let worker_id = "test-worker-reliable";

        // Create first adapter
        let adapter1 = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            worker_id.to_string(),
            test_prefix.clone(),
            1,
        ));

        // Push multiple items
        adapter1.push("item1".to_string()).await.unwrap();
        adapter1.push("item2".to_string()).await.unwrap();
        adapter1.push("item3".to_string()).await.unwrap();

        // Pull but don't ack (simulating worker crash).
        // LPUSH + RPOPLPUSH gives FIFO order, so "item1" comes out first.
        let item1 = adapter1.pull().await;
        assert!(item1.is_some());
        assert_eq!(item1.unwrap(), "item1");

        // Create second adapter with same worker_id (simulating restart)
        let adapter2 = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            worker_id.to_string(),
            test_prefix.clone(),
            1,
        ));

        // In a real scenario, unacked items would be handled by timeout or manual recovery
        // For this test, we just verify a next item can still be pulled
        // (pull reads the primary queue, which still holds item2/item3).
        let recovered = adapter2.pull().await;
        assert!(recovered.is_some());
    }

    /// depth() reflects only the primary queue: grows on push, shrinks
    /// when pull moves an item into the worker queue.
    #[tokio::test]
    async fn test_redis_queue_adapter_depth() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
        let adapter = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool.clone(),
            "test-worker-depth".to_string(),
            test_prefix.clone(),
            1,
        ));

        // Initially empty
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(0));

        // Push items and check depth
        adapter.push("item1".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(1));

        adapter.push("item2".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(2));

        // Pull and check depth decreases
        let _ = adapter.pull().await;
        // Note: depth checks primary queue, not worker queue
        assert_eq!(adapter.depth().await, Some(1));

        // NOTE(review): despite the earlier comment history, dropping the
        // adapter does NOT clean up — RedisQueueAdapter has no Drop impl;
        // leftover test keys remain in Redis.
    }

    /// is_healthy() should be true whenever the test Redis answers PING.
    #[tokio::test]
    async fn test_redis_queue_adapter_health() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => return,
        };

        let adapter = Arc::new(RedisQueueAdapter::<String>::with_config(
            pool,
            "test-worker-health".to_string(),
            "test:queue:health:".to_string(),
            1,
        ));

        // Should be healthy if Redis is running
        assert!(adapter.is_healthy().await);
    }
}
673}