// QuickDID is a high-performance AT Protocol identity resolution service written in Rust.
// It provides handle-to-DID resolution with Redis-backed caching and queue processing.
// Snapshot of this file at commit 0ce1bed80f0bf3d761fd231ec2f4ddd5efc9d85b (673 lines, 20 kB).
1//! Generic queue adapter system for work queue abstraction. 2//! 3//! This module provides a generic trait and implementations for queue adapters 4//! that can be used with any work type for handle resolution and other tasks. 5 6use async_trait::async_trait; 7use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands}; 8use serde::{Deserialize, Serialize}; 9use std::sync::Arc; 10use thiserror::Error; 11use tokio::sync::{Mutex, mpsc}; 12use tracing::{debug, error, warn}; 13 14/// Queue operation errors 15#[derive(Error, Debug)] 16pub enum QueueError { 17 #[error("error-quickdid-queue-1 Failed to push to queue: {0}")] 18 PushFailed(String), 19 20 #[error("error-quickdid-queue-2 Queue is full")] 21 QueueFull, 22 23 #[error("error-quickdid-queue-3 Queue is closed")] 24 QueueClosed, 25 26 #[error("error-quickdid-queue-4 Redis connection failed: {0}")] 27 RedisConnectionFailed(String), 28 29 #[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")] 30 RedisOperationFailed { operation: String, details: String }, 31 32 #[error("error-quickdid-queue-6 Serialization failed: {0}")] 33 SerializationFailed(String), 34 35 #[error("error-quickdid-queue-7 Deserialization failed: {0}")] 36 DeserializationFailed(String), 37 38 #[error("error-quickdid-queue-8 Item not found in worker queue during acknowledgment")] 39 AckItemNotFound, 40} 41 42type Result<T> = std::result::Result<T, QueueError>; 43 44/// Generic trait for queue adapters that can work with any work type. 45/// 46/// This trait provides a common interface for different queue implementations 47/// (MPSC, Redis, PostgreSQL, etc.) allowing them to be used interchangeably. 48#[async_trait] 49pub trait QueueAdapter<T>: Send + Sync 50where 51 T: Send + Sync + 'static, 52{ 53 /// Pull the next work item from the queue. 54 /// 55 /// Returns None if the queue is closed or empty (depending on implementation). 56 async fn pull(&self) -> Option<T>; 57 58 /// Push a work item to the queue. 
59 /// 60 /// Returns an error if the queue is full or closed. 61 async fn push(&self, work: T) -> Result<()>; 62 63 /// Acknowledge that a work item has been successfully processed. 64 /// 65 /// This is used by reliable queue implementations to remove the item 66 /// from a temporary processing queue. Implementations that don't require 67 /// acknowledgment (like MPSC) can use the default no-op implementation. 68 async fn ack(&self, _item: &T) -> Result<()> { 69 // Default no-op implementation for queues that don't need acknowledgment 70 Ok(()) 71 } 72 73 /// Try to push a work item without blocking. 74 /// 75 /// Returns an error if the queue is full or closed. 76 async fn try_push(&self, work: T) -> Result<()> { 77 // Default implementation uses regular push 78 self.push(work).await 79 } 80 81 /// Get the current queue depth if available. 82 /// 83 /// Returns None if the implementation doesn't support queue depth. 84 async fn depth(&self) -> Option<usize> { 85 None 86 } 87 88 /// Check if the queue is healthy. 89 /// 90 /// Used for health checks and monitoring. 91 async fn is_healthy(&self) -> bool { 92 true 93 } 94} 95 96/// MPSC channel-based queue adapter implementation. 97/// 98/// This adapter uses tokio's multi-producer, single-consumer channel 99/// for in-memory queuing of work items. It's suitable for single-instance 100/// deployments with moderate throughput requirements. 101pub(crate) struct MpscQueueAdapter<T> 102where 103 T: Send + Sync + 'static, 104{ 105 receiver: Arc<Mutex<mpsc::Receiver<T>>>, 106 sender: mpsc::Sender<T>, 107} 108 109impl<T> MpscQueueAdapter<T> 110where 111 T: Send + Sync + 'static, 112{ 113 /// Create a new MPSC queue adapter with the specified buffer size. 114 pub(crate) fn new(buffer: usize) -> Self { 115 let (sender, receiver) = mpsc::channel(buffer); 116 Self { 117 receiver: Arc::new(Mutex::new(receiver)), 118 sender, 119 } 120 } 121 122 /// Create an adapter from existing MPSC channels (for backward compatibility). 
123 pub(crate) fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self { 124 Self { 125 receiver: Arc::new(Mutex::new(receiver)), 126 sender, 127 } 128 } 129} 130 131#[async_trait] 132impl<T> QueueAdapter<T> for MpscQueueAdapter<T> 133where 134 T: Send + Sync + 'static, 135{ 136 async fn pull(&self) -> Option<T> { 137 let mut receiver = self.receiver.lock().await; 138 receiver.recv().await 139 } 140 141 async fn push(&self, work: T) -> Result<()> { 142 self.sender 143 .send(work) 144 .await 145 .map_err(|e| QueueError::PushFailed(e.to_string())) 146 } 147 148 async fn try_push(&self, work: T) -> Result<()> { 149 self.sender.try_send(work).map_err(|e| match e { 150 mpsc::error::TrySendError::Full(_) => QueueError::QueueFull, 151 mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed, 152 }) 153 } 154 155 async fn depth(&self) -> Option<usize> { 156 // Note: This is an approximation as mpsc doesn't provide exact depth 157 Some(self.sender.max_capacity() - self.sender.capacity()) 158 } 159 160 async fn is_healthy(&self) -> bool { 161 !self.sender.is_closed() 162 } 163} 164 165/// Work item for handle resolution tasks 166#[derive(Debug, Clone, Serialize, Deserialize)] 167pub struct HandleResolutionWork { 168 /// The handle to resolve 169 pub handle: String, 170} 171 172impl HandleResolutionWork { 173 /// Create a new handle resolution work item 174 pub fn new(handle: String) -> Self { 175 Self { handle } 176 } 177} 178 179/// Redis-backed queue adapter implementation. 180/// 181/// This adapter uses Redis lists with a reliable queue pattern: 182/// - LPUSH to push items to the primary queue 183/// - RPOPLPUSH to atomically move items from primary to worker queue 184/// - LREM to acknowledge processed items from worker queue 185/// 186/// This ensures at-least-once delivery semantics and allows for recovery 187/// of in-flight items if a worker crashes. 
188pub(crate) struct RedisQueueAdapter<T> 189where 190 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 191{ 192 /// Redis connection pool 193 pool: RedisPool, 194 /// Unique worker ID for this adapter instance 195 worker_id: String, 196 /// Key prefix for all queues (default: "queue:handleresolver:") 197 key_prefix: String, 198 /// Timeout for blocking RPOPLPUSH operations 199 timeout_seconds: u64, 200 /// Type marker for generic parameter 201 _phantom: std::marker::PhantomData<T>, 202} 203 204impl<T> RedisQueueAdapter<T> 205where 206 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 207{ 208 /// Create a new Redis queue adapter with custom configuration 209 fn with_config( 210 pool: RedisPool, 211 worker_id: String, 212 key_prefix: String, 213 timeout_seconds: u64, 214 ) -> Self { 215 Self { 216 pool, 217 worker_id, 218 key_prefix, 219 timeout_seconds, 220 _phantom: std::marker::PhantomData, 221 } 222 } 223 224 /// Get the primary queue key 225 fn primary_queue_key(&self) -> String { 226 format!("{}primary", self.key_prefix) 227 } 228 229 /// Get the worker-specific temporary queue key 230 fn worker_queue_key(&self) -> String { 231 format!("{}{}", self.key_prefix, self.worker_id) 232 } 233} 234 235#[async_trait] 236impl<T> QueueAdapter<T> for RedisQueueAdapter<T> 237where 238 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 239{ 240 async fn pull(&self) -> Option<T> { 241 match self.pool.get().await { 242 Ok(mut conn) => { 243 let primary_key = self.primary_queue_key(); 244 let worker_key = self.worker_queue_key(); 245 246 // Use blocking RPOPLPUSH to atomically move item from primary to worker queue 247 let data: Option<Vec<u8>> = match conn 248 .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64) 249 .await 250 { 251 Ok(data) => data, 252 Err(e) => { 253 error!("Failed to pull from queue: {}", e); 254 return None; 255 } 256 }; 257 258 if let Some(data) = data { 259 // Deserialize the item 260 
match serde_json::from_slice(&data) { 261 Ok(item) => { 262 debug!( 263 worker_id = %self.worker_id, 264 "Pulled item from queue" 265 ); 266 Some(item) 267 } 268 Err(e) => { 269 error!("Failed to deserialize item: {}", e); 270 // Remove the corrupted item from worker queue 271 let _: std::result::Result<(), _> = 272 conn.lrem(&worker_key, 1, &data).await; 273 None 274 } 275 } 276 } else { 277 None 278 } 279 } 280 Err(e) => { 281 error!("Failed to get Redis connection: {}", e); 282 None 283 } 284 } 285 } 286 287 async fn push(&self, work: T) -> Result<()> { 288 let mut conn = self 289 .pool 290 .get() 291 .await 292 .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 293 294 let data = serde_json::to_vec(&work) 295 .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 296 297 let primary_key = self.primary_queue_key(); 298 299 conn.lpush::<_, _, ()>(&primary_key, data) 300 .await 301 .map_err(|e| QueueError::RedisOperationFailed { 302 operation: "LPUSH".to_string(), 303 details: e.to_string(), 304 })?; 305 306 debug!("Pushed item to queue"); 307 Ok(()) 308 } 309 310 async fn ack(&self, item: &T) -> Result<()> { 311 let mut conn = self 312 .pool 313 .get() 314 .await 315 .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 316 317 let data = 318 serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 319 320 let worker_key = self.worker_queue_key(); 321 322 // Remove exactly one occurrence of this item from the worker queue 323 let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| { 324 QueueError::RedisOperationFailed { 325 operation: "LREM".to_string(), 326 details: e.to_string(), 327 } 328 })?; 329 330 if removed == 0 { 331 warn!( 332 worker_id = %self.worker_id, 333 "Item not found in worker queue during acknowledgment" 334 ); 335 } else { 336 debug!( 337 worker_id = %self.worker_id, 338 "Acknowledged item" 339 ); 340 } 341 342 Ok(()) 343 } 344 345 async fn depth(&self) -> 
Option<usize> { 346 match self.pool.get().await { 347 Ok(mut conn) => { 348 let primary_key = self.primary_queue_key(); 349 match conn.llen::<_, usize>(&primary_key).await { 350 Ok(len) => Some(len), 351 Err(e) => { 352 error!("Failed to get queue depth: {}", e); 353 None 354 } 355 } 356 } 357 Err(e) => { 358 error!("Failed to get Redis connection: {}", e); 359 None 360 } 361 } 362 } 363 364 async fn is_healthy(&self) -> bool { 365 match self.pool.get().await { 366 Ok(mut conn) => { 367 // Ping Redis to check health 368 match deadpool_redis::redis::cmd("PING") 369 .query_async::<String>(&mut conn) 370 .await 371 { 372 Ok(response) => response == "PONG", 373 Err(_) => false, 374 } 375 } 376 Err(_) => false, 377 } 378 } 379} 380 381/// No-operation queue adapter that discards all work items. 382/// 383/// This adapter is useful for configurations where queuing is disabled 384/// or as a fallback when other queue adapters fail to initialize. 385pub(crate) struct NoopQueueAdapter<T> 386where 387 T: Send + Sync + 'static, 388{ 389 _phantom: std::marker::PhantomData<T>, 390} 391 392impl<T> NoopQueueAdapter<T> 393where 394 T: Send + Sync + 'static, 395{ 396 /// Create a new no-op queue adapter 397 pub(crate) fn new() -> Self { 398 Self { 399 _phantom: std::marker::PhantomData, 400 } 401 } 402} 403 404impl<T> Default for NoopQueueAdapter<T> 405where 406 T: Send + Sync + 'static, 407{ 408 fn default() -> Self { 409 Self::new() 410 } 411} 412 413#[async_trait] 414impl<T> QueueAdapter<T> for NoopQueueAdapter<T> 415where 416 T: Send + Sync + 'static, 417{ 418 async fn pull(&self) -> Option<T> { 419 // Never returns any work 420 tokio::time::sleep(std::time::Duration::from_secs(60)).await; 421 None 422 } 423 424 async fn push(&self, _work: T) -> Result<()> { 425 // Silently discard the work 426 Ok(()) 427 } 428 429 async fn ack(&self, _item: &T) -> Result<()> { 430 // No-op 431 Ok(()) 432 } 433 434 async fn try_push(&self, _work: T) -> Result<()> { 435 // Silently discard the 
work 436 Ok(()) 437 } 438 439 async fn depth(&self) -> Option<usize> { 440 // Always empty 441 Some(0) 442 } 443 444 async fn is_healthy(&self) -> bool { 445 // Always healthy 446 true 447 } 448} 449 450// ========= Factory Functions for Queue Adapters ========= 451 452/// Create a new MPSC queue adapter with the specified buffer size. 453/// 454/// This creates an in-memory queue suitable for single-instance deployments. 455/// 456/// # Arguments 457/// 458/// * `buffer` - The buffer size for the channel 459pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>> 460where 461 T: Send + Sync + 'static, 462{ 463 Arc::new(MpscQueueAdapter::new(buffer)) 464} 465 466/// Create an MPSC queue adapter from existing channels. 467/// 468/// This allows integration with existing channel-based architectures. 469/// 470/// # Arguments 471/// 472/// * `sender` - The sender half of the channel 473/// * `receiver` - The receiver half of the channel 474pub fn create_mpsc_queue_from_channel<T>( 475 sender: mpsc::Sender<T>, 476 receiver: mpsc::Receiver<T>, 477) -> Arc<dyn QueueAdapter<T>> 478where 479 T: Send + Sync + 'static, 480{ 481 Arc::new(MpscQueueAdapter::from_channel(sender, receiver)) 482} 483 484/// Create a new Redis-backed queue adapter. 485/// 486/// This creates a distributed queue suitable for multi-instance deployments. 
487/// 488/// # Arguments 489/// 490/// * `pool` - Redis connection pool 491/// * `worker_id` - Worker identifier for this queue instance 492/// * `key_prefix` - Redis key prefix for queue operations 493/// * `timeout_seconds` - Timeout for blocking operations 494pub fn create_redis_queue<T>( 495 pool: RedisPool, 496 worker_id: String, 497 key_prefix: String, 498 timeout_seconds: u64, 499) -> Arc<dyn QueueAdapter<T>> 500where 501 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 502{ 503 Arc::new(RedisQueueAdapter::with_config( 504 pool, 505 worker_id, 506 key_prefix, 507 timeout_seconds, 508 )) 509} 510 511/// Create a no-operation queue adapter. 512/// 513/// This creates a queue that discards all work items, useful for testing 514/// or when queue processing is disabled. 515pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>> 516where 517 T: Send + Sync + 'static, 518{ 519 Arc::new(NoopQueueAdapter::new()) 520} 521 522#[cfg(test)] 523mod tests { 524 use super::*; 525 526 #[tokio::test] 527 async fn test_mpsc_queue_adapter_push_pull() { 528 let adapter = Arc::new(MpscQueueAdapter::<String>::new(10)); 529 530 // Test push 531 adapter.push("test".to_string()).await.unwrap(); 532 533 // Test pull 534 let pulled = adapter.pull().await; 535 assert!(pulled.is_some()); 536 assert_eq!(pulled.unwrap(), "test"); 537 } 538 539 #[tokio::test] 540 async fn test_handle_resolution_work() { 541 let work = HandleResolutionWork::new("alice.example.com".to_string()); 542 543 assert_eq!(work.handle, "alice.example.com"); 544 } 545 546 #[tokio::test] 547 async fn test_redis_queue_adapter_push_pull() { 548 let pool = match crate::test_helpers::get_test_redis_pool() { 549 Some(p) => p, 550 None => return, 551 }; 552 553 // Create adapter with unique prefix for testing 554 let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); 555 let adapter = 
Arc::new(RedisQueueAdapter::<String>::with_config( 556 pool.clone(), 557 "test-worker".to_string(), 558 test_prefix.clone(), 559 1, // 1 second timeout for tests 560 )); 561 562 // Test push 563 adapter.push("test-item".to_string()).await.unwrap(); 564 565 // Test pull 566 let pulled = adapter.pull().await; 567 assert!(pulled.is_some()); 568 assert_eq!(pulled.unwrap(), "test-item"); 569 570 // Test ack 571 adapter 572 .ack(&"test-item".to_string()) 573 .await 574 .expect("Ack should succeed"); 575 576 // Clean up test data - manually clean worker queue since cleanup was removed 577 // In production, items would timeout or be processed 578 } 579 580 #[tokio::test] 581 async fn test_redis_queue_adapter_reliable_queue() { 582 let pool = match crate::test_helpers::get_test_redis_pool() { 583 Some(p) => p, 584 None => return, 585 }; 586 587 let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); 588 let worker_id = "test-worker-reliable"; 589 590 // Create first adapter 591 let adapter1 = Arc::new(RedisQueueAdapter::<String>::with_config( 592 pool.clone(), 593 worker_id.to_string(), 594 test_prefix.clone(), 595 1, 596 )); 597 598 // Push multiple items 599 adapter1.push("item1".to_string()).await.unwrap(); 600 adapter1.push("item2".to_string()).await.unwrap(); 601 adapter1.push("item3".to_string()).await.unwrap(); 602 603 // Pull but don't ack (simulating worker crash) 604 let item1 = adapter1.pull().await; 605 assert!(item1.is_some()); 606 assert_eq!(item1.unwrap(), "item1"); 607 608 // Create second adapter with same worker_id (simulating restart) 609 let adapter2 = Arc::new(RedisQueueAdapter::<String>::with_config( 610 pool.clone(), 611 worker_id.to_string(), 612 test_prefix.clone(), 613 1, 614 )); 615 616 // In a real scenario, unacked items would be handled by timeout or manual recovery 617 // For this test, we just verify the item is in the worker queue 618 let recovered = 
adapter2.pull().await; 619 assert!(recovered.is_some()); 620 } 621 622 #[tokio::test] 623 async fn test_redis_queue_adapter_depth() { 624 let pool = match crate::test_helpers::get_test_redis_pool() { 625 Some(p) => p, 626 None => return, 627 }; 628 629 let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); 630 let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 631 pool.clone(), 632 "test-worker-depth".to_string(), 633 test_prefix.clone(), 634 1, 635 )); 636 637 // Initially empty 638 let depth = adapter.depth().await; 639 assert_eq!(depth, Some(0)); 640 641 // Push items and check depth 642 adapter.push("item1".to_string()).await.unwrap(); 643 assert_eq!(adapter.depth().await, Some(1)); 644 645 adapter.push("item2".to_string()).await.unwrap(); 646 assert_eq!(adapter.depth().await, Some(2)); 647 648 // Pull and check depth decreases 649 let _ = adapter.pull().await; 650 // Note: depth checks primary queue, not worker queue 651 assert_eq!(adapter.depth().await, Some(1)); 652 653 // Test cleanup is automatic when adapter is dropped 654 } 655 656 #[tokio::test] 657 async fn test_redis_queue_adapter_health() { 658 let pool = match crate::test_helpers::get_test_redis_pool() { 659 Some(p) => p, 660 None => return, 661 }; 662 663 let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 664 pool, 665 "test-worker-health".to_string(), 666 "test:queue:health:".to_string(), 667 1, 668 )); 669 670 // Should be healthy if Redis is running 671 assert!(adapter.is_healthy().await); 672 } 673}