QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.

refactor: reorganize the `queue_adapter` module into a `queue` package (adapter, error, factory, mpsc, noop, redis, sqlite, work submodules); update all call sites and drop the direct `http` crate dependency in favor of `axum::http::StatusCode`

+2264 -1635
-1
Cargo.lock
··· 2012 2012 "bincode", 2013 2013 "clap", 2014 2014 "deadpool-redis", 2015 - "http", 2016 2015 "metrohash", 2017 2016 "reqwest", 2018 2017 "serde",
-1
Cargo.toml
··· 21 21 bincode = { version = "2.0.1", features = ["serde"] } 22 22 clap = { version = "4", features = ["derive", "env"] } 23 23 deadpool-redis = { version = "0.22", features = ["connection-manager", "tokio-comp", "tokio-rustls-comp"] } 24 - http = "1.0" 25 24 metrohash = "1.0.7" 26 25 reqwest = { version = "0.12", features = ["json"] } 27 26 serde = { version = "1.0", features = ["derive"] }
+1 -1
src/bin/quickdid.rs
··· 15 15 sqlite_schema::create_sqlite_pool, 16 16 handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config}, 17 17 http::{AppContext, create_router}, 18 - queue_adapter::{ 18 + queue::{ 19 19 HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue, 20 20 create_redis_queue, create_sqlite_queue, create_sqlite_queue_with_max_size, 21 21 },
+2 -2
src/handle_resolver_task.rs
··· 5 5 //! and ensures resolved handles are cached for efficient subsequent lookups. 6 6 7 7 use crate::handle_resolver::HandleResolver; 8 - use crate::queue_adapter::{HandleResolutionWork, QueueAdapter}; 8 + use crate::queue::{HandleResolutionWork, QueueAdapter}; 9 9 use anyhow::Result; 10 10 use std::sync::Arc; 11 11 use std::time::Duration; ··· 259 259 #[cfg(test)] 260 260 mod tests { 261 261 use super::*; 262 - use crate::queue_adapter::MpscQueueAdapter; 262 + use crate::queue::MpscQueueAdapter; 263 263 use async_trait::async_trait; 264 264 use std::sync::Arc; 265 265 use tokio::sync::mpsc;
+1 -1
src/http/handle_xrpc_resolve_handle.rs
··· 2 2 3 3 use crate::{ 4 4 handle_resolver::HandleResolver, 5 - queue_adapter::{HandleResolutionWork, QueueAdapter}, 5 + queue::{HandleResolutionWork, QueueAdapter}, 6 6 }; 7 7 8 8 use atproto_identity::resolve::{InputType, parse_input};
+2 -2
src/http/server.rs
··· 1 1 use crate::handle_resolver::HandleResolver; 2 - use crate::queue_adapter::{HandleResolutionWork, QueueAdapter}; 2 + use crate::queue::{HandleResolutionWork, QueueAdapter}; 3 3 use axum::{ 4 4 Router, 5 5 extract::State, 6 6 response::{Html, IntoResponse, Json, Response}, 7 7 routing::get, 8 + http::StatusCode, 8 9 }; 9 - use http::StatusCode; 10 10 use serde_json::json; 11 11 use std::sync::Arc; 12 12
+1 -1
src/lib.rs
··· 6 6 // Semi-public modules - needed by binary but with limited exposure 7 7 pub mod cache; // Only create_redis_pool exposed 8 8 pub mod handle_resolver_task; // Factory functions and TaskConfig exposed 9 - pub mod queue_adapter; // Trait and factory functions exposed 9 + pub mod queue; // Queue adapter system with trait and factory functions 10 10 pub mod sqlite_schema; // SQLite schema management functions exposed 11 11 pub mod task_manager; // Only spawn_cancellable_task exposed 12 12
+189
src/queue/adapter.rs
··· 1 + //! Queue adapter trait definition. 2 + //! 3 + //! This module defines the core `QueueAdapter` trait that provides a common 4 + //! interface for different queue implementations (MPSC, Redis, SQLite, etc.). 5 + 6 + use async_trait::async_trait; 7 + use super::error::Result; 8 + 9 + /// Generic trait for queue adapters that can work with any work type. 10 + /// 11 + /// This trait provides a common interface for different queue implementations 12 + /// (MPSC, Redis, PostgreSQL, SQLite, etc.) allowing them to be used interchangeably. 13 + /// 14 + /// # Type Parameters 15 + /// 16 + /// * `T` - The type of work items that this queue processes. Must be `Send + Sync + 'static`. 17 + /// 18 + /// # Implementation Notes 19 + /// 20 + /// Implementors should ensure that: 21 + /// - `pull()` blocks until an item is available or the queue is closed 22 + /// - `push()` may block if the queue has a bounded capacity 23 + /// - `ack()` is used for reliable delivery semantics (can be no-op for simple queues) 24 + /// - `try_push()` never blocks and returns an error if the queue is full 25 + /// 26 + /// # Examples 27 + /// 28 + /// ```no_run 29 + /// use quickdid::queue::{QueueAdapter, MpscQueueAdapter}; 30 + /// use std::sync::Arc; 31 + /// 32 + /// # async fn example() -> anyhow::Result<()> { 33 + /// // Create a queue adapter for String work items 34 + /// let queue: Arc<dyn QueueAdapter<String>> = Arc::new(MpscQueueAdapter::new(100)); 35 + /// 36 + /// // Push work to the queue 37 + /// queue.push("process-this".to_string()).await?; 38 + /// 39 + /// // Pull work from the queue 40 + /// if let Some(work) = queue.pull().await { 41 + /// println!("Processing: {}", work); 42 + /// // Acknowledge completion 43 + /// queue.ack(&work).await?; 44 + /// } 45 + /// # Ok(()) 46 + /// # } 47 + /// ``` 48 + #[async_trait] 49 + pub trait QueueAdapter<T>: Send + Sync 50 + where 51 + T: Send + Sync + 'static, 52 + { 53 + /// Pull the next work item from the queue. 
54 + /// 55 + /// This method blocks until an item is available or the queue is closed. 56 + /// Returns `None` if the queue is closed or empty (depending on implementation). 57 + /// 58 + /// # Returns 59 + /// 60 + /// * `Some(T)` - The next work item from the queue 61 + /// * `None` - The queue is closed or empty 62 + async fn pull(&self) -> Option<T>; 63 + 64 + /// Push a work item to the queue. 65 + /// 66 + /// This method may block if the queue has bounded capacity and is full. 67 + /// 68 + /// # Arguments 69 + /// 70 + /// * `work` - The work item to add to the queue 71 + /// 72 + /// # Errors 73 + /// 74 + /// Returns an error if: 75 + /// - The queue is full (for bounded queues) 76 + /// - The queue is closed 77 + /// - Serialization fails (for persistent queues) 78 + /// - Backend connection fails (for Redis/SQLite) 79 + async fn push(&self, work: T) -> Result<()>; 80 + 81 + /// Acknowledge that a work item has been successfully processed. 82 + /// 83 + /// This is used by reliable queue implementations to remove the item 84 + /// from a temporary processing queue. Implementations that don't require 85 + /// acknowledgment (like MPSC) can use the default no-op implementation. 86 + /// 87 + /// # Arguments 88 + /// 89 + /// * `item` - The work item to acknowledge 90 + /// 91 + /// # Errors 92 + /// 93 + /// Returns an error if acknowledgment fails (backend-specific). 94 + async fn ack(&self, _item: &T) -> Result<()> { 95 + // Default no-op implementation for queues that don't need acknowledgment 96 + Ok(()) 97 + } 98 + 99 + /// Try to push a work item without blocking. 100 + /// 101 + /// This method returns immediately with an error if the queue is full. 
102 + /// 103 + /// # Arguments 104 + /// 105 + /// * `work` - The work item to add to the queue 106 + /// 107 + /// # Errors 108 + /// 109 + /// Returns an error if: 110 + /// - The queue is full 111 + /// - The queue is closed 112 + /// - Other backend-specific errors occur 113 + async fn try_push(&self, work: T) -> Result<()> { 114 + // Default implementation uses regular push 115 + self.push(work).await 116 + } 117 + 118 + /// Get the current queue depth if available. 119 + /// 120 + /// # Returns 121 + /// 122 + /// * `Some(usize)` - The number of items currently in the queue 123 + /// * `None` - Queue depth is not available or cannot be determined 124 + async fn depth(&self) -> Option<usize> { 125 + None 126 + } 127 + 128 + /// Check if the queue is healthy. 129 + /// 130 + /// Used for health checks and monitoring. Implementations should verify 131 + /// backend connectivity and basic functionality. 132 + /// 133 + /// # Returns 134 + /// 135 + /// * `true` - The queue is operational 136 + /// * `false` - The queue has issues or is disconnected 137 + async fn is_healthy(&self) -> bool { 138 + true 139 + } 140 + } 141 + 142 + #[cfg(test)] 143 + mod tests { 144 + use super::*; 145 + 146 + // Mock implementation for testing the trait 147 + struct MockQueue<T> { 148 + _phantom: std::marker::PhantomData<T>, 149 + } 150 + 151 + impl<T> MockQueue<T> { 152 + fn new() -> Self { 153 + Self { 154 + _phantom: std::marker::PhantomData, 155 + } 156 + } 157 + } 158 + 159 + #[async_trait] 160 + impl<T> QueueAdapter<T> for MockQueue<T> 161 + where 162 + T: Send + Sync + 'static, 163 + { 164 + async fn pull(&self) -> Option<T> { 165 + None 166 + } 167 + 168 + async fn push(&self, _work: T) -> Result<()> { 169 + Ok(()) 170 + } 171 + } 172 + 173 + #[tokio::test] 174 + async fn test_default_trait_methods() { 175 + let queue = MockQueue::<String>::new(); 176 + 177 + // Test default ack implementation 178 + assert!(queue.ack(&"test".to_string()).await.is_ok()); 179 + 180 + // Test 
default try_push implementation 181 + assert!(queue.try_push("test".to_string()).await.is_ok()); 182 + 183 + // Test default depth implementation 184 + assert_eq!(queue.depth().await, None); 185 + 186 + // Test default is_healthy implementation 187 + assert!(queue.is_healthy().await); 188 + } 189 + }
+76
src/queue/error.rs
··· 1 + //! Queue operation error types. 2 + //! 3 + //! This module defines the error types that can occur during queue operations, 4 + //! including push failures, serialization issues, and backend-specific errors. 5 + 6 + use thiserror::Error; 7 + 8 + /// Queue operation errors. 9 + /// 10 + /// These errors represent various failure modes that can occur when working 11 + /// with queue adapters, from connection issues to serialization problems. 12 + #[derive(Error, Debug)] 13 + pub enum QueueError { 14 + /// Failed to push an item to the queue. 15 + #[error("error-quickdid-queue-1 Failed to push to queue: {0}")] 16 + PushFailed(String), 17 + 18 + /// The queue is full and cannot accept new items. 19 + #[error("error-quickdid-queue-2 Queue is full")] 20 + QueueFull, 21 + 22 + /// The queue has been closed and is no longer accepting items. 23 + #[error("error-quickdid-queue-3 Queue is closed")] 24 + QueueClosed, 25 + 26 + /// Redis connection failed. 27 + #[error("error-quickdid-queue-4 Redis connection failed: {0}")] 28 + RedisConnectionFailed(String), 29 + 30 + /// Redis operation failed. 31 + #[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")] 32 + RedisOperationFailed { 33 + /// The Redis operation that failed 34 + operation: String, 35 + /// Details about the failure 36 + details: String 37 + }, 38 + 39 + /// Failed to serialize an item for storage. 40 + #[error("error-quickdid-queue-6 Serialization failed: {0}")] 41 + SerializationFailed(String), 42 + 43 + /// Failed to deserialize an item from storage. 44 + #[error("error-quickdid-queue-7 Deserialization failed: {0}")] 45 + DeserializationFailed(String), 46 + 47 + /// Item not found in worker queue during acknowledgment. 48 + #[error("error-quickdid-queue-8 Item not found in worker queue during acknowledgment")] 49 + AckItemNotFound, 50 + } 51 + 52 + /// Result type alias for queue operations. 
53 + pub type Result<T> = std::result::Result<T, QueueError>; 54 + 55 + #[cfg(test)] 56 + mod tests { 57 + use super::*; 58 + 59 + #[test] 60 + fn test_error_messages() { 61 + let err = QueueError::PushFailed("test failure".to_string()); 62 + assert!(err.to_string().contains("error-quickdid-queue-1")); 63 + assert!(err.to_string().contains("test failure")); 64 + 65 + let err = QueueError::QueueFull; 66 + assert_eq!(err.to_string(), "error-quickdid-queue-2 Queue is full"); 67 + 68 + let err = QueueError::RedisOperationFailed { 69 + operation: "LPUSH".to_string(), 70 + details: "connection timeout".to_string(), 71 + }; 72 + assert!(err.to_string().contains("error-quickdid-queue-5")); 73 + assert!(err.to_string().contains("LPUSH")); 74 + assert!(err.to_string().contains("connection timeout")); 75 + } 76 + }
+330
src/queue/factory.rs
··· 1 + //! Factory functions for creating queue adapters. 2 + //! 3 + //! This module provides convenient factory functions for creating different 4 + //! types of queue adapters with appropriate configurations. 5 + 6 + use deadpool_redis::Pool as RedisPool; 7 + use serde::{Deserialize, Serialize}; 8 + use std::sync::Arc; 9 + use tokio::sync::mpsc; 10 + 11 + use super::{ 12 + adapter::QueueAdapter, 13 + mpsc::MpscQueueAdapter, 14 + noop::NoopQueueAdapter, 15 + redis::RedisQueueAdapter, 16 + sqlite::SqliteQueueAdapter, 17 + }; 18 + 19 + // ========= MPSC Queue Factories ========= 20 + 21 + /// Create a new MPSC queue adapter with the specified buffer size. 22 + /// 23 + /// This creates an in-memory queue suitable for single-instance deployments. 24 + /// 25 + /// # Arguments 26 + /// 27 + /// * `buffer` - The buffer size for the channel 28 + /// 29 + /// # Examples 30 + /// 31 + /// ``` 32 + /// use quickdid::queue::create_mpsc_queue; 33 + /// 34 + /// let queue = create_mpsc_queue::<String>(100); 35 + /// ``` 36 + pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>> 37 + where 38 + T: Send + Sync + 'static, 39 + { 40 + Arc::new(MpscQueueAdapter::new(buffer)) 41 + } 42 + 43 + /// Create an MPSC queue adapter from existing channels. 44 + /// 45 + /// This allows integration with existing channel-based architectures. 
46 + /// 47 + /// # Arguments 48 + /// 49 + /// * `sender` - The sender half of the channel 50 + /// * `receiver` - The receiver half of the channel 51 + /// 52 + /// # Examples 53 + /// 54 + /// ``` 55 + /// use tokio::sync::mpsc; 56 + /// use quickdid::queue::create_mpsc_queue_from_channel; 57 + /// 58 + /// let (sender, receiver) = mpsc::channel::<String>(50); 59 + /// let queue = create_mpsc_queue_from_channel(sender, receiver); 60 + /// ``` 61 + pub fn create_mpsc_queue_from_channel<T>( 62 + sender: mpsc::Sender<T>, 63 + receiver: mpsc::Receiver<T>, 64 + ) -> Arc<dyn QueueAdapter<T>> 65 + where 66 + T: Send + Sync + 'static, 67 + { 68 + Arc::new(MpscQueueAdapter::from_channel(sender, receiver)) 69 + } 70 + 71 + // ========= Redis Queue Factories ========= 72 + 73 + /// Create a new Redis-backed queue adapter. 74 + /// 75 + /// This creates a distributed queue suitable for multi-instance deployments. 76 + /// 77 + /// # Arguments 78 + /// 79 + /// * `pool` - Redis connection pool 80 + /// * `worker_id` - Worker identifier for this queue instance 81 + /// * `key_prefix` - Redis key prefix for queue operations 82 + /// * `timeout_seconds` - Timeout for blocking operations 83 + /// 84 + /// # Examples 85 + /// 86 + /// ```no_run 87 + /// use quickdid::queue::create_redis_queue; 88 + /// use deadpool_redis::Config; 89 + /// 90 + /// # async fn example() -> anyhow::Result<()> { 91 + /// let cfg = Config::from_url("redis://localhost:6379"); 92 + /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; 93 + /// 94 + /// let queue = create_redis_queue::<String>( 95 + /// pool, 96 + /// "worker-1".to_string(), 97 + /// "queue:myapp:".to_string(), 98 + /// 5, 99 + /// ); 100 + /// # Ok(()) 101 + /// # } 102 + /// ``` 103 + pub fn create_redis_queue<T>( 104 + pool: RedisPool, 105 + worker_id: String, 106 + key_prefix: String, 107 + timeout_seconds: u64, 108 + ) -> Arc<dyn QueueAdapter<T>> 109 + where 110 + T: Send + Sync + Serialize + for<'de> 
Deserialize<'de> + 'static, 111 + { 112 + Arc::new(RedisQueueAdapter::new( 113 + pool, 114 + worker_id, 115 + key_prefix, 116 + timeout_seconds, 117 + )) 118 + } 119 + 120 + // ========= SQLite Queue Factories ========= 121 + 122 + /// Create a new SQLite queue adapter with unlimited queue size. 123 + /// 124 + /// This creates a persistent queue backed by SQLite database suitable 125 + /// for single-instance deployments that need persistence across restarts. 126 + /// The queue has no size limit and may grow unbounded. 127 + /// 128 + /// # Arguments 129 + /// 130 + /// * `pool` - SQLite connection pool 131 + /// 132 + /// # Examples 133 + /// 134 + /// ```no_run 135 + /// use quickdid::queue::{create_sqlite_queue, HandleResolutionWork}; 136 + /// use quickdid::sqlite_schema::create_sqlite_pool; 137 + /// 138 + /// # async fn example() -> anyhow::Result<()> { 139 + /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 140 + /// let queue = create_sqlite_queue::<HandleResolutionWork>(pool); 141 + /// # Ok(()) 142 + /// # } 143 + /// ``` 144 + pub fn create_sqlite_queue<T>(pool: sqlx::SqlitePool) -> Arc<dyn QueueAdapter<T>> 145 + where 146 + T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 147 + { 148 + Arc::new(SqliteQueueAdapter::new(pool)) 149 + } 150 + 151 + /// Create a new SQLite queue adapter with work shedding. 152 + /// 153 + /// This creates a persistent queue with configurable maximum size. 154 + /// When the queue exceeds `max_size`, the oldest entries are automatically 155 + /// deleted to maintain the limit, preserving the most recent work items. 
156 + /// 157 + /// # Arguments 158 + /// 159 + /// * `pool` - SQLite connection pool 160 + /// * `max_size` - Maximum number of entries (0 = unlimited) 161 + /// 162 + /// # Work Shedding Behavior 163 + /// 164 + /// - New work items are always accepted 165 + /// - When queue size exceeds `max_size`, oldest entries are deleted 166 + /// - Deletion happens atomically with insertion in a single transaction 167 + /// - Essential for long-running deployments to prevent disk space issues 168 + /// 169 + /// # Examples 170 + /// 171 + /// ```no_run 172 + /// use quickdid::queue::{create_sqlite_queue_with_max_size, HandleResolutionWork}; 173 + /// use quickdid::sqlite_schema::create_sqlite_pool; 174 + /// 175 + /// # async fn example() -> anyhow::Result<()> { 176 + /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 177 + /// // Limit queue to 10,000 entries with automatic work shedding 178 + /// let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10000); 179 + /// # Ok(()) 180 + /// # } 181 + /// ``` 182 + pub fn create_sqlite_queue_with_max_size<T>( 183 + pool: sqlx::SqlitePool, 184 + max_size: u64, 185 + ) -> Arc<dyn QueueAdapter<T>> 186 + where 187 + T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 188 + { 189 + Arc::new(SqliteQueueAdapter::with_max_size(pool, max_size)) 190 + } 191 + 192 + // ========= No-op Queue Factory ========= 193 + 194 + /// Create a no-operation queue adapter. 195 + /// 196 + /// This creates a queue that discards all work items, useful for testing 197 + /// or when queue processing is disabled. 
198 + /// 199 + /// # Examples 200 + /// 201 + /// ``` 202 + /// use quickdid::queue::create_noop_queue; 203 + /// 204 + /// let queue = create_noop_queue::<String>(); 205 + /// ``` 206 + pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>> 207 + where 208 + T: Send + Sync + 'static, 209 + { 210 + Arc::new(NoopQueueAdapter::new()) 211 + } 212 + 213 + #[cfg(test)] 214 + mod tests { 215 + use super::*; 216 + use crate::queue::HandleResolutionWork; 217 + 218 + #[tokio::test] 219 + async fn test_create_mpsc_queue() { 220 + let queue = create_mpsc_queue::<String>(10); 221 + 222 + queue.push("test".to_string()).await.unwrap(); 223 + let item = queue.pull().await; 224 + assert_eq!(item, Some("test".to_string())); 225 + } 226 + 227 + #[tokio::test] 228 + async fn test_create_mpsc_queue_from_channel() { 229 + let (sender, receiver) = mpsc::channel(5); 230 + let queue = create_mpsc_queue_from_channel(sender.clone(), receiver); 231 + 232 + // Send via original sender 233 + sender.send("external".to_string()).await.unwrap(); 234 + 235 + // Receive via queue 236 + let item = queue.pull().await; 237 + assert_eq!(item, Some("external".to_string())); 238 + } 239 + 240 + #[tokio::test] 241 + async fn test_create_noop_queue() { 242 + let queue = create_noop_queue::<String>(); 243 + 244 + // Should accept pushes 245 + queue.push("ignored".to_string()).await.unwrap(); 246 + 247 + // Should report as healthy 248 + assert!(queue.is_healthy().await); 249 + 250 + // Should report depth as 0 251 + assert_eq!(queue.depth().await, Some(0)); 252 + } 253 + 254 + #[tokio::test] 255 + async fn test_create_sqlite_queue() { 256 + // Create in-memory SQLite database for testing 257 + let pool = sqlx::SqlitePool::connect("sqlite::memory:") 258 + .await 259 + .expect("Failed to connect to in-memory SQLite"); 260 + 261 + // Create the queue schema 262 + crate::sqlite_schema::create_schema(&pool) 263 + .await 264 + .expect("Failed to create schema"); 265 + 266 + let queue = 
create_sqlite_queue::<HandleResolutionWork>(pool); 267 + 268 + let work = HandleResolutionWork::new("test.example.com".to_string()); 269 + queue.push(work.clone()).await.unwrap(); 270 + 271 + let pulled = queue.pull().await; 272 + assert_eq!(pulled, Some(work)); 273 + } 274 + 275 + #[tokio::test] 276 + async fn test_create_sqlite_queue_with_max_size() { 277 + // Create in-memory SQLite database for testing 278 + let pool = sqlx::SqlitePool::connect("sqlite::memory:") 279 + .await 280 + .expect("Failed to connect to in-memory SQLite"); 281 + 282 + // Create the queue schema 283 + crate::sqlite_schema::create_schema(&pool) 284 + .await 285 + .expect("Failed to create schema"); 286 + 287 + // Create queue with small max size 288 + let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 5); 289 + 290 + // Push items 291 + for i in 0..10 { 292 + let work = HandleResolutionWork::new(format!("test-{}.example.com", i)); 293 + queue.push(work).await.unwrap(); 294 + } 295 + 296 + // Should have limited items due to work shedding 297 + let depth = queue.depth().await.unwrap(); 298 + assert!(depth <= 5, "Queue should have at most 5 items after work shedding"); 299 + } 300 + 301 + #[tokio::test] 302 + async fn test_create_redis_queue() { 303 + let pool = match crate::test_helpers::get_test_redis_pool() { 304 + Some(p) => p, 305 + None => { 306 + eprintln!("Skipping Redis test - no Redis connection available"); 307 + return; 308 + } 309 + }; 310 + 311 + let test_prefix = format!( 312 + "test:factory:{}:", 313 + std::time::SystemTime::now() 314 + .duration_since(std::time::UNIX_EPOCH) 315 + .unwrap() 316 + .as_nanos() 317 + ); 318 + 319 + let queue = create_redis_queue::<String>( 320 + pool, 321 + "test-worker".to_string(), 322 + test_prefix, 323 + 1, 324 + ); 325 + 326 + queue.push("test-item".to_string()).await.unwrap(); 327 + let pulled = queue.pull().await; 328 + assert_eq!(pulled, Some("test-item".to_string())); 329 + } 330 + }
+83
src/queue/mod.rs
··· 1 + //! Queue adapter system for work queue abstraction. 2 + //! 3 + //! This module provides a generic trait and implementations for queue adapters 4 + //! that can be used with any work type for handle resolution and other tasks. 5 + //! 6 + //! # Architecture 7 + //! 8 + //! The queue system is designed with the following components: 9 + //! 10 + //! - **Trait**: `QueueAdapter` - Common interface for all queue implementations 11 + //! - **Implementations**: 12 + //! - `MpscQueueAdapter` - In-memory MPSC channel-based queue 13 + //! - `RedisQueueAdapter` - Distributed Redis-backed queue 14 + //! - `SqliteQueueAdapter` - Persistent SQLite-backed queue 15 + //! - `NoopQueueAdapter` - No-operation queue for testing 16 + //! - **Work Types**: `HandleResolutionWork` - Work items for handle resolution 17 + //! - **Factory Functions**: Convenient functions for creating queue adapters 18 + //! 19 + //! # Examples 20 + //! 21 + //! ## Simple In-Memory Queue 22 + //! 23 + //! ``` 24 + //! use quickdid::queue::{create_mpsc_queue, QueueAdapter}; 25 + //! 26 + //! # async fn example() -> anyhow::Result<()> { 27 + //! let queue = create_mpsc_queue::<String>(100); 28 + //! 29 + //! queue.push("work-item".to_string()).await?; 30 + //! if let Some(item) = queue.pull().await { 31 + //! println!("Processing: {}", item); 32 + //! } 33 + //! # Ok(()) 34 + //! # } 35 + //! ``` 36 + //! 37 + //! ## Persistent Queue with Work Shedding 38 + //! 39 + //! ```no_run 40 + //! use quickdid::queue::{create_sqlite_queue_with_max_size, HandleResolutionWork}; 41 + //! use quickdid::sqlite_schema::create_sqlite_pool; 42 + //! 43 + //! # async fn example() -> anyhow::Result<()> { 44 + //! let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 45 + //! let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10000); 46 + //! 47 + //! let work = HandleResolutionWork::new("alice.bsky.social".to_string()); 48 + //! queue.push(work).await?; 49 + //! # Ok(()) 50 + //! 
# } 51 + //! ``` 52 + 53 + // Internal modules 54 + mod adapter; 55 + mod error; 56 + mod factory; 57 + mod mpsc; 58 + mod noop; 59 + mod redis; 60 + mod sqlite; 61 + mod work; 62 + 63 + // Re-export core types 64 + pub use adapter::QueueAdapter; 65 + pub use error::{QueueError, Result}; 66 + pub use work::HandleResolutionWork; 67 + 68 + // Re-export implementations (with limited visibility) 69 + pub use mpsc::MpscQueueAdapter; 70 + pub use noop::NoopQueueAdapter; 71 + pub use redis::RedisQueueAdapter; 72 + pub use sqlite::SqliteQueueAdapter; 73 + 74 + // Re-export factory functions 75 + pub use factory::{ 76 + create_mpsc_queue, 77 + create_mpsc_queue_from_channel, 78 + create_noop_queue, 79 + create_redis_queue, 80 + create_sqlite_queue, 81 + create_sqlite_queue_with_max_size, 82 + }; 83 +
+286
src/queue/mpsc.rs
··· 1 + //! MPSC channel-based queue adapter implementation. 2 + //! 3 + //! This module provides an in-memory queue implementation using Tokio's 4 + //! multi-producer, single-consumer (MPSC) channels. It's suitable for 5 + //! single-instance deployments with moderate throughput requirements. 6 + 7 + use async_trait::async_trait; 8 + use std::sync::Arc; 9 + use tokio::sync::{mpsc, Mutex}; 10 + 11 + use super::adapter::QueueAdapter; 12 + use super::error::{QueueError, Result}; 13 + 14 + /// MPSC channel-based queue adapter implementation. 15 + /// 16 + /// This adapter uses tokio's multi-producer, single-consumer channel 17 + /// for in-memory queuing of work items. It provides fast, lock-free 18 + /// operation for single-instance deployments. 19 + /// 20 + /// # Features 21 + /// 22 + /// - In-memory operation (no persistence) 23 + /// - Bounded capacity with backpressure 24 + /// - Fast push/pull operations 25 + /// - No acknowledgment needed (fire-and-forget) 26 + /// 27 + /// # Limitations 28 + /// 29 + /// - No persistence across restarts 30 + /// - Single consumer only 31 + /// - No distributed operation 32 + /// 33 + /// # Examples 34 + /// 35 + /// ``` 36 + /// use quickdid::queue::MpscQueueAdapter; 37 + /// use quickdid::queue::QueueAdapter; 38 + /// 39 + /// # async fn example() -> anyhow::Result<()> { 40 + /// // Create a queue with buffer size of 100 41 + /// let queue = MpscQueueAdapter::<String>::new(100); 42 + /// 43 + /// // Push items 44 + /// queue.push("item1".to_string()).await?; 45 + /// queue.push("item2".to_string()).await?; 46 + /// 47 + /// // Pull items 48 + /// while let Some(item) = queue.pull().await { 49 + /// println!("Processing: {}", item); 50 + /// } 51 + /// # Ok(()) 52 + /// # } 53 + /// ``` 54 + pub struct MpscQueueAdapter<T> 55 + where 56 + T: Send + Sync + 'static, 57 + { 58 + receiver: Arc<Mutex<mpsc::Receiver<T>>>, 59 + sender: mpsc::Sender<T>, 60 + } 61 + 62 + impl<T> MpscQueueAdapter<T> 63 + where 64 + T: Send + Sync + 
'static, 65 + { 66 + /// Create a new MPSC queue adapter with the specified buffer size. 67 + /// 68 + /// # Arguments 69 + /// 70 + /// * `buffer` - The maximum number of items that can be buffered 71 + /// 72 + /// # Examples 73 + /// 74 + /// ``` 75 + /// use quickdid::queue::MpscQueueAdapter; 76 + /// 77 + /// let queue = MpscQueueAdapter::<String>::new(100); 78 + /// ``` 79 + pub fn new(buffer: usize) -> Self { 80 + let (sender, receiver) = mpsc::channel(buffer); 81 + Self { 82 + receiver: Arc::new(Mutex::new(receiver)), 83 + sender, 84 + } 85 + } 86 + 87 + /// Create an adapter from existing MPSC channels. 88 + /// 89 + /// This constructor is useful for integrating with existing channel-based 90 + /// architectures or when you need custom channel configuration. 91 + /// 92 + /// # Arguments 93 + /// 94 + /// * `sender` - The sender half of the channel 95 + /// * `receiver` - The receiver half of the channel 96 + /// 97 + /// # Examples 98 + /// 99 + /// ``` 100 + /// use tokio::sync::mpsc; 101 + /// use quickdid::queue::MpscQueueAdapter; 102 + /// 103 + /// let (sender, receiver) = mpsc::channel::<String>(50); 104 + /// let queue = MpscQueueAdapter::from_channel(sender, receiver); 105 + /// ``` 106 + pub fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self { 107 + Self { 108 + receiver: Arc::new(Mutex::new(receiver)), 109 + sender, 110 + } 111 + } 112 + } 113 + 114 + #[async_trait] 115 + impl<T> QueueAdapter<T> for MpscQueueAdapter<T> 116 + where 117 + T: Send + Sync + 'static, 118 + { 119 + async fn pull(&self) -> Option<T> { 120 + let mut receiver = self.receiver.lock().await; 121 + receiver.recv().await 122 + } 123 + 124 + async fn push(&self, work: T) -> Result<()> { 125 + self.sender 126 + .send(work) 127 + .await 128 + .map_err(|e| QueueError::PushFailed(e.to_string())) 129 + } 130 + 131 + async fn try_push(&self, work: T) -> Result<()> { 132 + self.sender.try_send(work).map_err(|e| match e { 133 + 
mpsc::error::TrySendError::Full(_) => QueueError::QueueFull, 134 + mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed, 135 + }) 136 + } 137 + 138 + async fn depth(&self) -> Option<usize> { 139 + // Note: This is an approximation as mpsc doesn't provide exact depth 140 + Some(self.sender.max_capacity() - self.sender.capacity()) 141 + } 142 + 143 + async fn is_healthy(&self) -> bool { 144 + !self.sender.is_closed() 145 + } 146 + } 147 + 148 + #[cfg(test)] 149 + mod tests { 150 + use super::*; 151 + 152 + #[tokio::test] 153 + async fn test_mpsc_queue_push_pull() { 154 + let queue = MpscQueueAdapter::<String>::new(10); 155 + 156 + // Test push 157 + queue.push("test1".to_string()).await.unwrap(); 158 + queue.push("test2".to_string()).await.unwrap(); 159 + 160 + // Test pull in FIFO order 161 + let item1 = queue.pull().await; 162 + assert_eq!(item1, Some("test1".to_string())); 163 + 164 + let item2 = queue.pull().await; 165 + assert_eq!(item2, Some("test2".to_string())); 166 + } 167 + 168 + #[tokio::test] 169 + async fn test_mpsc_queue_try_push() { 170 + // Create a small queue to test full condition 171 + let queue = MpscQueueAdapter::<i32>::new(2); 172 + 173 + // Fill the queue 174 + queue.push(1).await.unwrap(); 175 + queue.push(2).await.unwrap(); 176 + 177 + // Try to push when full should fail 178 + let result = queue.try_push(3).await; 179 + assert!(matches!(result, Err(QueueError::QueueFull))); 180 + 181 + // Pull one item to make space 182 + let _ = queue.pull().await; 183 + 184 + // Now try_push should succeed 185 + queue.try_push(3).await.unwrap(); 186 + } 187 + 188 + #[tokio::test] 189 + async fn test_mpsc_queue_from_channel() { 190 + let (sender, receiver) = mpsc::channel(5); 191 + let queue = MpscQueueAdapter::from_channel(sender.clone(), receiver); 192 + 193 + // Send via original sender 194 + sender.send("external".to_string()).await.unwrap(); 195 + 196 + // Send via queue 197 + queue.push("internal".to_string()).await.unwrap(); 198 + 199 + 
// Pull both items 200 + assert_eq!(queue.pull().await, Some("external".to_string())); 201 + assert_eq!(queue.pull().await, Some("internal".to_string())); 202 + } 203 + 204 + #[tokio::test] 205 + async fn test_mpsc_queue_health() { 206 + let queue = MpscQueueAdapter::<String>::new(10); 207 + 208 + // Queue should be healthy initially 209 + assert!(queue.is_healthy().await); 210 + 211 + // Create a queue and drop the receiver to close it 212 + let (sender, receiver) = mpsc::channel::<String>(10); 213 + drop(receiver); 214 + let closed_queue = MpscQueueAdapter::from_channel(sender, mpsc::channel(1).1); 215 + 216 + // Push should fail on closed queue 217 + let result = closed_queue.push("test".to_string()).await; 218 + assert!(result.is_err()); 219 + } 220 + 221 + #[tokio::test] 222 + async fn test_mpsc_queue_depth() { 223 + let queue = MpscQueueAdapter::<i32>::new(10); 224 + 225 + // Initially empty 226 + let depth = queue.depth().await; 227 + assert_eq!(depth, Some(0)); 228 + 229 + // Add items and check depth 230 + queue.push(1).await.unwrap(); 231 + queue.push(2).await.unwrap(); 232 + queue.push(3).await.unwrap(); 233 + 234 + let depth = queue.depth().await; 235 + assert_eq!(depth, Some(3)); 236 + 237 + // Pull an item and check depth 238 + let _ = queue.pull().await; 239 + let depth = queue.depth().await; 240 + assert_eq!(depth, Some(2)); 241 + } 242 + 243 + #[tokio::test] 244 + async fn test_mpsc_queue_concurrent_operations() { 245 + use std::sync::Arc; 246 + 247 + let queue = Arc::new(MpscQueueAdapter::<i32>::new(100)); 248 + 249 + // Spawn multiple producers 250 + let mut handles = vec![]; 251 + for i in 0..10 { 252 + let q = queue.clone(); 253 + handles.push(tokio::spawn(async move { 254 + for j in 0..10 { 255 + q.push(i * 10 + j).await.unwrap(); 256 + } 257 + })); 258 + } 259 + 260 + // Wait for all producers 261 + for handle in handles { 262 + handle.await.unwrap(); 263 + } 264 + 265 + // Verify we can pull all 100 items 266 + let mut count = 0; 267 + while 
queue.pull().await.is_some() { 268 + count += 1; 269 + if count >= 100 { 270 + break; 271 + } 272 + } 273 + assert_eq!(count, 100); 274 + } 275 + 276 + #[tokio::test] 277 + async fn test_mpsc_queue_no_ack_needed() { 278 + let queue = MpscQueueAdapter::<String>::new(10); 279 + 280 + queue.push("test".to_string()).await.unwrap(); 281 + let item = queue.pull().await.unwrap(); 282 + 283 + // Ack should always succeed (no-op) 284 + queue.ack(&item).await.unwrap(); 285 + } 286 + }
+222
src/queue/noop.rs
//! No-operation queue adapter implementation.
//!
//! This module provides a queue adapter that discards all work items,
//! useful for testing or when queue processing is disabled.

use async_trait::async_trait;
use std::time::Duration;
use tokio::time::sleep;

use super::adapter::QueueAdapter;
use super::error::Result;

/// No-operation queue adapter that discards all work items.
///
/// This adapter is useful for configurations where queuing is disabled
/// or as a fallback when other queue adapters fail to initialize.
/// All work items pushed to this queue are silently discarded.
///
/// # Features
///
/// - Zero resource usage
/// - Always healthy
/// - Discards all work items
/// - `pull` yields no items: it sleeps for 60 seconds, then returns `None`
///
/// # Use Cases
///
/// - Testing environments where queue processing isn't needed
/// - Graceful degradation when queue backends are unavailable
/// - Configurations where queue processing is explicitly disabled
///
/// # Examples
///
/// ```
/// use quickdid::queue::NoopQueueAdapter;
/// use quickdid::queue::QueueAdapter;
///
/// # async fn example() -> anyhow::Result<()> {
/// let queue = NoopQueueAdapter::<String>::new();
///
/// // Push is silently discarded
/// queue.push("ignored".to_string()).await?;
///
/// // Pull yields no items (sleeps 60s before returning None)
/// // let item = queue.pull().await; // Would stall for the sleep interval
///
/// // Always reports healthy
/// assert!(queue.is_healthy().await);
///
/// // Always reports empty
/// assert_eq!(queue.depth().await, Some(0));
/// # Ok(())
/// # }
/// ```
pub struct NoopQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    // Marker so the adapter can be generic over T without storing any data.
    _phantom: std::marker::PhantomData<T>,
}

