QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
Branch `main` · 419 lines · 16 kB (view raw)
1//! Background task for asynchronous handle resolution 2//! 3//! This module implements a background task that processes handle resolution requests 4//! asynchronously through a work queue. The design supports multiple queue backends 5//! and ensures resolved handles are cached for efficient subsequent lookups. 6 7use crate::handle_resolver::HandleResolver; 8use crate::metrics::SharedMetricsPublisher; 9use crate::queue::{HandleResolutionWork, QueueAdapter}; 10use anyhow::Result; 11use std::sync::Arc; 12use std::time::Duration; 13use thiserror::Error; 14use tokio_util::sync::CancellationToken; 15use tracing::{debug, error, info, instrument}; 16 17/// Handle resolver task errors 18#[derive(Error, Debug)] 19pub(crate) enum HandleResolverError { 20 /// Queue adapter health check failed 21 #[error("error-quickdid-task-1 Queue adapter health check failed: adapter is not healthy")] 22 QueueAdapterUnhealthy, 23} 24 25/// Configuration for the handle resolver task processor 26#[derive(Clone, Debug)] 27pub struct HandleResolverTaskConfig { 28 /// Default timeout for resolution requests in milliseconds 29 pub default_timeout_ms: u64, 30} 31 32impl Default for HandleResolverTaskConfig { 33 fn default() -> Self { 34 Self { 35 default_timeout_ms: 10000, // 10 seconds 36 } 37 } 38} 39 40/// Handle resolver task processor 41pub(crate) struct HandleResolverTask { 42 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>, 43 handle_resolver: Arc<dyn HandleResolver>, 44 cancel_token: CancellationToken, 45 config: HandleResolverTaskConfig, 46 metrics_publisher: SharedMetricsPublisher, 47} 48 49impl HandleResolverTask { 50 /// Create a new handle resolver task processor 51 pub fn new( 52 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>, 53 handle_resolver: Arc<dyn HandleResolver>, 54 cancel_token: CancellationToken, 55 metrics_publisher: SharedMetricsPublisher, 56 ) -> Self { 57 let config = HandleResolverTaskConfig::default(); 58 Self { 59 adapter, 60 handle_resolver, 61 
cancel_token, 62 config, 63 metrics_publisher, 64 } 65 } 66 67 /// Create a new handle resolver task processor with custom configuration 68 pub fn with_config( 69 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>, 70 handle_resolver: Arc<dyn HandleResolver>, 71 cancel_token: CancellationToken, 72 config: HandleResolverTaskConfig, 73 metrics_publisher: SharedMetricsPublisher, 74 ) -> Self { 75 Self { 76 adapter, 77 handle_resolver, 78 cancel_token, 79 config, 80 metrics_publisher, 81 } 82 } 83 84 /// Run the handle resolver task processor 85 #[instrument(skip(self))] 86 pub async fn run(self) -> Result<(), HandleResolverError> { 87 info!("Handle resolver task processor started"); 88 89 // Check adapter health before starting 90 if !self.adapter.is_healthy().await { 91 return Err(HandleResolverError::QueueAdapterUnhealthy); 92 } 93 94 loop { 95 tokio::select! { 96 _ = self.cancel_token.cancelled() => { 97 info!("Handle resolver task processor shutting down"); 98 break; 99 } 100 work = self.adapter.pull() => { 101 if let Some(work) = work { 102 // Process the handle resolution directly 103 self.process_handle_resolution(work.clone()).await; 104 // Acknowledge work after processing (ignore errors) 105 let _ = self.adapter.ack(&work).await; 106 } 107 } 108 } 109 } 110 111 // All work has been processed 112 info!("All handle resolutions completed"); 113 info!("Handle resolver task processor stopped"); 114 115 Ok(()) 116 } 117 118 /// Check if an error represents a soft failure (handle not found) 119 /// rather than a real error condition. 
120 /// 121 /// These atproto_identity library errors indicate the handle doesn't support 122 /// the specific resolution method, which is normal and expected: 123 /// - error-atproto-identity-resolve-4: DNS resolution failed (no records) 124 /// - error-atproto-identity-resolve-5: HTTP resolution failed (hostname not found) 125 fn is_soft_failure(error_str: &str) -> bool { 126 // Check for specific atproto_identity error codes that indicate "not found" 127 // rather than actual failures 128 if error_str.starts_with("error-atproto-identity-resolve-4") { 129 // DNS resolution - check if it's a "no records" scenario 130 error_str.contains("NoRecordsFound") 131 } else if error_str.starts_with("error-atproto-identity-resolve-6") { 132 // HTTP resolution - check if it's a DID format issue 133 error_str.contains("expected DID format") 134 } else if error_str.starts_with("error-atproto-identity-resolve-5") { 135 // HTTP resolution - check if it's a hostname lookup failure 136 error_str.contains("No address associated with hostname") 137 || error_str.contains("failed to lookup address information") 138 } else { 139 false 140 } 141 } 142 143 /// Process a single handle resolution work item 144 #[instrument(skip(self), fields( 145 handle = %work.handle, 146 ))] 147 async fn process_handle_resolution(&self, work: HandleResolutionWork) { 148 let start_time = std::time::Instant::now(); 149 150 debug!("Processing handle resolution: {}", work.handle); 151 152 // Perform the handle resolution with timeout 153 let timeout_duration = Duration::from_millis(self.config.default_timeout_ms); 154 let resolution_future = self.handle_resolver.resolve(&work.handle); 155 156 let result = tokio::time::timeout(timeout_duration, resolution_future).await; 157 158 let duration_ms = start_time.elapsed().as_millis() as u64; 159 160 // Publish metrics 161 self.metrics_publisher 162 .incr("task.handle_resolution.processed") 163 .await; 164 self.metrics_publisher 165 
.time("task.handle_resolution.duration_ms", duration_ms) 166 .await; 167 168 match result { 169 Ok(Ok((did, _timestamp))) => { 170 // Publish success metrics 171 self.metrics_publisher 172 .incr("task.handle_resolution.success") 173 .await; 174 self.metrics_publisher 175 .incr("task.handle_resolution.cached") 176 .await; 177 178 debug!( 179 handle = %work.handle, 180 did = %did, 181 duration_ms = duration_ms, 182 "Handle resolved successfully" 183 ); 184 } 185 Ok(Err(e)) => { 186 let error_str = e.to_string(); 187 188 if Self::is_soft_failure(&error_str) { 189 // This is a soft failure - handle simply doesn't support this resolution method 190 // Publish not-found metrics 191 self.metrics_publisher 192 .incr("task.handle_resolution.not_found") 193 .await; 194 195 debug!( 196 handle = %work.handle, 197 error = %error_str, 198 duration_ms = duration_ms, 199 "Handle not found (soft failure)" 200 ); 201 } else { 202 // This is a real error 203 // Publish failure metrics 204 self.metrics_publisher 205 .incr("task.handle_resolution.failed") 206 .await; 207 208 error!( 209 handle = %work.handle, 210 error = %error_str, 211 duration_ms = duration_ms, 212 "Handle resolution failed" 213 ); 214 } 215 } 216 Err(_) => { 217 // Publish timeout metrics 218 self.metrics_publisher 219 .incr("task.handle_resolution.timeout") 220 .await; 221 222 error!( 223 handle = %work.handle, 224 duration_ms = duration_ms, 225 "Handle resolution timed out after {}ms", self.config.default_timeout_ms 226 ); 227 } 228 } 229 } 230} 231 232// ========= Public API ========= 233 234/// Opaque handle for a handle resolver task 235pub struct HandleResolverTaskHandle { 236 task: HandleResolverTask, 237} 238 239impl HandleResolverTaskHandle { 240 /// Run the handle resolver task processor 241 pub async fn run(self) -> Result<()> { 242 self.task.run().await.map_err(|e| anyhow::anyhow!(e)) 243 } 244} 245 246// ========= Factory Functions ========= 247 248/// Create a new handle resolver task with default 
configuration. 249/// 250/// # Arguments 251/// 252/// * `adapter` - Queue adapter for work items 253/// * `handle_resolver` - Handle resolver implementation 254/// * `cancel_token` - Token for graceful shutdown 255/// * `metrics_publisher` - Metrics publisher for telemetry 256pub fn create_handle_resolver_task( 257 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>, 258 handle_resolver: Arc<dyn HandleResolver>, 259 cancel_token: CancellationToken, 260 metrics_publisher: SharedMetricsPublisher, 261) -> HandleResolverTaskHandle { 262 HandleResolverTaskHandle { 263 task: HandleResolverTask::new(adapter, handle_resolver, cancel_token, metrics_publisher), 264 } 265} 266 267/// Create a new handle resolver task with custom configuration. 268/// 269/// # Arguments 270/// 271/// * `adapter` - Queue adapter for work items 272/// * `handle_resolver` - Handle resolver implementation 273/// * `cancel_token` - Token for graceful shutdown 274/// * `config` - Task configuration 275/// * `metrics_publisher` - Metrics publisher for telemetry 276pub fn create_handle_resolver_task_with_config( 277 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>, 278 handle_resolver: Arc<dyn HandleResolver>, 279 cancel_token: CancellationToken, 280 config: HandleResolverTaskConfig, 281 metrics_publisher: SharedMetricsPublisher, 282) -> HandleResolverTaskHandle { 283 HandleResolverTaskHandle { 284 task: HandleResolverTask::with_config( 285 adapter, 286 handle_resolver, 287 cancel_token, 288 config, 289 metrics_publisher, 290 ), 291 } 292} 293 294#[cfg(test)] 295mod tests { 296 use super::*; 297 use crate::queue::MpscQueueAdapter; 298 use async_trait::async_trait; 299 use std::sync::Arc; 300 use tokio::sync::mpsc; 301 302 // Mock handle resolver for testing 303 #[derive(Clone)] 304 struct MockHandleResolver { 305 should_fail: bool, 306 } 307 308 #[async_trait] 309 impl HandleResolver for MockHandleResolver { 310 async fn resolve( 311 &self, 312 handle: &str, 313 ) -> Result<(String, u64), 
crate::handle_resolver::HandleResolverError> { 314 if self.should_fail { 315 Err(crate::handle_resolver::HandleResolverError::MockResolutionFailure) 316 } else { 317 let timestamp = std::time::SystemTime::now() 318 .duration_since(std::time::UNIX_EPOCH) 319 .unwrap_or_default() 320 .as_secs(); 321 Ok((format!("did:plc:{}", handle.replace('.', "")), timestamp)) 322 } 323 } 324 } 325 326 #[tokio::test] 327 async fn test_handle_resolver_task_successful_resolution() { 328 // Create channels and adapter 329 let (sender, receiver) = mpsc::channel(10); 330 let adapter = Arc::new(MpscQueueAdapter::from_channel(sender.clone(), receiver)); 331 332 // Create mock handle resolver 333 let handle_resolver = Arc::new(MockHandleResolver { should_fail: false }); 334 335 // Create cancellation token 336 let cancel_token = CancellationToken::new(); 337 338 // Create metrics publisher 339 let metrics_publisher = Arc::new(crate::metrics::NoOpMetricsPublisher); 340 341 // Create task with custom config 342 let config = HandleResolverTaskConfig { 343 default_timeout_ms: 5000, 344 }; 345 346 let task = HandleResolverTask::with_config( 347 adapter.clone(), 348 handle_resolver, 349 cancel_token.clone(), 350 config, 351 metrics_publisher, 352 ); 353 354 // Create handle resolution work 355 let work = HandleResolutionWork::new("alice.example.com".to_string()); 356 357 // Send work to queue 358 sender.send(work).await.unwrap(); 359 360 // Run task for a short time 361 let task_handle = tokio::spawn(async move { task.run().await }); 362 363 // Wait a bit for processing 364 tokio::time::sleep(Duration::from_millis(500)).await; 365 366 // Cancel the task 367 cancel_token.cancel(); 368 369 // Wait for task to complete 370 let _ = task_handle.await; 371 372 // Test passes if task runs without panic 373 } 374 375 #[test] 376 fn test_is_soft_failure() { 377 // Test DNS NoRecordsFound pattern (error-atproto-identity-resolve-4) 378 let dns_no_records = "error-atproto-identity-resolve-4 DNS resolution 
failed: ResolveError { kind: Proto(ProtoError { kind: NoRecordsFound { query: Query { name: Name(\"_atproto.noahshachtman.bsky.social.railway.internal.\"), query_type: TXT, query_class: IN }, soa: None, ns: None, negative_ttl: None, response_code: NotImp, trusted: true, authorities: None } }) }"; 379 assert!(HandleResolverTask::is_soft_failure(dns_no_records)); 380 381 // Test HTTP hostname not found pattern (error-atproto-identity-resolve-5) 382 let http_no_hostname = "error-atproto-identity-resolve-5 HTTP resolution failed: reqwest::Error { kind: Request, url: \"https://mattie.thegem.city/.well-known/atproto-did\", source: hyper_util::client::legacy::Error(Connect, ConnectError(\"dns error\", Custom { kind: Uncategorized, error: \"failed to lookup address information: No address associated with hostname\" })) }"; 383 assert!(HandleResolverTask::is_soft_failure(http_no_hostname)); 384 385 // Test alternate HTTP hostname failure message 386 let http_lookup_failed = "error-atproto-identity-resolve-5 HTTP resolution failed: reqwest::Error { kind: Request, url: \"https://example.com/.well-known/atproto-did\", source: hyper_util::client::legacy::Error(Connect, ConnectError(\"dns error\", Custom { kind: Uncategorized, error: \"failed to lookup address information\" })) }"; 387 assert!(HandleResolverTask::is_soft_failure(http_lookup_failed)); 388 389 // Test HTTP invalid DID format (error-atproto-identity-resolve-6) - like reuters.com 390 let http_invalid_did = "error-atproto-identity-resolve-6 Invalid HTTP resolution response: expected DID format"; 391 assert!(HandleResolverTask::is_soft_failure(http_invalid_did)); 392 393 // Test weratedogs.com case 394 let weratedogs_error = "error-atproto-identity-resolve-6 Invalid HTTP resolution response: expected DID format"; 395 assert!(HandleResolverTask::is_soft_failure(weratedogs_error)); 396 397 // Test DNS error that is NOT a soft failure (different DNS error) 398 let dns_real_error = "error-atproto-identity-resolve-4 DNS 
resolution failed: timeout"; 399 assert!(!HandleResolverTask::is_soft_failure(dns_real_error)); 400 401 // Test HTTP error that is NOT a soft failure (connection timeout) 402 let http_timeout = 403 "error-atproto-identity-resolve-5 HTTP resolution failed: connection timeout"; 404 assert!(!HandleResolverTask::is_soft_failure(http_timeout)); 405 406 // Test HTTP error that is NOT a soft failure (500 error) 407 let http_500 = "error-atproto-identity-resolve-5 HTTP resolution failed: status code 500"; 408 assert!(!HandleResolverTask::is_soft_failure(http_500)); 409 410 // Test QuickDID errors should never be soft failures 411 let quickdid_error = 412 "error-quickdid-resolve-1 Failed to resolve subject: internal server error"; 413 assert!(!HandleResolverTask::is_soft_failure(quickdid_error)); 414 415 // Test other atproto_identity error codes should not be soft failures 416 let other_atproto_error = "error-atproto-identity-resolve-1 Some other error"; 417 assert!(!HandleResolverTask::is_soft_failure(other_atproto_error)); 418 } 419}