QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at d3f53a2fcd3c1a14ed73f56c60bf67a500b71946 674 lines 20 kB view raw
1//! Generic queue adapter system for work queue abstraction. 2//! 3//! This module provides a generic trait and implementations for queue adapters 4//! that can be used with any work type for handle resolution and other tasks. 5 6use async_trait::async_trait; 7use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands}; 8use serde::{Deserialize, Serialize}; 9use std::sync::Arc; 10use thiserror::Error; 11use tokio::sync::{Mutex, mpsc}; 12use tracing::{debug, error, warn}; 13 14/// Queue operation errors 15#[derive(Error, Debug)] 16pub enum QueueError { 17 #[error("error-quickdid-queue-1 Failed to push to queue: {0}")] 18 PushFailed(String), 19 20 #[error("error-quickdid-queue-2 Queue is full")] 21 QueueFull, 22 23 #[error("error-quickdid-queue-3 Queue is closed")] 24 QueueClosed, 25 26 #[error("error-quickdid-queue-4 Redis connection failed: {0}")] 27 RedisConnectionFailed(String), 28 29 #[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")] 30 RedisOperationFailed { operation: String, details: String }, 31 32 #[error("error-quickdid-queue-6 Serialization failed: {0}")] 33 SerializationFailed(String), 34 35 #[error("error-quickdid-queue-7 Deserialization failed: {0}")] 36 DeserializationFailed(String), 37 38 #[error("error-quickdid-queue-8 Item not found in worker queue during acknowledgment")] 39 AckItemNotFound, 40} 41 42type Result<T> = std::result::Result<T, QueueError>; 43 44/// Generic trait for queue adapters that can work with any work type. 45/// 46/// This trait provides a common interface for different queue implementations 47/// (MPSC, Redis, PostgreSQL, etc.) allowing them to be used interchangeably. 48#[async_trait] 49pub trait QueueAdapter<T>: Send + Sync 50where 51 T: Send + Sync + 'static, 52{ 53 /// Pull the next work item from the queue. 54 /// 55 /// Returns None if the queue is closed or empty (depending on implementation). 56 async fn pull(&self) -> Option<T>; 57 58 /// Push a work item to the queue. 59 /// 60 /// Returns an error if the queue is full or closed. 61 async fn push(&self, work: T) -> Result<()>; 62 63 /// Acknowledge that a work item has been successfully processed. 64 /// 65 /// This is used by reliable queue implementations to remove the item 66 /// from a temporary processing queue. Implementations that don't require 67 /// acknowledgment (like MPSC) can use the default no-op implementation. 68 async fn ack(&self, _item: &T) -> Result<()> { 69 // Default no-op implementation for queues that don't need acknowledgment 70 Ok(()) 71 } 72 73 /// Try to push a work item without blocking. 74 /// 75 /// Returns an error if the queue is full or closed. 76 async fn try_push(&self, work: T) -> Result<()> { 77 // Default implementation uses regular push 78 self.push(work).await 79 } 80 81 /// Get the current queue depth if available. 82 /// 83 /// Returns None if the implementation doesn't support queue depth. 84 async fn depth(&self) -> Option<usize> { 85 None 86 } 87 88 /// Check if the queue is healthy. 89 /// 90 /// Used for health checks and monitoring. 91 async fn is_healthy(&self) -> bool { 92 true 93 } 94} 95 96/// MPSC channel-based queue adapter implementation. 97/// 98/// This adapter uses tokio's multi-producer, single-consumer channel 99/// for in-memory queuing of work items. It's suitable for single-instance 100/// deployments with moderate throughput requirements. 101pub(crate) struct MpscQueueAdapter<T> 102where 103 T: Send + Sync + 'static, 104{ 105 receiver: Arc<Mutex<mpsc::Receiver<T>>>, 106 sender: mpsc::Sender<T>, 107} 108 109impl<T> MpscQueueAdapter<T> 110where 111 T: Send + Sync + 'static, 112{ 113 /// Create a new MPSC queue adapter with the specified buffer size. 114 pub(crate) fn new(buffer: usize) -> Self { 115 let (sender, receiver) = mpsc::channel(buffer); 116 Self { 117 receiver: Arc::new(Mutex::new(receiver)), 118 sender, 119 } 120 } 121 122 /// Create an adapter from existing MPSC channels (for backward compatibility). 123 pub(crate) fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self { 124 Self { 125 receiver: Arc::new(Mutex::new(receiver)), 126 sender, 127 } 128 } 129} 130 131#[async_trait] 132impl<T> QueueAdapter<T> for MpscQueueAdapter<T> 133where 134 T: Send + Sync + 'static, 135{ 136 async fn pull(&self) -> Option<T> { 137 let mut receiver = self.receiver.lock().await; 138 receiver.recv().await 139 } 140 141 async fn push(&self, work: T) -> Result<()> { 142 self.sender 143 .send(work) 144 .await 145 .map_err(|e| QueueError::PushFailed(e.to_string())) 146 } 147 148 async fn try_push(&self, work: T) -> Result<()> { 149 self.sender.try_send(work).map_err(|e| match e { 150 mpsc::error::TrySendError::Full(_) => QueueError::QueueFull, 151 mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed, 152 }) 153 } 154 155 async fn depth(&self) -> Option<usize> { 156 // Note: This is an approximation as mpsc doesn't provide exact depth 157 Some(self.sender.max_capacity() - self.sender.capacity()) 158 } 159 160 async fn is_healthy(&self) -> bool { 161 !self.sender.is_closed() 162 } 163} 164 165/// Work item for handle resolution tasks 166#[derive(Debug, Clone, Serialize, Deserialize)] 167pub struct HandleResolutionWork { 168 /// The handle to resolve 169 pub handle: String, 170} 171 172impl HandleResolutionWork { 173 /// Create a new handle resolution work item 174 pub fn new(handle: String) -> Self { 175 Self { handle } 176 } 177} 178 179/// Redis-backed queue adapter implementation. 180/// 181/// This adapter uses Redis lists with a reliable queue pattern: 182/// - LPUSH to push items to the primary queue 183/// - RPOPLPUSH to atomically move items from primary to worker queue 184/// - LREM to acknowledge processed items from worker queue 185/// 186/// This ensures at-least-once delivery semantics and allows for recovery 187/// of in-flight items if a worker crashes. 188pub(crate) struct RedisQueueAdapter<T> 189where 190 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 191{ 192 /// Redis connection pool 193 pool: RedisPool, 194 /// Unique worker ID for this adapter instance 195 worker_id: String, 196 /// Key prefix for all queues (default: "queue:handleresolver:") 197 key_prefix: String, 198 /// Timeout for blocking RPOPLPUSH operations 199 timeout_seconds: u64, 200 /// Type marker for generic parameter 201 _phantom: std::marker::PhantomData<T>, 202} 203 204impl<T> RedisQueueAdapter<T> 205where 206 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 207{ 208 /// Create a new Redis queue adapter with custom configuration 209 fn with_config( 210 pool: RedisPool, 211 worker_id: Option<String>, 212 key_prefix: String, 213 timeout_seconds: u64, 214 ) -> Self { 215 let worker_id = worker_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); 216 Self { 217 pool, 218 worker_id, 219 key_prefix, 220 timeout_seconds, 221 _phantom: std::marker::PhantomData, 222 } 223 } 224 225 /// Get the primary queue key 226 fn primary_queue_key(&self) -> String { 227 format!("{}primary", self.key_prefix) 228 } 229 230 /// Get the worker-specific temporary queue key 231 fn worker_queue_key(&self) -> String { 232 format!("{}{}", self.key_prefix, self.worker_id) 233 } 234} 235 236#[async_trait] 237impl<T> QueueAdapter<T> for RedisQueueAdapter<T> 238where 239 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 240{ 241 async fn pull(&self) -> Option<T> { 242 match self.pool.get().await { 243 Ok(mut conn) => { 244 let primary_key = self.primary_queue_key(); 245 let worker_key = self.worker_queue_key(); 246 247 // Use blocking RPOPLPUSH to atomically move item from primary to worker queue 248 let data: Option<Vec<u8>> = match conn 249 .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64) 250 .await 251 { 252 Ok(data) => data, 253 Err(e) => { 254 error!("Failed to pull from queue: {}", e); 255 return None; 256 } 257 }; 258 259 if let Some(data) = data { 260 // Deserialize the item 261 match serde_json::from_slice(&data) { 262 Ok(item) => { 263 debug!( 264 worker_id = %self.worker_id, 265 "Pulled item from queue" 266 ); 267 Some(item) 268 } 269 Err(e) => { 270 error!("Failed to deserialize item: {}", e); 271 // Remove the corrupted item from worker queue 272 let _: std::result::Result<(), _> = 273 conn.lrem(&worker_key, 1, &data).await; 274 None 275 } 276 } 277 } else { 278 None 279 } 280 } 281 Err(e) => { 282 error!("Failed to get Redis connection: {}", e); 283 None 284 } 285 } 286 } 287 288 async fn push(&self, work: T) -> Result<()> { 289 let mut conn = self 290 .pool 291 .get() 292 .await 293 .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 294 295 let data = serde_json::to_vec(&work) 296 .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 297 298 let primary_key = self.primary_queue_key(); 299 300 conn.lpush::<_, _, ()>(&primary_key, data) 301 .await 302 .map_err(|e| QueueError::RedisOperationFailed { 303 operation: "LPUSH".to_string(), 304 details: e.to_string(), 305 })?; 306 307 debug!("Pushed item to queue"); 308 Ok(()) 309 } 310 311 async fn ack(&self, item: &T) -> Result<()> { 312 let mut conn = self 313 .pool 314 .get() 315 .await 316 .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 317 318 let data = 319 serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 320 321 let worker_key = self.worker_queue_key(); 322 323 // Remove exactly one occurrence of this item from the worker queue 324 let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| { 325 QueueError::RedisOperationFailed { 326 operation: "LREM".to_string(), 327 details: e.to_string(), 328 } 329 })?; 330 331 if removed == 0 { 332 warn!( 333 worker_id = %self.worker_id, 334 "Item not found in worker queue during acknowledgment" 335 ); 336 } else { 337 debug!( 338 worker_id = %self.worker_id, 339 "Acknowledged item" 340 ); 341 } 342 343 Ok(()) 344 } 345 346 async fn depth(&self) -> Option<usize> { 347 match self.pool.get().await { 348 Ok(mut conn) => { 349 let primary_key = self.primary_queue_key(); 350 match conn.llen::<_, usize>(&primary_key).await { 351 Ok(len) => Some(len), 352 Err(e) => { 353 error!("Failed to get queue depth: {}", e); 354 None 355 } 356 } 357 } 358 Err(e) => { 359 error!("Failed to get Redis connection: {}", e); 360 None 361 } 362 } 363 } 364 365 async fn is_healthy(&self) -> bool { 366 match self.pool.get().await { 367 Ok(mut conn) => { 368 // Ping Redis to check health 369 match deadpool_redis::redis::cmd("PING") 370 .query_async::<String>(&mut conn) 371 .await 372 { 373 Ok(response) => response == "PONG", 374 Err(_) => false, 375 } 376 } 377 Err(_) => false, 378 } 379 } 380} 381 382/// No-operation queue adapter that discards all work items. 383/// 384/// This adapter is useful for configurations where queuing is disabled 385/// or as a fallback when other queue adapters fail to initialize. 386pub(crate) struct NoopQueueAdapter<T> 387where 388 T: Send + Sync + 'static, 389{ 390 _phantom: std::marker::PhantomData<T>, 391} 392 393impl<T> NoopQueueAdapter<T> 394where 395 T: Send + Sync + 'static, 396{ 397 /// Create a new no-op queue adapter 398 pub(crate) fn new() -> Self { 399 Self { 400 _phantom: std::marker::PhantomData, 401 } 402 } 403} 404 405impl<T> Default for NoopQueueAdapter<T> 406where 407 T: Send + Sync + 'static, 408{ 409 fn default() -> Self { 410 Self::new() 411 } 412} 413 414#[async_trait] 415impl<T> QueueAdapter<T> for NoopQueueAdapter<T> 416where 417 T: Send + Sync + 'static, 418{ 419 async fn pull(&self) -> Option<T> { 420 // Never returns any work 421 tokio::time::sleep(std::time::Duration::from_secs(60)).await; 422 None 423 } 424 425 async fn push(&self, _work: T) -> Result<()> { 426 // Silently discard the work 427 Ok(()) 428 } 429 430 async fn ack(&self, _item: &T) -> Result<()> { 431 // No-op 432 Ok(()) 433 } 434 435 async fn try_push(&self, _work: T) -> Result<()> { 436 // Silently discard the work 437 Ok(()) 438 } 439 440 async fn depth(&self) -> Option<usize> { 441 // Always empty 442 Some(0) 443 } 444 445 async fn is_healthy(&self) -> bool { 446 // Always healthy 447 true 448 } 449} 450 451// ========= Factory Functions for Queue Adapters ========= 452 453/// Create a new MPSC queue adapter with the specified buffer size. 454/// 455/// This creates an in-memory queue suitable for single-instance deployments. 456/// 457/// # Arguments 458/// 459/// * `buffer` - The buffer size for the channel 460pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>> 461where 462 T: Send + Sync + 'static, 463{ 464 Arc::new(MpscQueueAdapter::new(buffer)) 465} 466 467/// Create an MPSC queue adapter from existing channels. 468/// 469/// This allows integration with existing channel-based architectures. 470/// 471/// # Arguments 472/// 473/// * `sender` - The sender half of the channel 474/// * `receiver` - The receiver half of the channel 475pub fn create_mpsc_queue_from_channel<T>( 476 sender: mpsc::Sender<T>, 477 receiver: mpsc::Receiver<T>, 478) -> Arc<dyn QueueAdapter<T>> 479where 480 T: Send + Sync + 'static, 481{ 482 Arc::new(MpscQueueAdapter::from_channel(sender, receiver)) 483} 484 485/// Create a new Redis-backed queue adapter. 486/// 487/// This creates a distributed queue suitable for multi-instance deployments. 488/// 489/// # Arguments 490/// 491/// * `pool` - Redis connection pool 492/// * `worker_id` - Optional worker identifier (auto-generated if None) 493/// * `key_prefix` - Redis key prefix for queue operations 494/// * `timeout_seconds` - Timeout for blocking operations 495pub fn create_redis_queue<T>( 496 pool: RedisPool, 497 worker_id: Option<String>, 498 key_prefix: String, 499 timeout_seconds: u64, 500) -> Arc<dyn QueueAdapter<T>> 501where 502 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 503{ 504 Arc::new(RedisQueueAdapter::with_config( 505 pool, 506 worker_id, 507 key_prefix, 508 timeout_seconds, 509 )) 510} 511 512/// Create a no-operation queue adapter. 513/// 514/// This creates a queue that discards all work items, useful for testing 515/// or when queue processing is disabled. 516pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>> 517where 518 T: Send + Sync + 'static, 519{ 520 Arc::new(NoopQueueAdapter::new()) 521} 522 523#[cfg(test)] 524mod tests { 525 use super::*; 526 527 #[tokio::test] 528 async fn test_mpsc_queue_adapter_push_pull() { 529 let adapter = Arc::new(MpscQueueAdapter::<String>::new(10)); 530 531 // Test push 532 adapter.push("test".to_string()).await.unwrap(); 533 534 // Test pull 535 let pulled = adapter.pull().await; 536 assert!(pulled.is_some()); 537 assert_eq!(pulled.unwrap(), "test"); 538 } 539 540 #[tokio::test] 541 async fn test_handle_resolution_work() { 542 let work = HandleResolutionWork::new("alice.example.com".to_string()); 543 544 assert_eq!(work.handle, "alice.example.com"); 545 } 546 547 #[tokio::test] 548 async fn test_redis_queue_adapter_push_pull() { 549 let pool = match crate::test_helpers::get_test_redis_pool() { 550 Some(p) => p, 551 None => return, 552 }; 553 554 // Create adapter with unique prefix for testing 555 let test_prefix = format!("test:queue:{}:", uuid::Uuid::new_v4()); 556 let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 557 pool.clone(), 558 Some("test-worker".to_string()), 559 test_prefix.clone(), 560 1, // 1 second timeout for tests 561 )); 562 563 // Test push 564 adapter.push("test-item".to_string()).await.unwrap(); 565 566 // Test pull 567 let pulled = adapter.pull().await; 568 assert!(pulled.is_some()); 569 assert_eq!(pulled.unwrap(), "test-item"); 570 571 // Test ack 572 adapter 573 .ack(&"test-item".to_string()) 574 .await 575 .expect("Ack should succeed"); 576 577 // Clean up test data - manually clean worker queue since cleanup was removed 578 // In production, items would timeout or be processed 579 } 580 581 #[tokio::test] 582 async fn test_redis_queue_adapter_reliable_queue() { 583 let pool = match crate::test_helpers::get_test_redis_pool() { 584 Some(p) => p, 585 None => return, 586 }; 587 588 let test_prefix = format!("test:queue:{}:", uuid::Uuid::new_v4()); 589 let worker_id = "test-worker-reliable"; 590 591 // Create first adapter 592 let adapter1 = Arc::new(RedisQueueAdapter::<String>::with_config( 593 pool.clone(), 594 Some(worker_id.to_string()), 595 test_prefix.clone(), 596 1, 597 )); 598 599 // Push multiple items 600 adapter1.push("item1".to_string()).await.unwrap(); 601 adapter1.push("item2".to_string()).await.unwrap(); 602 adapter1.push("item3".to_string()).await.unwrap(); 603 604 // Pull but don't ack (simulating worker crash) 605 let item1 = adapter1.pull().await; 606 assert!(item1.is_some()); 607 assert_eq!(item1.unwrap(), "item1"); 608 609 // Create second adapter with same worker_id (simulating restart) 610 let adapter2 = Arc::new(RedisQueueAdapter::<String>::with_config( 611 pool.clone(), 612 Some(worker_id.to_string()), 613 test_prefix.clone(), 614 1, 615 )); 616 617 // In a real scenario, unacked items would be handled by timeout or manual recovery 618 // For this test, we just verify the item is in the worker queue 619 let recovered = adapter2.pull().await; 620 assert!(recovered.is_some()); 621 } 622 623 #[tokio::test] 624 async fn test_redis_queue_adapter_depth() { 625 let pool = match crate::test_helpers::get_test_redis_pool() { 626 Some(p) => p, 627 None => return, 628 }; 629 630 let test_prefix = format!("test:queue:{}:", uuid::Uuid::new_v4()); 631 let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 632 pool.clone(), 633 None, 634 test_prefix.clone(), 635 1, 636 )); 637 638 // Initially empty 639 let depth = adapter.depth().await; 640 assert_eq!(depth, Some(0)); 641 642 // Push items and check depth 643 adapter.push("item1".to_string()).await.unwrap(); 644 assert_eq!(adapter.depth().await, Some(1)); 645 646 adapter.push("item2".to_string()).await.unwrap(); 647 assert_eq!(adapter.depth().await, Some(2)); 648 649 // Pull and check depth decreases 650 let _ = adapter.pull().await; 651 // Note: depth checks primary queue, not worker queue 652 assert_eq!(adapter.depth().await, Some(1)); 653 654 // Test cleanup is automatic when adapter is dropped 655 } 656 657 #[tokio::test] 658 async fn test_redis_queue_adapter_health() { 659 let pool = match crate::test_helpers::get_test_redis_pool() { 660 Some(p) => p, 661 None => return, 662 }; 663 664 let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 665 pool, 666 None, 667 "test:queue:health:".to_string(), 668 1, 669 )); 670 671 // Should be healthy if Redis is running 672 assert!(adapter.is_healthy().await); 673 } 674}