impl<T> NoopQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    /// Create a new no-op queue adapter.
    ///
    /// # Examples
    ///
    /// ```
    /// use quickdid::queue::NoopQueueAdapter;
    ///
    /// let queue = NoopQueueAdapter::<String>::new();
    /// ```
    pub fn new() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T> Default for NoopQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl<T> QueueAdapter<T> for NoopQueueAdapter<T>
where
    T: Send + Sync + 'static,
{
    async fn pull(&self) -> Option<T> {
        // There is never any work; sleep before returning None so callers
        // that poll pull() in a loop don't busy-wait against this adapter.
        sleep(Duration::from_secs(60)).await;
        None
    }

    async fn push(&self, _work: T) -> Result<()> {
        // Silently discard the work
        Ok(())
    }

    async fn ack(&self, _item: &T) -> Result<()> {
        // No-op: nothing was ever retained, so there is nothing to settle
        Ok(())
    }

    async fn try_push(&self, _work: T) -> Result<()> {
        // Silently discard the work
        Ok(())
    }

    async fn depth(&self) -> Option<usize> {
        // Always empty
        Some(0)
    }

    async fn is_healthy(&self) -> bool {
        // Always healthy
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_noop_queue_push() {
        let queue = NoopQueueAdapter::<String>::new();

        // Push should always succeed
        queue.push("test1".to_string()).await.unwrap();
        queue.push("test2".to_string()).await.unwrap();
        queue.push("test3".to_string()).await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_queue_try_push() {
        let queue = NoopQueueAdapter::<i32>::new();

        // Try push should always succeed
        queue.try_push(1).await.unwrap();
        queue.try_push(2).await.unwrap();
        queue.try_push(3).await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_queue_ack() {
        let queue = NoopQueueAdapter::<String>::new();

        // Ack should always succeed
        queue.ack(&"any".to_string()).await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_queue_depth() {
        let queue = NoopQueueAdapter::<String>::new();

        // Should always report empty
        assert_eq!(queue.depth().await, Some(0));

        // Even after pushing items
        queue.push("item".to_string()).await.unwrap();
        assert_eq!(queue.depth().await, Some(0));
    }

    #[tokio::test]
    async fn test_noop_queue_health() {
        let queue = NoopQueueAdapter::<String>::new();

        // Should always be healthy
        assert!(queue.is_healthy().await);
    }

    #[tokio::test]
    async fn test_noop_queue_default() {
        let queue: NoopQueueAdapter<String> = Default::default();

        // Default instance should work normally
        queue.push("test".to_string()).await.unwrap();
        assert!(queue.is_healthy().await);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_noop_queue_pull_blocks() {
        use tokio::time::timeout;

        let queue = NoopQueueAdapter::<String>::new();

        // Pull should block and not return immediately
        let result = timeout(Duration::from_millis(100), queue.pull()).await;
        assert!(result.is_err(), "Pull should have timed out");
    }

    #[tokio::test]
    async fn test_noop_queue_with_custom_type() {
        use serde::{Deserialize, Serialize};

        #[derive(Debug, Clone, Serialize, Deserialize)]
        struct CustomWork {
            id: u64,
            data: Vec<String>,
        }

        let queue = NoopQueueAdapter::<CustomWork>::new();

        let work = CustomWork {
            id: 123,
            data: vec!["test".to_string()],
        };

        // Should handle custom types without issue
        queue.push(work.clone()).await.unwrap();
        queue.ack(&work).await.unwrap();
        assert_eq!(queue.depth().await, Some(0));
    }
}
+474
src/queue/redis.rs
//! Redis-backed queue adapter implementation.
//!
//! This module provides a distributed queue implementation using Redis lists
//! with a reliable queue pattern for at-least-once delivery semantics.

use async_trait::async_trait;
use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
use serde::{Deserialize, Serialize};
use tracing::{debug, error, warn};

use super::adapter::QueueAdapter;
use super::error::{QueueError, Result};

/// Redis-backed queue adapter implementation.
///
/// This adapter uses Redis lists with a reliable queue pattern:
/// - LPUSH to push items to the primary queue
/// - BRPOPLPUSH to atomically move items from primary to worker queue
/// - LREM to acknowledge processed items from worker queue
///
/// This ensures at-least-once delivery semantics and allows for recovery
/// of in-flight items if a worker crashes.
///
/// NOTE(review): recovery itself is not performed by this adapter — unacked
/// items remain parked in the per-worker queue until an external process
/// re-queues them; confirm a recovery job exists in deployments.
///
/// # Features
///
/// - Distributed operation across multiple instances
/// - Persistent storage with Redis
/// - At-least-once delivery guarantees
/// - Automatic recovery of failed items
/// - Configurable timeouts
///
/// # Architecture
///
/// ```text
/// Producer -> [Primary Queue] -> BRPOPLPUSH -> [Worker Queue] -> Consumer
///                                                   |
///                                             LREM (on ack)
/// ```
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::RedisQueueAdapter;
/// use quickdid::queue::QueueAdapter;
/// use deadpool_redis::Config;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Create Redis pool
/// let cfg = Config::from_url("redis://localhost:6379");
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
///
/// // Create queue adapter
/// let queue = RedisQueueAdapter::<String>::new(
///     pool,
///     "worker-1".to_string(),
///     "queue:myapp:".to_string(),
///     5, // 5 second timeout
/// );
///
/// // Use the queue
/// queue.push("work-item".to_string()).await?;
/// if let Some(item) = queue.pull().await {
///     // Process item
///     queue.ack(&item).await?;
/// }
/// # Ok(())
/// # }
/// ```
pub struct RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Redis connection pool
    pool: RedisPool,
    /// Unique worker ID for this adapter instance
    worker_id: String,
    /// Key prefix for all queues (default: "queue:handleresolver:")
    key_prefix: String,
    /// Timeout for blocking RPOPLPUSH operations (in seconds)
    timeout_seconds: u64,
    /// Type marker for generic parameter
    _phantom: std::marker::PhantomData<T>,
}

impl<T> RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Create a new Redis queue adapter.
    ///
    /// # Arguments
    ///
    /// * `pool` - Redis connection pool
    /// * `worker_id` - Unique identifier for this worker instance
    /// * `key_prefix` - Redis key prefix for queue operations
    /// * `timeout_seconds` - Timeout for blocking pull operations
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use quickdid::queue::RedisQueueAdapter;
    /// use deadpool_redis::Config;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let cfg = Config::from_url("redis://localhost:6379");
    /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
    ///
    /// let queue = RedisQueueAdapter::<String>::new(
    ///     pool,
    ///     "worker-1".to_string(),
    ///     "queue:myapp:".to_string(),
    ///     5,
    /// );
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        pool: RedisPool,
        worker_id: String,
        key_prefix: String,
        timeout_seconds: u64,
    ) -> Self {
        Self {
            pool,
            worker_id,
            key_prefix,
            timeout_seconds,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Get the primary queue key (shared by all producers/consumers).
    fn primary_queue_key(&self) -> String {
        format!("{}primary", self.key_prefix)
    }

    /// Get the worker-specific temporary queue key (holds in-flight items
    /// for this worker until they are acknowledged).
    fn worker_queue_key(&self) -> String {
        format!("{}{}", self.key_prefix, self.worker_id)
    }
}

#[async_trait]
impl<T> QueueAdapter<T> for RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    async fn pull(&self) -> Option<T> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                let worker_key = self.worker_queue_key();

                // Use blocking RPOPLPUSH to atomically move item from primary to worker queue.
                // A None result here means the timeout elapsed with no work — the
                // normal idle path, not an error.
                let data: Option<Vec<u8>> = match conn
                    .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64)
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        error!("Failed to pull from queue: {}", e);
                        return None;
                    }
                };

                if let Some(data) = data {
                    // Deserialize the item
                    match serde_json::from_slice(&data) {
                        Ok(item) => {
                            debug!(
                                worker_id = %self.worker_id,
                                "Pulled item from queue"
                            );
                            Some(item)
                        }
                        Err(e) => {
                            error!("Failed to deserialize item: {}", e);
                            // Remove the corrupted item from worker queue so it
                            // cannot be re-delivered; the removal result is
                            // deliberately ignored (best-effort cleanup).
                            let _: std::result::Result<(), _> =
                                conn.lrem(&worker_key, 1, &data).await;
                            None
                        }
                    }
                } else {
                    None
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    async fn push(&self, work: T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        let data = serde_json::to_vec(&work)
            .map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let primary_key = self.primary_queue_key();

        conn.lpush::<_, _, ()>(&primary_key, data)
            .await
            .map_err(|e| QueueError::RedisOperationFailed {
                operation: "LPUSH".to_string(),
                details: e.to_string(),
            })?;

        debug!("Pushed item to queue");
        Ok(())
    }

    async fn ack(&self, item: &T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        // LREM matches on the exact serialized bytes, so serialization here
        // must be deterministic and identical to what pull() received.
        let data =
            serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let worker_key = self.worker_queue_key();

        // Remove exactly one occurrence of this item from the worker queue
        let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| {
            QueueError::RedisOperationFailed {
                operation: "LREM".to_string(),
                details: e.to_string(),
            }
        })?;

        if removed == 0 {
            // Not treated as an error: the item may already have been removed
            // or was never parked (e.g. after a deserialization cleanup).
            warn!(
                worker_id = %self.worker_id,
                "Item not found in worker queue during acknowledgment"
            );
        } else {
            debug!(
                worker_id = %self.worker_id,
                "Acknowledged item"
            );
        }

        Ok(())
    }

    async fn depth(&self) -> Option<usize> {
        match self.pool.get().await {
            Ok(mut conn) => {
                // Only the primary queue is measured; in-flight items parked
                // in per-worker queues are not counted.
                let primary_key = self.primary_queue_key();
                match conn.llen::<_, usize>(&primary_key).await {
                    Ok(len) => Some(len),
                    Err(e) => {
                        error!("Failed to get queue depth: {}", e);
                        None
                    }
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    async fn is_healthy(&self) -> bool {
        match self.pool.get().await {
            Ok(mut conn) => {
                // Ping Redis to check health
                match deadpool_redis::redis::cmd("PING")
                    .query_async::<String>(&mut conn)
                    .await
                {
                    Ok(response) => response == "PONG",
                    Err(_) => false,
                }
            }
            Err(_) => false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_redis_queue_push_pull() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        // Create adapter with unique prefix for testing
        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let adapter = RedisQueueAdapter::<String>::new(
            pool.clone(),
            "test-worker".to_string(),
            test_prefix.clone(),
            1, // 1 second timeout for tests
        );

        // Test push
        adapter.push("test-item".to_string()).await.unwrap();

        // Test pull
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some("test-item".to_string()));

        // Test ack
        adapter
            .ack(&"test-item".to_string())
            .await
            .expect("Ack should succeed");
    }

    #[tokio::test]
    async fn test_redis_queue_reliable_delivery() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let worker_id = "test-worker-reliable";

        // Create adapter
        let adapter1 = RedisQueueAdapter::<String>::new(
            pool.clone(),
            worker_id.to_string(),
            test_prefix.clone(),
            1,
        );

        // Push multiple items
        adapter1.push("item1".to_string()).await.unwrap();
        adapter1.push("item2".to_string()).await.unwrap();
        adapter1.push("item3".to_string()).await.unwrap();

        // Pull but don't ack (simulating worker crash)
        let item1 = adapter1.pull().await;
        assert_eq!(item1, Some("item1".to_string()));

        // Item should be in worker queue
        // In production, a recovery process would handle unacked items
        // For this test, we verify the item is in the worker queue
        let item2 = adapter1.pull().await;
        assert_eq!(item2, Some("item2".to_string()));

        // Ack the second item
        adapter1.ack(&"item2".to_string()).await.unwrap();
    }

    #[tokio::test]
    async fn test_redis_queue_depth() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let adapter = RedisQueueAdapter::<String>::new(
            pool,
            "test-worker-depth".to_string(),
            test_prefix,
            1,
        );

        // Initially empty
        assert_eq!(adapter.depth().await, Some(0));

        // Push items and check depth
        adapter.push("item1".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(1));

        adapter.push("item2".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(2));

        // Pull and check depth (note: depth checks primary queue)
        let _ = adapter.pull().await;
        assert_eq!(adapter.depth().await, Some(1));
    }

    #[tokio::test]
    async fn test_redis_queue_health() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let adapter = RedisQueueAdapter::<String>::new(
            pool,
            "test-worker-health".to_string(),
            "test:queue:health:".to_string(),
            1,
        );

        // Should be healthy if Redis is running
        assert!(adapter.is_healthy().await);
    }

    #[tokio::test]
    async fn test_redis_queue_serialization() {
        use crate::queue::HandleResolutionWork;

        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let adapter = RedisQueueAdapter::<HandleResolutionWork>::new(
            pool,
            "test-worker-ser".to_string(),
            test_prefix,
            1,
        );

        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // Push and pull
        adapter.push(work.clone()).await.unwrap();
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work.clone()));

        // Ack
        adapter.ack(&work).await.unwrap();
    }
}
+502
src/queue/sqlite.rs
··· 1 + //! SQLite-backed queue adapter implementation. 2 + //! 3 + //! This module provides a persistent queue implementation using SQLite 4 + //! with optional work shedding to prevent unbounded growth. 5 + 6 + use async_trait::async_trait; 7 + use serde::{Deserialize, Serialize}; 8 + use sqlx::{self, Row}; 9 + use tracing::{debug, error, info, warn}; 10 + 11 + use super::adapter::QueueAdapter; 12 + use super::error::{QueueError, Result}; 13 + 14 + /// SQLite-backed queue adapter implementation. 15 + /// 16 + /// This adapter uses SQLite database for persistent queuing of work items. 17 + /// It's suitable for single-instance deployments that need persistence 18 + /// across service restarts while remaining lightweight. 19 + /// 20 + /// # Features 21 + /// 22 + /// - Persistent queuing across service restarts 23 + /// - Simple FIFO ordering based on insertion time 24 + /// - Single consumer design (no complex locking needed) 25 + /// - Simple pull-and-delete semantics 26 + /// - Optional work shedding to prevent unbounded queue growth 27 + /// 28 + /// # Work Shedding 29 + /// 30 + /// When `max_size` is configured (> 0), the adapter implements work shedding: 31 + /// - New work items are always accepted 32 + /// - When the queue exceeds `max_size`, oldest entries are automatically deleted 33 + /// - This maintains the most recent work items while preventing unbounded growth 34 + /// - Essential for long-running deployments to avoid disk space issues 35 + /// 36 + /// # Database Schema 37 + /// 38 + /// The adapter expects the following table structure: 39 + /// ```sql 40 + /// CREATE TABLE handle_resolution_queue ( 41 + /// id INTEGER PRIMARY KEY AUTOINCREMENT, 42 + /// work TEXT NOT NULL, 43 + /// queued_at INTEGER NOT NULL 44 + /// ); 45 + /// CREATE INDEX idx_queue_timestamp ON handle_resolution_queue(queued_at); 46 + /// ``` 47 + /// 48 + /// # Examples 49 + /// 50 + /// ```no_run 51 + /// use quickdid::queue::SqliteQueueAdapter; 52 + /// use 
quickdid::queue::QueueAdapter; 53 + /// use quickdid::sqlite_schema::create_sqlite_pool; 54 + /// 55 + /// # async fn example() -> anyhow::Result<()> { 56 + /// // Create SQLite pool 57 + /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 58 + /// 59 + /// // Create queue with unlimited size 60 + /// let queue = SqliteQueueAdapter::<String>::new(pool.clone()); 61 + /// 62 + /// // Or create queue with work shedding (max 10,000 items) 63 + /// let bounded_queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000); 64 + /// 65 + /// // Use the queue 66 + /// queue.push("work-item".to_string()).await?; 67 + /// if let Some(item) = queue.pull().await { 68 + /// // Process item (automatically deleted from queue) 69 + /// println!("Processing: {}", item); 70 + /// } 71 + /// # Ok(()) 72 + /// # } 73 + /// ``` 74 + pub struct SqliteQueueAdapter<T> 75 + where 76 + T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 77 + { 78 + /// SQLite connection pool 79 + pool: sqlx::SqlitePool, 80 + /// Maximum queue size (0 = unlimited) 81 + /// When exceeded, oldest entries are deleted to maintain this limit 82 + max_size: u64, 83 + /// Type marker for generic parameter 84 + _phantom: std::marker::PhantomData<T>, 85 + } 86 + 87 + impl<T> SqliteQueueAdapter<T> 88 + where 89 + T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 90 + { 91 + /// Create a new SQLite queue adapter with unlimited queue size. 
92 + /// 93 + /// # Arguments 94 + /// 95 + /// * `pool` - SQLite connection pool 96 + /// 97 + /// # Examples 98 + /// 99 + /// ```no_run 100 + /// use quickdid::queue::SqliteQueueAdapter; 101 + /// use quickdid::sqlite_schema::create_sqlite_pool; 102 + /// 103 + /// # async fn example() -> anyhow::Result<()> { 104 + /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 105 + /// let queue = SqliteQueueAdapter::<String>::new(pool); 106 + /// # Ok(()) 107 + /// # } 108 + /// ``` 109 + pub fn new(pool: sqlx::SqlitePool) -> Self { 110 + Self::with_max_size(pool, 0) 111 + } 112 + 113 + /// Create a new SQLite queue adapter with specified maximum queue size. 114 + /// 115 + /// # Arguments 116 + /// 117 + /// * `pool` - SQLite connection pool 118 + /// * `max_size` - Maximum number of entries in queue (0 = unlimited) 119 + /// 120 + /// # Work Shedding Behavior 121 + /// 122 + /// When `max_size` > 0: 123 + /// - New work items are always accepted 124 + /// - If queue size exceeds `max_size` after insertion, oldest entries are deleted 125 + /// - This preserves the most recent work while preventing unbounded growth 126 + /// 127 + /// # Examples 128 + /// 129 + /// ```no_run 130 + /// use quickdid::queue::SqliteQueueAdapter; 131 + /// use quickdid::sqlite_schema::create_sqlite_pool; 132 + /// 133 + /// # async fn example() -> anyhow::Result<()> { 134 + /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 135 + /// // Limit queue to 10,000 entries with automatic work shedding 136 + /// let queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000); 137 + /// # Ok(()) 138 + /// # } 139 + /// ``` 140 + pub fn with_max_size(pool: sqlx::SqlitePool, max_size: u64) -> Self { 141 + Self { 142 + pool, 143 + max_size, 144 + _phantom: std::marker::PhantomData, 145 + } 146 + } 147 + } 148 + 149 + #[async_trait] 150 + impl<T> QueueAdapter<T> for SqliteQueueAdapter<T> 151 + where 152 + T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 
153 + { 154 + async fn pull(&self) -> Option<T> { 155 + // Get the oldest queued item and delete it in a transaction 156 + let mut transaction = match self.pool.begin().await { 157 + Ok(tx) => tx, 158 + Err(e) => { 159 + error!("Failed to start SQLite transaction: {}", e); 160 + return None; 161 + } 162 + }; 163 + 164 + // Select the oldest queued item 165 + let record = match sqlx::query( 166 + "SELECT id, work FROM handle_resolution_queue 167 + ORDER BY queued_at ASC 168 + LIMIT 1" 169 + ) 170 + .fetch_optional(&mut *transaction) 171 + .await 172 + { 173 + Ok(Some(row)) => row, 174 + Ok(None) => { 175 + // No queued items available 176 + debug!("No queued items available in SQLite queue"); 177 + return None; 178 + } 179 + Err(e) => { 180 + error!("Failed to query SQLite queue: {}", e); 181 + return None; 182 + } 183 + }; 184 + 185 + let item_id: i64 = record.get("id"); 186 + let work_json: String = record.get("work"); 187 + 188 + // Delete the item from the queue 189 + if let Err(e) = sqlx::query("DELETE FROM handle_resolution_queue WHERE id = ?1") 190 + .bind(item_id) 191 + .execute(&mut *transaction) 192 + .await 193 + { 194 + error!("Failed to delete item from queue: {}", e); 195 + return None; 196 + } 197 + 198 + // Commit the transaction 199 + if let Err(e) = transaction.commit().await { 200 + error!("Failed to commit SQLite transaction: {}", e); 201 + return None; 202 + } 203 + 204 + // Deserialize the work item from JSON 205 + match serde_json::from_str(&work_json) { 206 + Ok(work) => { 207 + debug!("Pulled work item from SQLite queue"); 208 + Some(work) 209 + } 210 + Err(e) => { 211 + error!("Failed to deserialize work item: {}", e); 212 + None 213 + } 214 + } 215 + } 216 + 217 + async fn push(&self, work: T) -> Result<()> { 218 + // Serialize the entire work item as JSON 219 + let work_json = serde_json::to_string(&work) 220 + .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 221 + 222 + let current_timestamp = std::time::SystemTime::now() 
223 + .duration_since(std::time::UNIX_EPOCH) 224 + .unwrap_or_default() 225 + .as_secs() as i64; 226 + 227 + // Optimized approach: Insert first, then check if cleanup needed 228 + // This avoids counting on every insert 229 + sqlx::query( 230 + "INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)" 231 + ) 232 + .bind(&work_json) 233 + .bind(current_timestamp) 234 + .execute(&self.pool) 235 + .await 236 + .map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?; 237 + 238 + // Implement optimized work shedding if max_size is configured 239 + if self.max_size > 0 { 240 + // Optimized approach: Only check and clean periodically or when likely over limit 241 + // Use a limited count to avoid full table scan 242 + let check_limit = self.max_size as i64 + (self.max_size as i64 / 10).max(1); // Check 10% over limit 243 + let approx_count: Option<i64> = sqlx::query_scalar( 244 + "SELECT COUNT(*) FROM ( 245 + SELECT 1 FROM handle_resolution_queue LIMIT ?1 246 + ) AS limited_count" 247 + ) 248 + .bind(check_limit) 249 + .fetch_one(&self.pool) 250 + .await 251 + .map_err(|e| QueueError::PushFailed(format!("Failed to check queue size: {}", e)))?; 252 + 253 + // Only perform cleanup if we're definitely over the limit 254 + if let Some(count) = approx_count && count >= check_limit { 255 + // Perform batch cleanup - delete more than just the excess to reduce frequency 256 + // Delete 20% more than needed to avoid frequent shedding 257 + let target_size = (self.max_size as f64 * 0.8) as i64; // Keep 80% of max_size 258 + let to_delete = count - target_size; 259 + 260 + if to_delete > 0 { 261 + // Optimized deletion: First get the cutoff id and timestamp 262 + // This avoids the expensive subquery in the DELETE statement 263 + let cutoff: Option<(i64, i64)> = sqlx::query_as( 264 + "SELECT id, queued_at FROM handle_resolution_queue 265 + ORDER BY queued_at ASC, id ASC 266 + LIMIT 1 OFFSET ?1" 267 + ) 268 + .bind(to_delete - 1) 269 + 
.fetch_optional(&self.pool) 270 + .await 271 + .map_err(|e| QueueError::PushFailed(format!("Failed to find cutoff: {}", e)))?; 272 + 273 + if let Some((cutoff_id, cutoff_timestamp)) = cutoff { 274 + // Delete entries older than cutoff, or equal timestamp with lower id 275 + // This handles the case where multiple entries have the same timestamp 276 + let deleted_result = sqlx::query( 277 + "DELETE FROM handle_resolution_queue 278 + WHERE queued_at < ?1 279 + OR (queued_at = ?1 AND id <= ?2)" 280 + ) 281 + .bind(cutoff_timestamp) 282 + .bind(cutoff_id) 283 + .execute(&self.pool) 284 + .await 285 + .map_err(|e| QueueError::PushFailed(format!("Failed to delete excess entries: {}", e)))?; 286 + 287 + let deleted_count = deleted_result.rows_affected(); 288 + if deleted_count > 0 { 289 + info!( 290 + "Work shedding: deleted {} oldest entries (target size: {}, max: {})", 291 + deleted_count, 292 + target_size, 293 + self.max_size 294 + ); 295 + } 296 + } 297 + } 298 + } 299 + } 300 + 301 + debug!("Pushed work item to SQLite queue (max_size: {})", self.max_size); 302 + Ok(()) 303 + } 304 + 305 + async fn ack(&self, _item: &T) -> Result<()> { 306 + // With the simplified SQLite queue design, items are deleted when pulled, 307 + // so acknowledgment is a no-op (item is already processed and removed) 308 + debug!("Acknowledged work item in SQLite queue (no-op)"); 309 + Ok(()) 310 + } 311 + 312 + async fn depth(&self) -> Option<usize> { 313 + match sqlx::query_scalar::<_, i64>( 314 + "SELECT COUNT(*) FROM handle_resolution_queue" 315 + ) 316 + .fetch_one(&self.pool) 317 + .await 318 + { 319 + Ok(count) => Some(count as usize), 320 + Err(e) => { 321 + warn!("Failed to get SQLite queue depth: {}", e); 322 + None 323 + } 324 + } 325 + } 326 + 327 + async fn is_healthy(&self) -> bool { 328 + // Test the connection by running a simple query 329 + sqlx::query_scalar::<_, i64>("SELECT 1") 330 + .fetch_one(&self.pool) 331 + .await 332 + .map(|_| true) 333 + .unwrap_or(false) 334 + } 
335 + } 336 + 337 + #[cfg(test)] 338 + mod tests { 339 + use super::*; 340 + use crate::queue::HandleResolutionWork; 341 + 342 + async fn create_test_pool() -> sqlx::SqlitePool { 343 + let pool = sqlx::SqlitePool::connect("sqlite::memory:") 344 + .await 345 + .expect("Failed to connect to in-memory SQLite"); 346 + 347 + // Create the queue schema 348 + crate::sqlite_schema::create_schema(&pool) 349 + .await 350 + .expect("Failed to create schema"); 351 + 352 + pool 353 + } 354 + 355 + #[tokio::test] 356 + async fn test_sqlite_queue_push_pull() { 357 + let pool = create_test_pool().await; 358 + let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone()); 359 + 360 + let work = HandleResolutionWork::new("alice.example.com".to_string()); 361 + 362 + // Test push 363 + adapter.push(work.clone()).await.unwrap(); 364 + 365 + // Verify depth 366 + assert_eq!(adapter.depth().await, Some(1)); 367 + 368 + // Test pull 369 + let pulled = adapter.pull().await; 370 + assert_eq!(pulled, Some(work)); 371 + 372 + // Verify queue is empty after pull 373 + assert_eq!(adapter.depth().await, Some(0)); 374 + assert!(adapter.pull().await.is_none()); 375 + } 376 + 377 + #[tokio::test] 378 + async fn test_sqlite_queue_fifo_ordering() { 379 + let pool = create_test_pool().await; 380 + let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool); 381 + 382 + // Push multiple items 383 + let handles = vec!["alice.example.com", "bob.example.com", "charlie.example.com"]; 384 + for handle in &handles { 385 + let work = HandleResolutionWork::new(handle.to_string()); 386 + adapter.push(work).await.unwrap(); 387 + } 388 + 389 + // Pull items in FIFO order 390 + for expected_handle in handles { 391 + let pulled = adapter.pull().await; 392 + assert!(pulled.is_some()); 393 + assert_eq!(pulled.unwrap().handle, expected_handle); 394 + } 395 + 396 + // Queue should be empty 397 + assert!(adapter.pull().await.is_none()); 398 + } 399 + 400 + #[tokio::test] 401 + async fn 
test_sqlite_queue_ack_noop() { 402 + let pool = create_test_pool().await; 403 + let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool); 404 + 405 + // Ack should always succeed as it's a no-op 406 + let work = HandleResolutionWork::new("any.example.com".to_string()); 407 + adapter.ack(&work).await.unwrap(); 408 + } 409 + 410 + #[tokio::test] 411 + async fn test_sqlite_queue_health() { 412 + let pool = create_test_pool().await; 413 + let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool); 414 + 415 + // Should be healthy if SQLite is working 416 + assert!(adapter.is_healthy().await); 417 + } 418 + 419 + #[tokio::test] 420 + async fn test_sqlite_queue_work_shedding() { 421 + let pool = create_test_pool().await; 422 + 423 + // Create adapter with small max_size for testing 424 + let max_size = 10; 425 + let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size( 426 + pool.clone(), 427 + max_size 428 + ); 429 + 430 + // Push items up to the limit (should not trigger shedding) 431 + for i in 0..max_size { 432 + let work = HandleResolutionWork::new(format!("test-{:03}", i)); 433 + adapter.push(work).await.expect("Push should succeed"); 434 + } 435 + 436 + // Verify all items are present 437 + assert_eq!(adapter.depth().await, Some(max_size as usize)); 438 + 439 + // Push beyond 110% of max_size to trigger batch shedding 440 + let trigger_point = max_size + (max_size / 10) + 1; 441 + for i in max_size..trigger_point { 442 + let work = HandleResolutionWork::new(format!("test-{:03}", i)); 443 + adapter.push(work).await.expect("Push should succeed"); 444 + } 445 + 446 + // After triggering shedding, queue should be around 80% of max_size 447 + let depth_after_shedding = adapter.depth().await.unwrap(); 448 + let expected_size = (max_size as f64 * 0.8) as usize; 449 + 450 + // Allow some variance due to batch deletion 451 + assert!( 452 + depth_after_shedding <= expected_size + 1, 453 + "Queue size {} should be around 80% of max_size 
({})", 454 + depth_after_shedding, 455 + expected_size 456 + ); 457 + } 458 + 459 + #[tokio::test] 460 + async fn test_sqlite_queue_work_shedding_disabled() { 461 + let pool = create_test_pool().await; 462 + 463 + // Create adapter with max_size = 0 (disabled work shedding) 464 + let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size( 465 + pool, 466 + 0 467 + ); 468 + 469 + // Push many items (should not trigger any shedding) 470 + for i in 0..100 { 471 + let work = HandleResolutionWork::new(format!("test-{:03}", i)); 472 + adapter.push(work).await.expect("Push should succeed"); 473 + } 474 + 475 + // Verify all items are present (no shedding occurred) 476 + assert_eq!(adapter.depth().await, Some(100)); 477 + } 478 + 479 + #[tokio::test] 480 + async fn test_sqlite_queue_generic_work_type() { 481 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 482 + struct CustomWork { 483 + id: u64, 484 + name: String, 485 + data: Vec<i32>, 486 + } 487 + 488 + let pool = create_test_pool().await; 489 + let adapter = SqliteQueueAdapter::<CustomWork>::new(pool); 490 + 491 + let work = CustomWork { 492 + id: 123, 493 + name: "test_work".to_string(), 494 + data: vec![1, 2, 3, 4, 5], 495 + }; 496 + 497 + // Test push and pull 498 + adapter.push(work.clone()).await.unwrap(); 499 + let pulled = adapter.pull().await; 500 + assert_eq!(pulled, Some(work)); 501 + } 502 + }
+95
src/queue/work.rs
··· 1 + //! Work item types for queue processing. 2 + //! 3 + //! This module defines the various work item types that can be processed 4 + //! through the queue system, such as handle resolution requests. 5 + 6 + use serde::{Deserialize, Serialize}; 7 + 8 + /// Work item for handle resolution tasks. 9 + /// 10 + /// This structure represents a request to resolve an AT Protocol handle 11 + /// to its corresponding DID. It's the primary work type processed by 12 + /// the QuickDID service's background queue workers. 13 + /// 14 + /// # Examples 15 + /// 16 + /// ``` 17 + /// use quickdid::queue::HandleResolutionWork; 18 + /// 19 + /// let work = HandleResolutionWork::new("alice.bsky.social".to_string()); 20 + /// assert_eq!(work.handle, "alice.bsky.social"); 21 + /// ``` 22 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 23 + pub struct HandleResolutionWork { 24 + /// The AT Protocol handle to resolve (e.g., "alice.bsky.social") 25 + pub handle: String, 26 + } 27 + 28 + impl HandleResolutionWork { 29 + /// Create a new handle resolution work item. 
30 + /// 31 + /// # Arguments 32 + /// 33 + /// * `handle` - The AT Protocol handle to resolve 34 + /// 35 + /// # Examples 36 + /// 37 + /// ``` 38 + /// use quickdid::queue::HandleResolutionWork; 39 + /// 40 + /// let work = HandleResolutionWork::new("alice.bsky.social".to_string()); 41 + /// ``` 42 + pub fn new(handle: String) -> Self { 43 + Self { handle } 44 + } 45 + } 46 + 47 + impl std::fmt::Display for HandleResolutionWork { 48 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 49 + write!(f, "HandleResolution({})", self.handle) 50 + } 51 + } 52 + 53 + #[cfg(test)] 54 + mod tests { 55 + use super::*; 56 + 57 + #[test] 58 + fn test_handle_resolution_work_creation() { 59 + let handle = "alice.example.com"; 60 + let work = HandleResolutionWork::new(handle.to_string()); 61 + assert_eq!(work.handle, handle); 62 + } 63 + 64 + #[test] 65 + fn test_handle_resolution_work_serialization() { 66 + let work = HandleResolutionWork::new("bob.example.com".to_string()); 67 + 68 + // Test JSON serialization (which is what we actually use in the queue adapters) 69 + let json = serde_json::to_string(&work).expect("Failed to serialize to JSON"); 70 + let deserialized: HandleResolutionWork = 71 + serde_json::from_str(&json).expect("Failed to deserialize from JSON"); 72 + assert_eq!(work, deserialized); 73 + 74 + // Verify the JSON structure 75 + let json_value: serde_json::Value = serde_json::from_str(&json).unwrap(); 76 + assert_eq!(json_value["handle"], "bob.example.com"); 77 + } 78 + 79 + #[test] 80 + fn test_handle_resolution_work_display() { 81 + let work = HandleResolutionWork::new("charlie.example.com".to_string()); 82 + let display = format!("{}", work); 83 + assert_eq!(display, "HandleResolution(charlie.example.com)"); 84 + } 85 + 86 + #[test] 87 + fn test_handle_resolution_work_equality() { 88 + let work1 = HandleResolutionWork::new("alice.example.com".to_string()); 89 + let work2 = HandleResolutionWork::new("alice.example.com".to_string()); 90 + 
let work3 = HandleResolutionWork::new("bob.example.com".to_string()); 91 + 92 + assert_eq!(work1, work2); 93 + assert_ne!(work1, work3); 94 + } 95 + }
-1626
src/queue_adapter.rs
··· 1 - //! Generic queue adapter system for work queue abstraction. 2 - //! 3 - //! This module provides a generic trait and implementations for queue adapters 4 - //! that can be used with any work type for handle resolution and other tasks. 5 - 6 - use async_trait::async_trait; 7 - use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands}; 8 - use serde::{Deserialize, Serialize}; 9 - use sqlx::{self, Row}; 10 - use std::sync::Arc; 11 - use thiserror::Error; 12 - use tokio::sync::{Mutex, mpsc}; 13 - use tracing::{debug, error, warn}; 14 - 15 - /// Queue operation errors 16 - #[derive(Error, Debug)] 17 - pub enum QueueError { 18 - #[error("error-quickdid-queue-1 Failed to push to queue: {0}")] 19 - PushFailed(String), 20 - 21 - #[error("error-quickdid-queue-2 Queue is full")] 22 - QueueFull, 23 - 24 - #[error("error-quickdid-queue-3 Queue is closed")] 25 - QueueClosed, 26 - 27 - #[error("error-quickdid-queue-4 Redis connection failed: {0}")] 28 - RedisConnectionFailed(String), 29 - 30 - #[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")] 31 - RedisOperationFailed { operation: String, details: String }, 32 - 33 - #[error("error-quickdid-queue-6 Serialization failed: {0}")] 34 - SerializationFailed(String), 35 - 36 - #[error("error-quickdid-queue-7 Deserialization failed: {0}")] 37 - DeserializationFailed(String), 38 - 39 - #[error("error-quickdid-queue-8 Item not found in worker queue during acknowledgment")] 40 - AckItemNotFound, 41 - } 42 - 43 - type Result<T> = std::result::Result<T, QueueError>; 44 - 45 - /// Generic trait for queue adapters that can work with any work type. 46 - /// 47 - /// This trait provides a common interface for different queue implementations 48 - /// (MPSC, Redis, PostgreSQL, etc.) allowing them to be used interchangeably. 49 - #[async_trait] 50 - pub trait QueueAdapter<T>: Send + Sync 51 - where 52 - T: Send + Sync + 'static, 53 - { 54 - /// Pull the next work item from the queue. 
55 - /// 56 - /// Returns None if the queue is closed or empty (depending on implementation). 57 - async fn pull(&self) -> Option<T>; 58 - 59 - /// Push a work item to the queue. 60 - /// 61 - /// Returns an error if the queue is full or closed. 62 - async fn push(&self, work: T) -> Result<()>; 63 - 64 - /// Acknowledge that a work item has been successfully processed. 65 - /// 66 - /// This is used by reliable queue implementations to remove the item 67 - /// from a temporary processing queue. Implementations that don't require 68 - /// acknowledgment (like MPSC) can use the default no-op implementation. 69 - async fn ack(&self, _item: &T) -> Result<()> { 70 - // Default no-op implementation for queues that don't need acknowledgment 71 - Ok(()) 72 - } 73 - 74 - /// Try to push a work item without blocking. 75 - /// 76 - /// Returns an error if the queue is full or closed. 77 - async fn try_push(&self, work: T) -> Result<()> { 78 - // Default implementation uses regular push 79 - self.push(work).await 80 - } 81 - 82 - /// Get the current queue depth if available. 83 - /// 84 - /// Returns None if the implementation doesn't support queue depth. 85 - async fn depth(&self) -> Option<usize> { 86 - None 87 - } 88 - 89 - /// Check if the queue is healthy. 90 - /// 91 - /// Used for health checks and monitoring. 92 - async fn is_healthy(&self) -> bool { 93 - true 94 - } 95 - } 96 - 97 - /// MPSC channel-based queue adapter implementation. 98 - /// 99 - /// This adapter uses tokio's multi-producer, single-consumer channel 100 - /// for in-memory queuing of work items. It's suitable for single-instance 101 - /// deployments with moderate throughput requirements. 
102 - pub(crate) struct MpscQueueAdapter<T> 103 - where 104 - T: Send + Sync + 'static, 105 - { 106 - receiver: Arc<Mutex<mpsc::Receiver<T>>>, 107 - sender: mpsc::Sender<T>, 108 - } 109 - 110 - impl<T> MpscQueueAdapter<T> 111 - where 112 - T: Send + Sync + 'static, 113 - { 114 - /// Create a new MPSC queue adapter with the specified buffer size. 115 - pub(crate) fn new(buffer: usize) -> Self { 116 - let (sender, receiver) = mpsc::channel(buffer); 117 - Self { 118 - receiver: Arc::new(Mutex::new(receiver)), 119 - sender, 120 - } 121 - } 122 - 123 - /// Create an adapter from existing MPSC channels (for backward compatibility). 124 - pub(crate) fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self { 125 - Self { 126 - receiver: Arc::new(Mutex::new(receiver)), 127 - sender, 128 - } 129 - } 130 - } 131 - 132 - #[async_trait] 133 - impl<T> QueueAdapter<T> for MpscQueueAdapter<T> 134 - where 135 - T: Send + Sync + 'static, 136 - { 137 - async fn pull(&self) -> Option<T> { 138 - let mut receiver = self.receiver.lock().await; 139 - receiver.recv().await 140 - } 141 - 142 - async fn push(&self, work: T) -> Result<()> { 143 - self.sender 144 - .send(work) 145 - .await 146 - .map_err(|e| QueueError::PushFailed(e.to_string())) 147 - } 148 - 149 - async fn try_push(&self, work: T) -> Result<()> { 150 - self.sender.try_send(work).map_err(|e| match e { 151 - mpsc::error::TrySendError::Full(_) => QueueError::QueueFull, 152 - mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed, 153 - }) 154 - } 155 - 156 - async fn depth(&self) -> Option<usize> { 157 - // Note: This is an approximation as mpsc doesn't provide exact depth 158 - Some(self.sender.max_capacity() - self.sender.capacity()) 159 - } 160 - 161 - async fn is_healthy(&self) -> bool { 162 - !self.sender.is_closed() 163 - } 164 - } 165 - 166 - /// Work item for handle resolution tasks 167 - #[derive(Debug, Clone, Serialize, Deserialize)] 168 - pub struct HandleResolutionWork { 169 - /// The 
handle to resolve 170 - pub handle: String, 171 - } 172 - 173 - impl HandleResolutionWork { 174 - /// Create a new handle resolution work item 175 - pub fn new(handle: String) -> Self { 176 - Self { handle } 177 - } 178 - } 179 - 180 - /// Redis-backed queue adapter implementation. 181 - /// 182 - /// This adapter uses Redis lists with a reliable queue pattern: 183 - /// - LPUSH to push items to the primary queue 184 - /// - RPOPLPUSH to atomically move items from primary to worker queue 185 - /// - LREM to acknowledge processed items from worker queue 186 - /// 187 - /// This ensures at-least-once delivery semantics and allows for recovery 188 - /// of in-flight items if a worker crashes. 189 - pub(crate) struct RedisQueueAdapter<T> 190 - where 191 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 192 - { 193 - /// Redis connection pool 194 - pool: RedisPool, 195 - /// Unique worker ID for this adapter instance 196 - worker_id: String, 197 - /// Key prefix for all queues (default: "queue:handleresolver:") 198 - key_prefix: String, 199 - /// Timeout for blocking RPOPLPUSH operations 200 - timeout_seconds: u64, 201 - /// Type marker for generic parameter 202 - _phantom: std::marker::PhantomData<T>, 203 - } 204 - 205 - impl<T> RedisQueueAdapter<T> 206 - where 207 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 208 - { 209 - /// Create a new Redis queue adapter with custom configuration 210 - fn with_config( 211 - pool: RedisPool, 212 - worker_id: String, 213 - key_prefix: String, 214 - timeout_seconds: u64, 215 - ) -> Self { 216 - Self { 217 - pool, 218 - worker_id, 219 - key_prefix, 220 - timeout_seconds, 221 - _phantom: std::marker::PhantomData, 222 - } 223 - } 224 - 225 - /// Get the primary queue key 226 - fn primary_queue_key(&self) -> String { 227 - format!("{}primary", self.key_prefix) 228 - } 229 - 230 - /// Get the worker-specific temporary queue key 231 - fn worker_queue_key(&self) -> String { 232 - format!("{}{}", 
self.key_prefix, self.worker_id) 233 - } 234 - } 235 - 236 - #[async_trait] 237 - impl<T> QueueAdapter<T> for RedisQueueAdapter<T> 238 - where 239 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 240 - { 241 - async fn pull(&self) -> Option<T> { 242 - match self.pool.get().await { 243 - Ok(mut conn) => { 244 - let primary_key = self.primary_queue_key(); 245 - let worker_key = self.worker_queue_key(); 246 - 247 - // Use blocking RPOPLPUSH to atomically move item from primary to worker queue 248 - let data: Option<Vec<u8>> = match conn 249 - .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64) 250 - .await 251 - { 252 - Ok(data) => data, 253 - Err(e) => { 254 - error!("Failed to pull from queue: {}", e); 255 - return None; 256 - } 257 - }; 258 - 259 - if let Some(data) = data { 260 - // Deserialize the item 261 - match serde_json::from_slice(&data) { 262 - Ok(item) => { 263 - debug!( 264 - worker_id = %self.worker_id, 265 - "Pulled item from queue" 266 - ); 267 - Some(item) 268 - } 269 - Err(e) => { 270 - error!("Failed to deserialize item: {}", e); 271 - // Remove the corrupted item from worker queue 272 - let _: std::result::Result<(), _> = 273 - conn.lrem(&worker_key, 1, &data).await; 274 - None 275 - } 276 - } 277 - } else { 278 - None 279 - } 280 - } 281 - Err(e) => { 282 - error!("Failed to get Redis connection: {}", e); 283 - None 284 - } 285 - } 286 - } 287 - 288 - async fn push(&self, work: T) -> Result<()> { 289 - let mut conn = self 290 - .pool 291 - .get() 292 - .await 293 - .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 294 - 295 - let data = serde_json::to_vec(&work) 296 - .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 297 - 298 - let primary_key = self.primary_queue_key(); 299 - 300 - conn.lpush::<_, _, ()>(&primary_key, data) 301 - .await 302 - .map_err(|e| QueueError::RedisOperationFailed { 303 - operation: "LPUSH".to_string(), 304 - details: e.to_string(), 305 - })?; 306 - 307 - 
debug!("Pushed item to queue"); 308 - Ok(()) 309 - } 310 - 311 - async fn ack(&self, item: &T) -> Result<()> { 312 - let mut conn = self 313 - .pool 314 - .get() 315 - .await 316 - .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 317 - 318 - let data = 319 - serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 320 - 321 - let worker_key = self.worker_queue_key(); 322 - 323 - // Remove exactly one occurrence of this item from the worker queue 324 - let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| { 325 - QueueError::RedisOperationFailed { 326 - operation: "LREM".to_string(), 327 - details: e.to_string(), 328 - } 329 - })?; 330 - 331 - if removed == 0 { 332 - warn!( 333 - worker_id = %self.worker_id, 334 - "Item not found in worker queue during acknowledgment" 335 - ); 336 - } else { 337 - debug!( 338 - worker_id = %self.worker_id, 339 - "Acknowledged item" 340 - ); 341 - } 342 - 343 - Ok(()) 344 - } 345 - 346 - async fn depth(&self) -> Option<usize> { 347 - match self.pool.get().await { 348 - Ok(mut conn) => { 349 - let primary_key = self.primary_queue_key(); 350 - match conn.llen::<_, usize>(&primary_key).await { 351 - Ok(len) => Some(len), 352 - Err(e) => { 353 - error!("Failed to get queue depth: {}", e); 354 - None 355 - } 356 - } 357 - } 358 - Err(e) => { 359 - error!("Failed to get Redis connection: {}", e); 360 - None 361 - } 362 - } 363 - } 364 - 365 - async fn is_healthy(&self) -> bool { 366 - match self.pool.get().await { 367 - Ok(mut conn) => { 368 - // Ping Redis to check health 369 - match deadpool_redis::redis::cmd("PING") 370 - .query_async::<String>(&mut conn) 371 - .await 372 - { 373 - Ok(response) => response == "PONG", 374 - Err(_) => false, 375 - } 376 - } 377 - Err(_) => false, 378 - } 379 - } 380 - } 381 - 382 - /// No-operation queue adapter that discards all work items. 
383 - /// 384 - /// This adapter is useful for configurations where queuing is disabled 385 - /// or as a fallback when other queue adapters fail to initialize. 386 - pub(crate) struct NoopQueueAdapter<T> 387 - where 388 - T: Send + Sync + 'static, 389 - { 390 - _phantom: std::marker::PhantomData<T>, 391 - } 392 - 393 - impl<T> NoopQueueAdapter<T> 394 - where 395 - T: Send + Sync + 'static, 396 - { 397 - /// Create a new no-op queue adapter 398 - pub(crate) fn new() -> Self { 399 - Self { 400 - _phantom: std::marker::PhantomData, 401 - } 402 - } 403 - } 404 - 405 - impl<T> Default for NoopQueueAdapter<T> 406 - where 407 - T: Send + Sync + 'static, 408 - { 409 - fn default() -> Self { 410 - Self::new() 411 - } 412 - } 413 - 414 - #[async_trait] 415 - impl<T> QueueAdapter<T> for NoopQueueAdapter<T> 416 - where 417 - T: Send + Sync + 'static, 418 - { 419 - async fn pull(&self) -> Option<T> { 420 - // Never returns any work 421 - tokio::time::sleep(std::time::Duration::from_secs(60)).await; 422 - None 423 - } 424 - 425 - async fn push(&self, _work: T) -> Result<()> { 426 - // Silently discard the work 427 - Ok(()) 428 - } 429 - 430 - async fn ack(&self, _item: &T) -> Result<()> { 431 - // No-op 432 - Ok(()) 433 - } 434 - 435 - async fn try_push(&self, _work: T) -> Result<()> { 436 - // Silently discard the work 437 - Ok(()) 438 - } 439 - 440 - async fn depth(&self) -> Option<usize> { 441 - // Always empty 442 - Some(0) 443 - } 444 - 445 - async fn is_healthy(&self) -> bool { 446 - // Always healthy 447 - true 448 - } 449 - } 450 - 451 - /// SQLite-backed queue adapter implementation. 452 - /// 453 - /// This adapter uses SQLite database for persistent queuing of work items. 454 - /// It's suitable for single-instance deployments that need persistence 455 - /// across service restarts while remaining lightweight. 
456 - /// 457 - /// # Features 458 - /// 459 - /// - Persistent queuing across service restarts 460 - /// - Simple FIFO ordering based on insertion time 461 - /// - Single consumer design (no complex locking needed) 462 - /// - Simple pull-and-delete semantics 463 - /// - Work shedding to prevent unbounded queue growth 464 - /// 465 - /// # Work Shedding 466 - /// 467 - /// When `max_size` is configured (> 0), the adapter implements work shedding: 468 - /// - New work items are always accepted 469 - /// - When the queue exceeds `max_size`, oldest entries are automatically deleted 470 - /// - This maintains the most recent work items while preventing unbounded growth 471 - /// - Essential for long-running deployments to avoid disk space issues 472 - pub(crate) struct SqliteQueueAdapter<T> 473 - where 474 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 475 - { 476 - /// SQLite connection pool 477 - pool: sqlx::SqlitePool, 478 - /// Maximum queue size (0 = unlimited) 479 - /// When exceeded, oldest entries are deleted to maintain this limit 480 - max_size: u64, 481 - /// Type marker for generic parameter 482 - _phantom: std::marker::PhantomData<T>, 483 - } 484 - 485 - impl<T> SqliteQueueAdapter<T> 486 - where 487 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 488 - { 489 - /// Create a new SQLite queue adapter with unlimited queue size. 490 - pub(crate) fn new(pool: sqlx::SqlitePool) -> Self { 491 - Self::with_max_size(pool, 0) 492 - } 493 - 494 - /// Create a new SQLite queue adapter with specified maximum queue size. 
495 - /// 496 - /// # Arguments 497 - /// 498 - /// * `pool` - SQLite connection pool 499 - /// * `max_size` - Maximum number of entries in queue (0 = unlimited) 500 - /// 501 - /// # Work Shedding Behavior 502 - /// 503 - /// When `max_size` > 0: 504 - /// - New work items are always accepted 505 - /// - If queue size exceeds `max_size` after insertion, oldest entries are deleted 506 - /// - This preserves the most recent work while preventing unbounded growth 507 - pub(crate) fn with_max_size(pool: sqlx::SqlitePool, max_size: u64) -> Self { 508 - Self { 509 - pool, 510 - max_size, 511 - _phantom: std::marker::PhantomData, 512 - } 513 - } 514 - } 515 - 516 - #[async_trait] 517 - impl<T> QueueAdapter<T> for SqliteQueueAdapter<T> 518 - where 519 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 520 - { 521 - async fn pull(&self) -> Option<T> { 522 - // Get the oldest queued item and delete it in a transaction 523 - let mut transaction = match self.pool.begin().await { 524 - Ok(tx) => tx, 525 - Err(e) => { 526 - error!("Failed to start SQLite transaction: {}", e); 527 - return None; 528 - } 529 - }; 530 - 531 - // Select the oldest queued item 532 - let record = match sqlx::query( 533 - "SELECT id, work FROM handle_resolution_queue 534 - ORDER BY queued_at ASC 535 - LIMIT 1" 536 - ) 537 - .fetch_optional(&mut *transaction) 538 - .await 539 - { 540 - Ok(Some(row)) => row, 541 - Ok(None) => { 542 - // No queued items available 543 - debug!("No queued items available in SQLite queue"); 544 - return None; 545 - } 546 - Err(e) => { 547 - error!("Failed to query SQLite queue: {}", e); 548 - return None; 549 - } 550 - }; 551 - 552 - let item_id: i64 = record.get("id"); 553 - let work_json: String = record.get("work"); 554 - 555 - // Delete the item from the queue 556 - if let Err(e) = sqlx::query("DELETE FROM handle_resolution_queue WHERE id = ?1") 557 - .bind(item_id) 558 - .execute(&mut *transaction) 559 - .await 560 - { 561 - error!("Failed to delete 
item from queue: {}", e); 562 - return None; 563 - } 564 - 565 - // Commit the transaction 566 - if let Err(e) = transaction.commit().await { 567 - error!("Failed to commit SQLite transaction: {}", e); 568 - return None; 569 - } 570 - 571 - // Deserialize the work item from JSON 572 - match serde_json::from_str(&work_json) { 573 - Ok(work) => { 574 - debug!("Pulled work item from SQLite queue"); 575 - Some(work) 576 - } 577 - Err(e) => { 578 - error!("Failed to deserialize work item: {}", e); 579 - None 580 - } 581 - } 582 - } 583 - 584 - async fn push(&self, work: T) -> Result<()> { 585 - // Serialize the entire work item as JSON 586 - let work_json = serde_json::to_string(&work) 587 - .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 588 - 589 - let current_timestamp = std::time::SystemTime::now() 590 - .duration_since(std::time::UNIX_EPOCH) 591 - .unwrap_or_default() 592 - .as_secs() as i64; 593 - 594 - // Optimized approach: Insert first, then check if cleanup needed 595 - // This avoids counting on every insert 596 - sqlx::query( 597 - "INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)" 598 - ) 599 - .bind(&work_json) 600 - .bind(current_timestamp) 601 - .execute(&self.pool) 602 - .await 603 - .map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?; 604 - 605 - // Implement optimized work shedding if max_size is configured 606 - if self.max_size > 0 { 607 - // Optimized approach: Only check and clean periodically or when likely over limit 608 - // Use a limited count to avoid full table scan 609 - let check_limit = self.max_size as i64 + (self.max_size as i64 / 10).max(1); // Check 10% over limit 610 - let approx_count: Option<i64> = sqlx::query_scalar( 611 - "SELECT COUNT(*) FROM ( 612 - SELECT 1 FROM handle_resolution_queue LIMIT ?1 613 - ) AS limited_count" 614 - ) 615 - .bind(check_limit) 616 - .fetch_one(&self.pool) 617 - .await 618 - .map_err(|e| QueueError::PushFailed(format!("Failed to check 
queue size: {}", e)))?; 619 - 620 - // Only perform cleanup if we're definitely over the limit 621 - if let Some(count) = approx_count && count >= check_limit { 622 - // Perform batch cleanup - delete more than just the excess to reduce frequency 623 - // Delete 20% more than needed to avoid frequent shedding 624 - let target_size = (self.max_size as f64 * 0.8) as i64; // Keep 80% of max_size 625 - let to_delete = count - target_size; 626 - 627 - if to_delete > 0 { 628 - // Optimized deletion: First get the cutoff id and timestamp 629 - // This avoids the expensive subquery in the DELETE statement 630 - let cutoff: Option<(i64, i64)> = sqlx::query_as( 631 - "SELECT id, queued_at FROM handle_resolution_queue 632 - ORDER BY queued_at ASC, id ASC 633 - LIMIT 1 OFFSET ?1" 634 - ) 635 - .bind(to_delete - 1) 636 - .fetch_optional(&self.pool) 637 - .await 638 - .map_err(|e| QueueError::PushFailed(format!("Failed to find cutoff: {}", e)))?; 639 - 640 - if let Some((cutoff_id, cutoff_timestamp)) = cutoff { 641 - // Delete entries older than cutoff, or equal timestamp with lower id 642 - // This handles the case where multiple entries have the same timestamp 643 - let deleted_result = sqlx::query( 644 - "DELETE FROM handle_resolution_queue 645 - WHERE queued_at < ?1 646 - OR (queued_at = ?1 AND id <= ?2)" 647 - ) 648 - .bind(cutoff_timestamp) 649 - .bind(cutoff_id) 650 - .execute(&self.pool) 651 - .await 652 - .map_err(|e| QueueError::PushFailed(format!("Failed to delete excess entries: {}", e)))?; 653 - 654 - let deleted_count = deleted_result.rows_affected(); 655 - if deleted_count > 0 { 656 - tracing::info!( 657 - "Work shedding: deleted {} oldest entries (target size: {}, max: {})", 658 - deleted_count, 659 - target_size, 660 - self.max_size 661 - ); 662 - } 663 - } 664 - } 665 - } 666 - } 667 - 668 - debug!("Pushed work item to SQLite queue (max_size: {})", self.max_size); 669 - Ok(()) 670 - } 671 - 672 - async fn ack(&self, _item: &T) -> Result<()> { 673 - // With the 
simplified SQLite queue design, items are deleted when pulled, 674 - // so acknowledgment is a no-op (item is already processed and removed) 675 - debug!("Acknowledged work item in SQLite queue (no-op)"); 676 - Ok(()) 677 - } 678 - 679 - async fn depth(&self) -> Option<usize> { 680 - match sqlx::query_scalar::<_, i64>( 681 - "SELECT COUNT(*) FROM handle_resolution_queue" 682 - ) 683 - .fetch_one(&self.pool) 684 - .await 685 - { 686 - Ok(count) => Some(count as usize), 687 - Err(e) => { 688 - warn!("Failed to get SQLite queue depth: {}", e); 689 - None 690 - } 691 - } 692 - } 693 - 694 - async fn is_healthy(&self) -> bool { 695 - // Test the connection by running a simple query 696 - sqlx::query_scalar::<_, i64>("SELECT 1") 697 - .fetch_one(&self.pool) 698 - .await 699 - .map(|_| true) 700 - .unwrap_or(false) 701 - } 702 - } 703 - 704 - // ========= Factory Functions for Queue Adapters ========= 705 - 706 - /// Create a new MPSC queue adapter with the specified buffer size. 707 - /// 708 - /// This creates an in-memory queue suitable for single-instance deployments. 709 - /// 710 - /// # Arguments 711 - /// 712 - /// * `buffer` - The buffer size for the channel 713 - pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>> 714 - where 715 - T: Send + Sync + 'static, 716 - { 717 - Arc::new(MpscQueueAdapter::new(buffer)) 718 - } 719 - 720 - /// Create an MPSC queue adapter from existing channels. 721 - /// 722 - /// This allows integration with existing channel-based architectures. 
723 - /// 724 - /// # Arguments 725 - /// 726 - /// * `sender` - The sender half of the channel 727 - /// * `receiver` - The receiver half of the channel 728 - pub fn create_mpsc_queue_from_channel<T>( 729 - sender: mpsc::Sender<T>, 730 - receiver: mpsc::Receiver<T>, 731 - ) -> Arc<dyn QueueAdapter<T>> 732 - where 733 - T: Send + Sync + 'static, 734 - { 735 - Arc::new(MpscQueueAdapter::from_channel(sender, receiver)) 736 - } 737 - 738 - /// Create a new Redis-backed queue adapter. 739 - /// 740 - /// This creates a distributed queue suitable for multi-instance deployments. 741 - /// 742 - /// # Arguments 743 - /// 744 - /// * `pool` - Redis connection pool 745 - /// * `worker_id` - Worker identifier for this queue instance 746 - /// * `key_prefix` - Redis key prefix for queue operations 747 - /// * `timeout_seconds` - Timeout for blocking operations 748 - pub fn create_redis_queue<T>( 749 - pool: RedisPool, 750 - worker_id: String, 751 - key_prefix: String, 752 - timeout_seconds: u64, 753 - ) -> Arc<dyn QueueAdapter<T>> 754 - where 755 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 756 - { 757 - Arc::new(RedisQueueAdapter::with_config( 758 - pool, 759 - worker_id, 760 - key_prefix, 761 - timeout_seconds, 762 - )) 763 - } 764 - 765 - /// Create a no-operation queue adapter. 766 - /// 767 - /// This creates a queue that discards all work items, useful for testing 768 - /// or when queue processing is disabled. 769 - pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>> 770 - where 771 - T: Send + Sync + 'static, 772 - { 773 - Arc::new(NoopQueueAdapter::new()) 774 - } 775 - 776 - /// Create a new SQLite queue adapter with unlimited queue size. 777 - /// 778 - /// This creates a persistent queue backed by SQLite database suitable 779 - /// for single-instance deployments that need persistence across restarts. 780 - /// The queue has no size limit and may grow unbounded. 
781 - /// 782 - /// # Arguments 783 - /// 784 - /// * `pool` - SQLite connection pool 785 - /// 786 - /// # Example 787 - /// 788 - /// ```no_run 789 - /// use quickdid::queue_adapter::{create_sqlite_queue, HandleResolutionWork}; 790 - /// use quickdid::sqlite_schema::create_sqlite_pool; 791 - /// use std::sync::Arc; 792 - /// 793 - /// # async fn example() -> anyhow::Result<()> { 794 - /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 795 - /// let queue = create_sqlite_queue::<HandleResolutionWork>(pool); 796 - /// # Ok(()) 797 - /// # } 798 - /// ``` 799 - pub fn create_sqlite_queue<T>(pool: sqlx::SqlitePool) -> Arc<dyn QueueAdapter<T>> 800 - where 801 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 802 - { 803 - Arc::new(SqliteQueueAdapter::new(pool)) 804 - } 805 - 806 - /// Create a new SQLite queue adapter with work shedding. 807 - /// 808 - /// This creates a persistent queue with configurable maximum size. 809 - /// When the queue exceeds `max_size`, the oldest entries are automatically 810 - /// deleted to maintain the limit, preserving the most recent work items. 
811 - /// 812 - /// # Arguments 813 - /// 814 - /// * `pool` - SQLite connection pool 815 - /// * `max_size` - Maximum number of entries (0 = unlimited) 816 - /// 817 - /// # Work Shedding Behavior 818 - /// 819 - /// - New work items are always accepted 820 - /// - When queue size exceeds `max_size`, oldest entries are deleted 821 - /// - Deletion happens atomically with insertion in a single transaction 822 - /// - Essential for long-running deployments to prevent disk space issues 823 - /// 824 - /// # Example 825 - /// 826 - /// ```no_run 827 - /// use quickdid::queue_adapter::{create_sqlite_queue_with_max_size, HandleResolutionWork}; 828 - /// use quickdid::sqlite_schema::create_sqlite_pool; 829 - /// use std::sync::Arc; 830 - /// 831 - /// # async fn example() -> anyhow::Result<()> { 832 - /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 833 - /// // Limit queue to 10,000 entries with automatic work shedding 834 - /// let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10000); 835 - /// # Ok(()) 836 - /// # } 837 - /// ``` 838 - pub fn create_sqlite_queue_with_max_size<T>( 839 - pool: sqlx::SqlitePool, 840 - max_size: u64, 841 - ) -> Arc<dyn QueueAdapter<T>> 842 - where 843 - T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 844 - { 845 - Arc::new(SqliteQueueAdapter::with_max_size(pool, max_size)) 846 - } 847 - 848 - #[cfg(test)] 849 - mod tests { 850 - use super::*; 851 - 852 - #[tokio::test] 853 - async fn test_mpsc_queue_adapter_push_pull() { 854 - let adapter = Arc::new(MpscQueueAdapter::<String>::new(10)); 855 - 856 - // Test push 857 - adapter.push("test".to_string()).await.unwrap(); 858 - 859 - // Test pull 860 - let pulled = adapter.pull().await; 861 - assert!(pulled.is_some()); 862 - assert_eq!(pulled.unwrap(), "test"); 863 - } 864 - 865 - #[tokio::test] 866 - async fn test_handle_resolution_work() { 867 - let work = HandleResolutionWork::new("alice.example.com".to_string()); 868 - 869 - 
assert_eq!(work.handle, "alice.example.com"); 870 - } 871 - 872 - #[tokio::test] 873 - async fn test_redis_queue_adapter_push_pull() { 874 - let pool = match crate::test_helpers::get_test_redis_pool() { 875 - Some(p) => p, 876 - None => return, 877 - }; 878 - 879 - // Create adapter with unique prefix for testing 880 - let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); 881 - let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 882 - pool.clone(), 883 - "test-worker".to_string(), 884 - test_prefix.clone(), 885 - 1, // 1 second timeout for tests 886 - )); 887 - 888 - // Test push 889 - adapter.push("test-item".to_string()).await.unwrap(); 890 - 891 - // Test pull 892 - let pulled = adapter.pull().await; 893 - assert!(pulled.is_some()); 894 - assert_eq!(pulled.unwrap(), "test-item"); 895 - 896 - // Test ack 897 - adapter 898 - .ack(&"test-item".to_string()) 899 - .await 900 - .expect("Ack should succeed"); 901 - 902 - // Clean up test data - manually clean worker queue since cleanup was removed 903 - // In production, items would timeout or be processed 904 - } 905 - 906 - #[tokio::test] 907 - async fn test_redis_queue_adapter_reliable_queue() { 908 - let pool = match crate::test_helpers::get_test_redis_pool() { 909 - Some(p) => p, 910 - None => return, 911 - }; 912 - 913 - let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); 914 - let worker_id = "test-worker-reliable"; 915 - 916 - // Create first adapter 917 - let adapter1 = Arc::new(RedisQueueAdapter::<String>::with_config( 918 - pool.clone(), 919 - worker_id.to_string(), 920 - test_prefix.clone(), 921 - 1, 922 - )); 923 - 924 - // Push multiple items 925 - adapter1.push("item1".to_string()).await.unwrap(); 926 - adapter1.push("item2".to_string()).await.unwrap(); 927 - adapter1.push("item3".to_string()).await.unwrap(); 928 - 929 - // Pull but 
don't ack (simulating worker crash) 930 - let item1 = adapter1.pull().await; 931 - assert!(item1.is_some()); 932 - assert_eq!(item1.unwrap(), "item1"); 933 - 934 - // Create second adapter with same worker_id (simulating restart) 935 - let adapter2 = Arc::new(RedisQueueAdapter::<String>::with_config( 936 - pool.clone(), 937 - worker_id.to_string(), 938 - test_prefix.clone(), 939 - 1, 940 - )); 941 - 942 - // In a real scenario, unacked items would be handled by timeout or manual recovery 943 - // For this test, we just verify the item is in the worker queue 944 - let recovered = adapter2.pull().await; 945 - assert!(recovered.is_some()); 946 - } 947 - 948 - #[tokio::test] 949 - async fn test_redis_queue_adapter_depth() { 950 - let pool = match crate::test_helpers::get_test_redis_pool() { 951 - Some(p) => p, 952 - None => return, 953 - }; 954 - 955 - let test_prefix = format!("test:queue:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()); 956 - let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 957 - pool.clone(), 958 - "test-worker-depth".to_string(), 959 - test_prefix.clone(), 960 - 1, 961 - )); 962 - 963 - // Initially empty 964 - let depth = adapter.depth().await; 965 - assert_eq!(depth, Some(0)); 966 - 967 - // Push items and check depth 968 - adapter.push("item1".to_string()).await.unwrap(); 969 - assert_eq!(adapter.depth().await, Some(1)); 970 - 971 - adapter.push("item2".to_string()).await.unwrap(); 972 - assert_eq!(adapter.depth().await, Some(2)); 973 - 974 - // Pull and check depth decreases 975 - let _ = adapter.pull().await; 976 - // Note: depth checks primary queue, not worker queue 977 - assert_eq!(adapter.depth().await, Some(1)); 978 - 979 - // Test cleanup is automatic when adapter is dropped 980 - } 981 - 982 - #[tokio::test] 983 - async fn test_redis_queue_adapter_health() { 984 - let pool = match crate::test_helpers::get_test_redis_pool() { 985 - Some(p) => p, 986 - None => return, 987 - }; 
988 - 989 - let adapter = Arc::new(RedisQueueAdapter::<String>::with_config( 990 - pool, 991 - "test-worker-health".to_string(), 992 - "test:queue:health:".to_string(), 993 - 1, 994 - )); 995 - 996 - // Should be healthy if Redis is running 997 - assert!(adapter.is_healthy().await); 998 - } 999 - 1000 - #[tokio::test] 1001 - async fn test_sqlite_queue_adapter_push_pull() { 1002 - // Create in-memory SQLite database for testing 1003 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1004 - .await 1005 - .expect("Failed to connect to in-memory SQLite"); 1006 - 1007 - // Create the queue schema 1008 - crate::sqlite_schema::create_schema(&pool) 1009 - .await 1010 - .expect("Failed to create schema"); 1011 - 1012 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone())); 1013 - 1014 - let test_handle = "alice.example.com"; 1015 - let work = HandleResolutionWork { 1016 - handle: test_handle.to_string(), 1017 - }; 1018 - 1019 - // Test push 1020 - adapter.push(work.clone()).await.unwrap(); 1021 - 1022 - // Verify the record is actually in the database 1023 - let records: Vec<(i64, String, i64)> = sqlx::query_as( 1024 - "SELECT id, work, queued_at FROM handle_resolution_queue ORDER BY queued_at ASC" 1025 - ) 1026 - .fetch_all(&pool) 1027 - .await 1028 - .expect("Failed to query database"); 1029 - 1030 - assert_eq!(records.len(), 1); 1031 - let (db_id, db_work_json, db_queued_at) = &records[0]; 1032 - assert!(*db_id > 0); 1033 - assert!(*db_queued_at > 0); 1034 - 1035 - // Verify the JSON content 1036 - let stored_work: HandleResolutionWork = serde_json::from_str(db_work_json) 1037 - .expect("Failed to deserialize stored work"); 1038 - assert_eq!(stored_work.handle, test_handle); 1039 - 1040 - // Verify depth 1041 - assert_eq!(adapter.depth().await, Some(1)); 1042 - 1043 - // Test pull 1044 - let pulled = adapter.pull().await; 1045 - assert!(pulled.is_some()); 1046 - let pulled_work = pulled.unwrap(); 1047 - 
assert_eq!(pulled_work.handle, test_handle); 1048 - 1049 - // Verify the record was deleted from database after pull 1050 - let records_after_pull: Vec<(i64, String, i64)> = sqlx::query_as( 1051 - "SELECT id, work, queued_at FROM handle_resolution_queue" 1052 - ) 1053 - .fetch_all(&pool) 1054 - .await 1055 - .expect("Failed to query database after pull"); 1056 - 1057 - assert_eq!(records_after_pull.len(), 0); 1058 - 1059 - // Test ack - should be no-op since item already deleted 1060 - adapter.ack(&pulled_work).await.expect("Ack should succeed"); 1061 - 1062 - // Verify depth after ack 1063 - assert_eq!(adapter.depth().await, Some(0)); 1064 - 1065 - // Verify no more items to pull 1066 - let empty_pull = adapter.pull().await; 1067 - assert!(empty_pull.is_none()); 1068 - } 1069 - 1070 - #[tokio::test] 1071 - async fn test_sqlite_queue_adapter_multiple_items() { 1072 - // Create in-memory SQLite database for testing 1073 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1074 - .await 1075 - .expect("Failed to connect to in-memory SQLite"); 1076 - 1077 - // Create the queue schema 1078 - crate::sqlite_schema::create_schema(&pool) 1079 - .await 1080 - .expect("Failed to create schema"); 1081 - 1082 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone())); 1083 - 1084 - // Push multiple items 1085 - let handles = vec!["alice.example.com", "bob.example.com", "charlie.example.com"]; 1086 - for handle in &handles { 1087 - let work = HandleResolutionWork { 1088 - handle: handle.to_string(), 1089 - }; 1090 - adapter.push(work).await.unwrap(); 1091 - } 1092 - 1093 - // Verify all records are in database with correct order 1094 - let records: Vec<(i64, String, i64)> = sqlx::query_as( 1095 - "SELECT id, work, queued_at FROM handle_resolution_queue ORDER BY queued_at ASC" 1096 - ) 1097 - .fetch_all(&pool) 1098 - .await 1099 - .expect("Failed to query database"); 1100 - 1101 - assert_eq!(records.len(), 3); 1102 - 1103 - // Verify FIFO 
ordering by timestamp 1104 - assert!(records[0].2 <= records[1].2); // queued_at timestamps should be in order 1105 - assert!(records[1].2 <= records[2].2); 1106 - 1107 - // Verify JSON content matches expected handles 1108 - for (i, (db_id, db_work_json, _)) in records.iter().enumerate() { 1109 - assert!(*db_id > 0); 1110 - let stored_work: HandleResolutionWork = serde_json::from_str(db_work_json) 1111 - .expect("Failed to deserialize stored work"); 1112 - assert_eq!(stored_work.handle, handles[i]); 1113 - } 1114 - 1115 - // Verify depth 1116 - assert_eq!(adapter.depth().await, Some(3)); 1117 - 1118 - // Pull items in FIFO order and verify database state changes 1119 - for (i, expected_handle) in handles.iter().enumerate() { 1120 - let pulled = adapter.pull().await; 1121 - assert!(pulled.is_some()); 1122 - let pulled_work = pulled.unwrap(); 1123 - assert_eq!(pulled_work.handle, *expected_handle); 1124 - 1125 - // Verify database now has one fewer record 1126 - let remaining_count: i64 = sqlx::query_scalar( 1127 - "SELECT COUNT(*) FROM handle_resolution_queue" 1128 - ) 1129 - .fetch_one(&pool) 1130 - .await 1131 - .expect("Failed to count remaining records"); 1132 - assert_eq!(remaining_count, (handles.len() - i - 1) as i64); 1133 - 1134 - // Ack the item (no-op) 1135 - adapter.ack(&pulled_work).await.expect("Ack should succeed"); 1136 - } 1137 - 1138 - // Verify queue is empty in both adapter and database 1139 - assert_eq!(adapter.depth().await, Some(0)); 1140 - 1141 - let final_records: Vec<(i64, String, i64)> = sqlx::query_as( 1142 - "SELECT id, work, queued_at FROM handle_resolution_queue" 1143 - ) 1144 - .fetch_all(&pool) 1145 - .await 1146 - .expect("Failed to query database"); 1147 - assert_eq!(final_records.len(), 0); 1148 - 1149 - let empty_pull = adapter.pull().await; 1150 - assert!(empty_pull.is_none()); 1151 - } 1152 - 1153 - #[tokio::test] 1154 - async fn test_sqlite_queue_adapter_simple_pull_delete() { 1155 - // Create in-memory SQLite database for 
testing 1156 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1157 - .await 1158 - .expect("Failed to connect to in-memory SQLite"); 1159 - 1160 - // Create the queue schema 1161 - crate::sqlite_schema::create_schema(&pool) 1162 - .await 1163 - .expect("Failed to create schema"); 1164 - 1165 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone())); 1166 - 1167 - let test_handle = "simple.example.com"; 1168 - let work = HandleResolutionWork { 1169 - handle: test_handle.to_string(), 1170 - }; 1171 - 1172 - // Push item 1173 - adapter.push(work.clone()).await.unwrap(); 1174 - 1175 - // Verify the record exists in database with correct JSON 1176 - let records: Vec<(i64, String, i64)> = sqlx::query_as( 1177 - "SELECT id, work, queued_at FROM handle_resolution_queue" 1178 - ) 1179 - .fetch_all(&pool) 1180 - .await 1181 - .expect("Failed to query database"); 1182 - 1183 - assert_eq!(records.len(), 1); 1184 - let (db_id, db_work_json, db_queued_at) = &records[0]; 1185 - assert!(*db_id > 0); 1186 - assert!(*db_queued_at > 0); 1187 - 1188 - // Verify JSON content 1189 - let stored_work: HandleResolutionWork = serde_json::from_str(db_work_json) 1190 - .expect("Failed to deserialize stored work"); 1191 - assert_eq!(stored_work.handle, test_handle); 1192 - 1193 - // Verify item is in queue using schema stats 1194 - let total_before = crate::sqlite_schema::get_queue_stats(&pool) 1195 - .await 1196 - .expect("Failed to get queue stats"); 1197 - assert_eq!(total_before, 1); 1198 - 1199 - // Pull item (should delete it immediately) 1200 - let pulled = adapter.pull().await; 1201 - assert!(pulled.is_some()); 1202 - let pulled_work = pulled.unwrap(); 1203 - assert_eq!(pulled_work.handle, test_handle); 1204 - 1205 - // Verify the record was deleted from database 1206 - let records_after_pull: Vec<(i64, String, i64)> = sqlx::query_as( 1207 - "SELECT id, work, queued_at FROM handle_resolution_queue" 1208 - ) 1209 - .fetch_all(&pool) 1210 - .await 
1211 - .expect("Failed to query database after pull"); 1212 - assert_eq!(records_after_pull.len(), 0); 1213 - 1214 - // Verify that pulling again returns None (no records left) 1215 - let empty_pull = adapter.pull().await; 1216 - assert!(empty_pull.is_none()); 1217 - 1218 - // Verify queue is now empty after pull using schema stats 1219 - let total_after = crate::sqlite_schema::get_queue_stats(&pool) 1220 - .await 1221 - .expect("Failed to get queue stats"); 1222 - assert_eq!(total_after, 0); 1223 - 1224 - // Ack the item (should be no-op) 1225 - adapter.ack(&pulled_work).await.expect("Ack should succeed"); 1226 - 1227 - // Verify queue is still empty using schema stats 1228 - let total_final = crate::sqlite_schema::get_queue_stats(&pool) 1229 - .await 1230 - .expect("Failed to get queue stats after ack"); 1231 - assert_eq!(total_final, 0); 1232 - } 1233 - 1234 - #[tokio::test] 1235 - async fn test_sqlite_queue_adapter_health() { 1236 - // Create in-memory SQLite database for testing 1237 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1238 - .await 1239 - .expect("Failed to connect to in-memory SQLite"); 1240 - 1241 - // Create the queue schema 1242 - crate::sqlite_schema::create_schema(&pool) 1243 - .await 1244 - .expect("Failed to create schema"); 1245 - 1246 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::new(pool)); 1247 - 1248 - // Should be healthy if SQLite is working 1249 - assert!(adapter.is_healthy().await); 1250 - } 1251 - 1252 - #[tokio::test] 1253 - async fn test_sqlite_queue_adapter_ack_no_op() { 1254 - // Create in-memory SQLite database for testing 1255 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1256 - .await 1257 - .expect("Failed to connect to in-memory SQLite"); 1258 - 1259 - // Create the queue schema 1260 - crate::sqlite_schema::create_schema(&pool) 1261 - .await 1262 - .expect("Failed to create schema"); 1263 - 1264 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::new(pool)); 
1265 - 1266 - // Ack should always succeed as it's a no-op 1267 - let any_work = HandleResolutionWork { 1268 - handle: "any.example.com".to_string(), 1269 - }; 1270 - 1271 - let result = adapter.ack(&any_work).await; 1272 - assert!(result.is_ok()); 1273 - } 1274 - 1275 - #[tokio::test] 1276 - async fn test_sqlite_queue_adapter_generic_work_type() { 1277 - // Test with a different work type to demonstrate genericity 1278 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 1279 - struct CustomWork { 1280 - id: u64, 1281 - name: String, 1282 - data: Vec<i32>, 1283 - } 1284 - 1285 - // Create in-memory SQLite database for testing 1286 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1287 - .await 1288 - .expect("Failed to connect to in-memory SQLite"); 1289 - 1290 - // Create the queue schema 1291 - crate::sqlite_schema::create_schema(&pool) 1292 - .await 1293 - .expect("Failed to create schema"); 1294 - 1295 - let adapter = Arc::new(SqliteQueueAdapter::<CustomWork>::new(pool.clone())); 1296 - 1297 - // Create custom work items 1298 - let work1 = CustomWork { 1299 - id: 123, 1300 - name: "test_work".to_string(), 1301 - data: vec![1, 2, 3, 4, 5], 1302 - }; 1303 - 1304 - let work2 = CustomWork { 1305 - id: 456, 1306 - name: "another_work".to_string(), 1307 - data: vec![10, 20], 1308 - }; 1309 - 1310 - // Test push for both items 1311 - adapter.push(work1.clone()).await.unwrap(); 1312 - adapter.push(work2.clone()).await.unwrap(); 1313 - 1314 - // Verify the records are in database with correct JSON serialization 1315 - let records: Vec<(i64, String, i64)> = sqlx::query_as( 1316 - "SELECT id, work, queued_at FROM handle_resolution_queue ORDER BY queued_at ASC" 1317 - ) 1318 - .fetch_all(&pool) 1319 - .await 1320 - .expect("Failed to query database"); 1321 - 1322 - assert_eq!(records.len(), 2); 1323 - 1324 - // Verify first work item JSON 1325 - let stored_work1: CustomWork = serde_json::from_str(&records[0].1) 1326 - .expect("Failed to deserialize first 
work item"); 1327 - assert_eq!(stored_work1, work1); 1328 - 1329 - // Verify the JSON contains all expected fields 1330 - let json_value1: serde_json::Value = serde_json::from_str(&records[0].1) 1331 - .expect("Failed to parse JSON"); 1332 - assert_eq!(json_value1["id"], 123); 1333 - assert_eq!(json_value1["name"], "test_work"); 1334 - assert_eq!(json_value1["data"], serde_json::json!([1, 2, 3, 4, 5])); 1335 - 1336 - // Verify second work item JSON 1337 - let stored_work2: CustomWork = serde_json::from_str(&records[1].1) 1338 - .expect("Failed to deserialize second work item"); 1339 - assert_eq!(stored_work2, work2); 1340 - 1341 - let json_value2: serde_json::Value = serde_json::from_str(&records[1].1) 1342 - .expect("Failed to parse JSON"); 1343 - assert_eq!(json_value2["id"], 456); 1344 - assert_eq!(json_value2["name"], "another_work"); 1345 - assert_eq!(json_value2["data"], serde_json::json!([10, 20])); 1346 - 1347 - // Verify depth 1348 - assert_eq!(adapter.depth().await, Some(2)); 1349 - 1350 - // Test pull - should get items in FIFO order 1351 - let pulled1 = adapter.pull().await; 1352 - assert!(pulled1.is_some()); 1353 - let pulled_work1 = pulled1.unwrap(); 1354 - assert_eq!(pulled_work1, work1); 1355 - 1356 - // Verify database now has one record 1357 - let count_after_first_pull: i64 = sqlx::query_scalar( 1358 - "SELECT COUNT(*) FROM handle_resolution_queue" 1359 - ) 1360 - .fetch_one(&pool) 1361 - .await 1362 - .expect("Failed to count records"); 1363 - assert_eq!(count_after_first_pull, 1); 1364 - 1365 - let pulled2 = adapter.pull().await; 1366 - assert!(pulled2.is_some()); 1367 - let pulled_work2 = pulled2.unwrap(); 1368 - assert_eq!(pulled_work2, work2); 1369 - 1370 - // Verify database is now empty 1371 - let count_after_second_pull: i64 = sqlx::query_scalar( 1372 - "SELECT COUNT(*) FROM handle_resolution_queue" 1373 - ) 1374 - .fetch_one(&pool) 1375 - .await 1376 - .expect("Failed to count records"); 1377 - assert_eq!(count_after_second_pull, 0); 
1378 - 1379 - // Verify queue is empty 1380 - assert_eq!(adapter.depth().await, Some(0)); 1381 - let empty_pull = adapter.pull().await; 1382 - assert!(empty_pull.is_none()); 1383 - 1384 - // Test ack (should be no-op) 1385 - adapter.ack(&pulled_work1).await.expect("Ack should succeed"); 1386 - adapter.ack(&pulled_work2).await.expect("Ack should succeed"); 1387 - } 1388 - 1389 - #[tokio::test] 1390 - async fn test_sqlite_queue_adapter_work_shedding() { 1391 - // Create in-memory SQLite database for testing 1392 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1393 - .await 1394 - .expect("Failed to connect to in-memory SQLite"); 1395 - 1396 - // Create the queue schema 1397 - crate::sqlite_schema::create_schema(&pool) 1398 - .await 1399 - .expect("Failed to create schema"); 1400 - 1401 - // Create adapter with small max_size for testing work shedding 1402 - let max_size = 10; // Use larger size to properly test batch deletion 1403 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::with_max_size( 1404 - pool.clone(), 1405 - max_size 1406 - )); 1407 - 1408 - // Verify initial empty state 1409 - assert_eq!(adapter.depth().await, Some(0)); 1410 - 1411 - // Push items up to the limit (should not trigger shedding) 1412 - let mut handles = Vec::new(); 1413 - for i in 0..max_size { 1414 - let handle = format!("test-{:03}", i); 1415 - handles.push(handle.clone()); 1416 - let work = HandleResolutionWork { handle }; 1417 - adapter.push(work).await.expect("Push should succeed"); 1418 - } 1419 - 1420 - // Verify all items are present 1421 - assert_eq!(adapter.depth().await, Some(max_size as usize)); 1422 - 1423 - // Push beyond 110% of max_size to trigger batch shedding 1424 - // The implementation checks at 110% and deletes down to 80% 1425 - let trigger_point = max_size + (max_size / 10) + 1; 1426 - for i in max_size..trigger_point { 1427 - let handle = format!("test-{:03}", i); 1428 - handles.push(handle); 1429 - let work = HandleResolutionWork { 
handle: handles[i as usize].clone() }; 1430 - adapter.push(work).await.expect("Push should succeed"); 1431 - } 1432 - 1433 - // After triggering shedding, queue should be around 80% of max_size 1434 - let depth_after_shedding = adapter.depth().await.unwrap(); 1435 - let expected_size = (max_size as f64 * 0.8) as usize; 1436 - 1437 - // Allow some variance due to batch deletion 1438 - assert!( 1439 - depth_after_shedding <= expected_size + 1, 1440 - "Queue size {} should be around 80% of max_size ({})", 1441 - depth_after_shedding, 1442 - expected_size 1443 - ); 1444 - 1445 - // Verify oldest items were deleted and newest items remain 1446 - let records: Vec<(i64, String, i64)> = sqlx::query_as( 1447 - "SELECT id, work, queued_at FROM handle_resolution_queue ORDER BY queued_at ASC" 1448 - ) 1449 - .fetch_all(&pool) 1450 - .await 1451 - .expect("Failed to query database after shedding"); 1452 - 1453 - // Some of the oldest items should be gone (but not necessarily all the first ones) 1454 - // With batch deletion to 80%, we keep recent items 1455 - let last_item: HandleResolutionWork = serde_json::from_str(&records[records.len() - 1].1) 1456 - .expect("Failed to deserialize last work"); 1457 - // Should have the most recent items 1458 - assert!(last_item.handle.starts_with("test-01"), "Should have recent items"); 1459 - 1460 - // Verify FIFO order is maintained for remaining items 1461 - let mut prev_id = 0; 1462 - for record in &records { 1463 - let id: i64 = record.0; 1464 - assert!(id > prev_id, "IDs should be in ascending order"); 1465 - prev_id = id; 1466 - } 1467 - } 1468 - 1469 - #[tokio::test] 1470 - async fn test_sqlite_queue_adapter_work_shedding_disabled() { 1471 - // Create in-memory SQLite database for testing 1472 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1473 - .await 1474 - .expect("Failed to connect to in-memory SQLite"); 1475 - 1476 - // Create the queue schema 1477 - crate::sqlite_schema::create_schema(&pool) 1478 - .await 1479 - 
.expect("Failed to create schema"); 1480 - 1481 - // Create adapter with max_size = 0 (disabled work shedding) 1482 - let adapter = Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::with_max_size( 1483 - pool.clone(), 1484 - 0 1485 - )); 1486 - 1487 - // Push many items (should not trigger any shedding) 1488 - let mut expected_handles = Vec::new(); 1489 - for i in 0..100 { 1490 - let handle = format!("test-{:03}", i); 1491 - expected_handles.push(handle.clone()); 1492 - let work = HandleResolutionWork { handle }; 1493 - adapter.push(work).await.expect("Push should succeed"); 1494 - } 1495 - 1496 - // Verify all items are present (no shedding occurred) 1497 - assert_eq!(adapter.depth().await, Some(100)); 1498 - 1499 - // Verify all items are in database 1500 - let records: Vec<(i64, String, i64)> = sqlx::query_as( 1501 - "SELECT id, work, queued_at FROM handle_resolution_queue ORDER BY queued_at ASC" 1502 - ) 1503 - .fetch_all(&pool) 1504 - .await 1505 - .expect("Failed to query database"); 1506 - 1507 - assert_eq!(records.len(), 100); 1508 - 1509 - // Verify all items are present in correct order 1510 - for (i, expected_handle) in expected_handles.iter().enumerate() { 1511 - let stored_work: HandleResolutionWork = serde_json::from_str(&records[i].1) 1512 - .expect("Failed to deserialize stored work"); 1513 - assert_eq!(stored_work.handle, *expected_handle); 1514 - } 1515 - } 1516 - 1517 - #[tokio::test] 1518 - async fn test_sqlite_queue_adapter_performance_optimizations() { 1519 - // Create in-memory SQLite database for testing 1520 - let pool = sqlx::SqlitePool::connect("sqlite::memory:") 1521 - .await 1522 - .expect("Failed to connect to in-memory SQLite"); 1523 - 1524 - // Create the queue schema 1525 - crate::sqlite_schema::create_schema(&pool) 1526 - .await 1527 - .expect("Failed to create schema"); 1528 - 1529 - // Create adapter with reasonable max_size 1530 - let max_size = 100; 1531 - let adapter = 
Arc::new(SqliteQueueAdapter::<HandleResolutionWork>::with_max_size( 1532 - pool.clone(), 1533 - max_size 1534 - )); 1535 - 1536 - // Test 1: Verify inserts don't trigger checks when well under limit 1537 - // Push 50 items (50% of max_size) - should not trigger any cleanup checks 1538 - for i in 0..50 { 1539 - let work = HandleResolutionWork { 1540 - handle: format!("handle-{:04}", i), 1541 - }; 1542 - adapter.push(work).await.expect("Push should succeed"); 1543 - } 1544 - assert_eq!(adapter.depth().await, Some(50)); 1545 - 1546 - // Test 2: Verify batch deletion efficiency 1547 - // Push to 110% to trigger batch cleanup 1548 - for i in 50..111 { 1549 - let work = HandleResolutionWork { 1550 - handle: format!("handle-{:04}", i), 1551 - }; 1552 - adapter.push(work).await.expect("Push should succeed"); 1553 - // Add tiny delay to ensure different timestamps for proper ordering 1554 - if i % 10 == 0 { 1555 - tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; 1556 - } 1557 - } 1558 - 1559 - // Should have deleted down to ~80% of max_size 1560 - let depth_after_batch = adapter.depth().await.unwrap(); 1561 - assert!( 1562 - (79..=81).contains(&depth_after_batch), 1563 - "After batch deletion, size should be ~80 (got {})", 1564 - depth_after_batch 1565 - ); 1566 - 1567 - // Test 3: Verify cleanup doesn't happen again immediately 1568 - // Push a few more items - should not trigger another cleanup 1569 - for i in 111..115 { 1570 - let work = HandleResolutionWork { 1571 - handle: format!("handle-{:04}", i), 1572 - }; 1573 - adapter.push(work).await.expect("Push should succeed"); 1574 - } 1575 - 1576 - let depth_no_cleanup = adapter.depth().await.unwrap(); 1577 - assert!( 1578 - depth_no_cleanup > 80 && depth_no_cleanup < 90, 1579 - "Should not have triggered cleanup yet (got {})", 1580 - depth_no_cleanup 1581 - ); 1582 - 1583 - // Test 4: Verify timestamp-based deletion is working correctly 1584 - // The oldest items should be gone after batch deletion 1585 - 
let records: Vec<(i64, String, i64)> = sqlx::query_as( 1586 - "SELECT id, work, queued_at FROM handle_resolution_queue 1587 - ORDER BY queued_at ASC LIMIT 5" 1588 - ) 1589 - .fetch_all(&pool) 1590 - .await 1591 - .expect("Failed to query database"); 1592 - 1593 - // Verify we have recent items (not necessarily the oldest) 1594 - let oldest_work: HandleResolutionWork = serde_json::from_str(&records[0].1) 1595 - .expect("Failed to deserialize work"); 1596 - let oldest_num: i32 = oldest_work.handle 1597 - .trim_start_matches("handle-") 1598 - .parse() 1599 - .unwrap_or(0); 1600 - 1601 - // After batch deletion to 80%, we should have deleted approximately the first 31 items 1602 - // But allow some variance due to timing 1603 - assert!( 1604 - oldest_num >= 20, 1605 - "Should have deleted old items, oldest is now: {}", 1606 - oldest_work.handle 1607 - ); 1608 - 1609 - // Test 5: Verify FIFO order is maintained after batch operations 1610 - let mut prev_timestamp = 0i64; 1611 - let all_records: Vec<(i64, String, i64)> = sqlx::query_as( 1612 - "SELECT id, work, queued_at FROM handle_resolution_queue ORDER BY queued_at ASC" 1613 - ) 1614 - .fetch_all(&pool) 1615 - .await 1616 - .expect("Failed to query database"); 1617 - 1618 - for record in &all_records { 1619 - assert!( 1620 - record.2 >= prev_timestamp, 1621 - "Timestamps should be in ascending order" 1622 - ); 1623 - prev_timestamp = record.2; 1624 - } 1625 - } 1626 - }