QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1//! Background task for asynchronous handle resolution
2//!
3//! This module implements a background task that processes handle resolution requests
4//! asynchronously through a work queue. The design supports multiple queue backends
5//! and ensures resolved handles are cached for efficient subsequent lookups.
6
7use crate::handle_resolver::HandleResolver;
8use crate::metrics::SharedMetricsPublisher;
9use crate::queue::{HandleResolutionWork, QueueAdapter};
10use anyhow::Result;
11use std::sync::Arc;
12use std::time::Duration;
13use thiserror::Error;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, instrument};
16
17/// Handle resolver task errors
18#[derive(Error, Debug)]
19pub(crate) enum HandleResolverError {
20 /// Queue adapter health check failed
21 #[error("error-quickdid-task-1 Queue adapter health check failed: adapter is not healthy")]
22 QueueAdapterUnhealthy,
23}
24
/// Tunable settings for the handle resolver task processor.
#[derive(Debug, Clone)]
pub struct HandleResolverTaskConfig {
    /// Upper bound, in milliseconds, on how long a single handle resolution may
    /// run before it is abandoned and reported as a timeout.
    pub default_timeout_ms: u64,
}

impl Default for HandleResolverTaskConfig {
    /// Produces a configuration with a ten-second resolution timeout.
    fn default() -> Self {
        const TEN_SECONDS_IN_MS: u64 = 10 * 1_000;
        HandleResolverTaskConfig {
            default_timeout_ms: TEN_SECONDS_IN_MS,
        }
    }
}
39
40/// Handle resolver task processor
41pub(crate) struct HandleResolverTask {
42 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
43 handle_resolver: Arc<dyn HandleResolver>,
44 cancel_token: CancellationToken,
45 config: HandleResolverTaskConfig,
46 metrics_publisher: SharedMetricsPublisher,
47}
48
49impl HandleResolverTask {
50 /// Create a new handle resolver task processor
51 pub fn new(
52 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
53 handle_resolver: Arc<dyn HandleResolver>,
54 cancel_token: CancellationToken,
55 metrics_publisher: SharedMetricsPublisher,
56 ) -> Self {
57 let config = HandleResolverTaskConfig::default();
58 Self {
59 adapter,
60 handle_resolver,
61 cancel_token,
62 config,
63 metrics_publisher,
64 }
65 }
66
67 /// Create a new handle resolver task processor with custom configuration
68 pub fn with_config(
69 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
70 handle_resolver: Arc<dyn HandleResolver>,
71 cancel_token: CancellationToken,
72 config: HandleResolverTaskConfig,
73 metrics_publisher: SharedMetricsPublisher,
74 ) -> Self {
75 Self {
76 adapter,
77 handle_resolver,
78 cancel_token,
79 config,
80 metrics_publisher,
81 }
82 }
83
84 /// Run the handle resolver task processor
85 #[instrument(skip(self))]
86 pub async fn run(self) -> Result<(), HandleResolverError> {
87 info!("Handle resolver task processor started");
88
89 // Check adapter health before starting
90 if !self.adapter.is_healthy().await {
91 return Err(HandleResolverError::QueueAdapterUnhealthy);
92 }
93
94 loop {
95 tokio::select! {
96 _ = self.cancel_token.cancelled() => {
97 info!("Handle resolver task processor shutting down");
98 break;
99 }
100 work = self.adapter.pull() => {
101 if let Some(work) = work {
102 // Process the handle resolution directly
103 self.process_handle_resolution(work.clone()).await;
104 // Acknowledge work after processing (ignore errors)
105 let _ = self.adapter.ack(&work).await;
106 }
107 }
108 }
109 }
110
111 // All work has been processed
112 info!("All handle resolutions completed");
113 info!("Handle resolver task processor stopped");
114
115 Ok(())
116 }
117
118 /// Check if an error represents a soft failure (handle not found)
119 /// rather than a real error condition.
120 ///
121 /// These atproto_identity library errors indicate the handle doesn't support
122 /// the specific resolution method, which is normal and expected:
123 /// - error-atproto-identity-resolve-4: DNS resolution failed (no records)
124 /// - error-atproto-identity-resolve-5: HTTP resolution failed (hostname not found)
125 fn is_soft_failure(error_str: &str) -> bool {
126 // Check for specific atproto_identity error codes that indicate "not found"
127 // rather than actual failures
128 if error_str.starts_with("error-atproto-identity-resolve-4") {
129 // DNS resolution - check if it's a "no records" scenario
130 error_str.contains("NoRecordsFound")
131 } else if error_str.starts_with("error-atproto-identity-resolve-6") {
132 // HTTP resolution - check if it's a DID format issue
133 error_str.contains("expected DID format")
134 } else if error_str.starts_with("error-atproto-identity-resolve-5") {
135 // HTTP resolution - check if it's a hostname lookup failure
136 error_str.contains("No address associated with hostname")
137 || error_str.contains("failed to lookup address information")
138 } else {
139 false
140 }
141 }
142
143 /// Process a single handle resolution work item
144 #[instrument(skip(self), fields(
145 handle = %work.handle,
146 ))]
147 async fn process_handle_resolution(&self, work: HandleResolutionWork) {
148 let start_time = std::time::Instant::now();
149
150 debug!("Processing handle resolution: {}", work.handle);
151
152 // Perform the handle resolution with timeout
153 let timeout_duration = Duration::from_millis(self.config.default_timeout_ms);
154 let resolution_future = self.handle_resolver.resolve(&work.handle);
155
156 let result = tokio::time::timeout(timeout_duration, resolution_future).await;
157
158 let duration_ms = start_time.elapsed().as_millis() as u64;
159
160 // Publish metrics
161 self.metrics_publisher
162 .incr("task.handle_resolution.processed")
163 .await;
164 self.metrics_publisher
165 .time("task.handle_resolution.duration_ms", duration_ms)
166 .await;
167
168 match result {
169 Ok(Ok((did, _timestamp))) => {
170 // Publish success metrics
171 self.metrics_publisher
172 .incr("task.handle_resolution.success")
173 .await;
174 self.metrics_publisher
175 .incr("task.handle_resolution.cached")
176 .await;
177
178 debug!(
179 handle = %work.handle,
180 did = %did,
181 duration_ms = duration_ms,
182 "Handle resolved successfully"
183 );
184 }
185 Ok(Err(e)) => {
186 let error_str = e.to_string();
187
188 if Self::is_soft_failure(&error_str) {
189 // This is a soft failure - handle simply doesn't support this resolution method
190 // Publish not-found metrics
191 self.metrics_publisher
192 .incr("task.handle_resolution.not_found")
193 .await;
194
195 debug!(
196 handle = %work.handle,
197 error = %error_str,
198 duration_ms = duration_ms,
199 "Handle not found (soft failure)"
200 );
201 } else {
202 // This is a real error
203 // Publish failure metrics
204 self.metrics_publisher
205 .incr("task.handle_resolution.failed")
206 .await;
207
208 error!(
209 handle = %work.handle,
210 error = %error_str,
211 duration_ms = duration_ms,
212 "Handle resolution failed"
213 );
214 }
215 }
216 Err(_) => {
217 // Publish timeout metrics
218 self.metrics_publisher
219 .incr("task.handle_resolution.timeout")
220 .await;
221
222 error!(
223 handle = %work.handle,
224 duration_ms = duration_ms,
225 "Handle resolution timed out after {}ms", self.config.default_timeout_ms
226 );
227 }
228 }
229 }
230}
231
232// ========= Public API =========
233
234/// Opaque handle for a handle resolver task
235pub struct HandleResolverTaskHandle {
236 task: HandleResolverTask,
237}
238
239impl HandleResolverTaskHandle {
240 /// Run the handle resolver task processor
241 pub async fn run(self) -> Result<()> {
242 self.task.run().await.map_err(|e| anyhow::anyhow!(e))
243 }
244}
245
246// ========= Factory Functions =========
247
248/// Create a new handle resolver task with default configuration.
249///
250/// # Arguments
251///
252/// * `adapter` - Queue adapter for work items
253/// * `handle_resolver` - Handle resolver implementation
254/// * `cancel_token` - Token for graceful shutdown
255/// * `metrics_publisher` - Metrics publisher for telemetry
256pub fn create_handle_resolver_task(
257 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
258 handle_resolver: Arc<dyn HandleResolver>,
259 cancel_token: CancellationToken,
260 metrics_publisher: SharedMetricsPublisher,
261) -> HandleResolverTaskHandle {
262 HandleResolverTaskHandle {
263 task: HandleResolverTask::new(adapter, handle_resolver, cancel_token, metrics_publisher),
264 }
265}
266
267/// Create a new handle resolver task with custom configuration.
268///
269/// # Arguments
270///
271/// * `adapter` - Queue adapter for work items
272/// * `handle_resolver` - Handle resolver implementation
273/// * `cancel_token` - Token for graceful shutdown
274/// * `config` - Task configuration
275/// * `metrics_publisher` - Metrics publisher for telemetry
276pub fn create_handle_resolver_task_with_config(
277 adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
278 handle_resolver: Arc<dyn HandleResolver>,
279 cancel_token: CancellationToken,
280 config: HandleResolverTaskConfig,
281 metrics_publisher: SharedMetricsPublisher,
282) -> HandleResolverTaskHandle {
283 HandleResolverTaskHandle {
284 task: HandleResolverTask::with_config(
285 adapter,
286 handle_resolver,
287 cancel_token,
288 config,
289 metrics_publisher,
290 ),
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::MpscQueueAdapter;
    use async_trait::async_trait;
    use std::sync::Arc;
    use tokio::sync::mpsc;

    /// Mock handle resolver for testing.
    ///
    /// When `should_fail` is set it returns `MockResolutionFailure`. Otherwise it
    /// returns a synthetic `did:plc:` derived from the handle and the current
    /// Unix time.
    #[derive(Clone)]
    struct MockHandleResolver {
        should_fail: bool,
    }

    #[async_trait]
    impl HandleResolver for MockHandleResolver {
        async fn resolve(
            &self,
            handle: &str,
        ) -> Result<(String, u64), crate::handle_resolver::HandleResolverError> {
            if self.should_fail {
                Err(crate::handle_resolver::HandleResolverError::MockResolutionFailure)
            } else {
                let timestamp = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                Ok((format!("did:plc:{}", handle.replace('.', "")), timestamp))
            }
        }
    }

    #[tokio::test]
    async fn test_handle_resolver_task_successful_resolution() {
        // Create channels and adapter
        let (sender, receiver) = mpsc::channel(10);
        let adapter = Arc::new(MpscQueueAdapter::from_channel(sender.clone(), receiver));

        // Create mock handle resolver
        let handle_resolver = Arc::new(MockHandleResolver { should_fail: false });

        // Create cancellation token
        let cancel_token = CancellationToken::new();

        // Create metrics publisher
        let metrics_publisher = Arc::new(crate::metrics::NoOpMetricsPublisher);

        // Create task with custom config
        let config = HandleResolverTaskConfig {
            default_timeout_ms: 5000,
        };

        let task = HandleResolverTask::with_config(
            adapter.clone(),
            handle_resolver,
            cancel_token.clone(),
            config,
            metrics_publisher,
        );

        // Create handle resolution work
        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // Send work to queue
        sender.send(work).await.unwrap();

        // Run task for a short time
        let task_handle = tokio::spawn(async move { task.run().await });

        // Wait a bit for processing
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Cancel the task
        cancel_token.cancel();

        // The task must neither panic nor report an error after cancellation.
        let result = task_handle.await.expect("handle resolver task panicked");
        assert!(result.is_ok(), "task should stop cleanly after cancellation");
    }

    #[tokio::test]
    async fn test_handle_resolver_task_failed_resolution_keeps_running() {
        // A resolver error is logged and counted, but must not abort the processor.
        let (sender, receiver) = mpsc::channel(10);
        let adapter = Arc::new(MpscQueueAdapter::from_channel(sender.clone(), receiver));
        let handle_resolver = Arc::new(MockHandleResolver { should_fail: true });
        let cancel_token = CancellationToken::new();
        let metrics_publisher = Arc::new(crate::metrics::NoOpMetricsPublisher);

        let task = HandleResolverTask::new(
            adapter,
            handle_resolver,
            cancel_token.clone(),
            metrics_publisher,
        );

        sender
            .send(HandleResolutionWork::new("bob.example.com".to_string()))
            .await
            .unwrap();

        let task_handle = tokio::spawn(async move { task.run().await });

        tokio::time::sleep(Duration::from_millis(100)).await;
        cancel_token.cancel();

        let result = task_handle.await.expect("handle resolver task panicked");
        assert!(result.is_ok(), "a failed resolution must not stop the task");
    }

    #[test]
    fn test_is_soft_failure() {
        // Test DNS NoRecordsFound pattern (error-atproto-identity-resolve-4)
        let dns_no_records = "error-atproto-identity-resolve-4 DNS resolution failed: ResolveError { kind: Proto(ProtoError { kind: NoRecordsFound { query: Query { name: Name(\"_atproto.noahshachtman.bsky.social.railway.internal.\"), query_type: TXT, query_class: IN }, soa: None, ns: None, negative_ttl: None, response_code: NotImp, trusted: true, authorities: None } }) }";
        assert!(HandleResolverTask::is_soft_failure(dns_no_records));

        // Test HTTP hostname not found pattern (error-atproto-identity-resolve-5)
        let http_no_hostname = "error-atproto-identity-resolve-5 HTTP resolution failed: reqwest::Error { kind: Request, url: \"https://mattie.thegem.city/.well-known/atproto-did\", source: hyper_util::client::legacy::Error(Connect, ConnectError(\"dns error\", Custom { kind: Uncategorized, error: \"failed to lookup address information: No address associated with hostname\" })) }";
        assert!(HandleResolverTask::is_soft_failure(http_no_hostname));

        // Test alternate HTTP hostname failure message
        let http_lookup_failed = "error-atproto-identity-resolve-5 HTTP resolution failed: reqwest::Error { kind: Request, url: \"https://example.com/.well-known/atproto-did\", source: hyper_util::client::legacy::Error(Connect, ConnectError(\"dns error\", Custom { kind: Uncategorized, error: \"failed to lookup address information\" })) }";
        assert!(HandleResolverTask::is_soft_failure(http_lookup_failed));

        // Test HTTP invalid DID format (error-atproto-identity-resolve-6) - like reuters.com
        let http_invalid_did = "error-atproto-identity-resolve-6 Invalid HTTP resolution response: expected DID format";
        assert!(HandleResolverTask::is_soft_failure(http_invalid_did));

        // Test weratedogs.com case
        let weratedogs_error = "error-atproto-identity-resolve-6 Invalid HTTP resolution response: expected DID format";
        assert!(HandleResolverTask::is_soft_failure(weratedogs_error));

        // Test DNS error that is NOT a soft failure (different DNS error)
        let dns_real_error = "error-atproto-identity-resolve-4 DNS resolution failed: timeout";
        assert!(!HandleResolverTask::is_soft_failure(dns_real_error));

        // Test HTTP error that is NOT a soft failure (connection timeout)
        let http_timeout =
            "error-atproto-identity-resolve-5 HTTP resolution failed: connection timeout";
        assert!(!HandleResolverTask::is_soft_failure(http_timeout));

        // Test HTTP error that is NOT a soft failure (500 error)
        let http_500 = "error-atproto-identity-resolve-5 HTTP resolution failed: status code 500";
        assert!(!HandleResolverTask::is_soft_failure(http_500));

        // Test QuickDID errors should never be soft failures
        let quickdid_error =
            "error-quickdid-resolve-1 Failed to resolve subject: internal server error";
        assert!(!HandleResolverTask::is_soft_failure(quickdid_error));

        // Test other atproto_identity error codes should not be soft failures
        let other_atproto_error = "error-atproto-identity-resolve-1 Some other error";
        assert!(!HandleResolverTask::is_soft_failure(other_atproto_error));
    }
}