QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
//! Rate-limited handle resolver implementation.
//!
//! This module provides a handle resolver wrapper that caps how many
//! resolution requests may be in flight at the same time. A semaphore enforces
//! the cap, so the "rate limiting" here bounds concurrency rather than
//! requests per unit of time.
5
6use super::errors::HandleResolverError;
7use super::traits::HandleResolver;
8use crate::metrics::SharedMetricsPublisher;
9use async_trait::async_trait;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Semaphore;
13use tokio::time::timeout;
14
15/// Rate-limited handle resolver that constrains concurrent resolutions.
16///
17/// This resolver wraps an inner resolver and uses a semaphore to limit
18/// the number of concurrent resolution requests. This provides basic
19/// rate limiting and protects upstream services from being overwhelmed.
20///
21/// # Architecture
22///
23/// The rate limiter should be placed between the base resolver and any
24/// caching layers:
25/// ```text
26/// Request -> Cache -> RateLimited -> Base -> DNS/HTTP
27/// ```
28///
29/// # Example
30///
31/// ```no_run
32/// use std::sync::Arc;
33/// use quickdid::handle_resolver::{
34/// create_base_resolver,
35/// create_rate_limited_resolver,
36/// HandleResolver,
37/// };
38/// use quickdid::metrics::NoOpMetricsPublisher;
39///
40/// # async fn example() {
41/// # use atproto_identity::resolve::HickoryDnsResolver;
42/// # use reqwest::Client;
43/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
44/// # let http_client = Client::new();
45/// # let metrics = Arc::new(NoOpMetricsPublisher);
46/// // Create base resolver
47/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone());
48///
49/// // Wrap with rate limiting (max 10 concurrent resolutions)
50/// let rate_limited = create_rate_limited_resolver(base, 10, metrics);
51///
52/// // Use the rate-limited resolver
53/// let (did, timestamp) = rate_limited.resolve("alice.bsky.social").await.unwrap();
54/// # }
55/// ```
56pub(super) struct RateLimitedHandleResolver {
57 /// Inner resolver that performs actual resolution.
58 inner: Arc<dyn HandleResolver>,
59
60 /// Semaphore for limiting concurrent resolutions.
61 semaphore: Arc<Semaphore>,
62
63 /// Optional timeout for acquiring permits (in milliseconds).
64 /// When None or 0, no timeout is applied.
65 timeout_ms: Option<u64>,
66
67 /// Metrics publisher for telemetry.
68 metrics: SharedMetricsPublisher,
69}
70
71impl RateLimitedHandleResolver {
72 /// Create a new rate-limited resolver.
73 ///
74 /// # Arguments
75 ///
76 /// * `inner` - The inner resolver to wrap
77 /// * `max_concurrent` - Maximum number of concurrent resolutions allowed
78 /// * `metrics` - Metrics publisher for telemetry
79 pub fn new(
80 inner: Arc<dyn HandleResolver>,
81 max_concurrent: usize,
82 metrics: SharedMetricsPublisher,
83 ) -> Self {
84 Self {
85 inner,
86 semaphore: Arc::new(Semaphore::new(max_concurrent)),
87 timeout_ms: None,
88 metrics,
89 }
90 }
91
92 /// Create a new rate-limited resolver with timeout.
93 ///
94 /// # Arguments
95 ///
96 /// * `inner` - The inner resolver to wrap
97 /// * `max_concurrent` - Maximum number of concurrent resolutions allowed
98 /// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout)
99 /// * `metrics` - Metrics publisher for telemetry
100 pub fn new_with_timeout(
101 inner: Arc<dyn HandleResolver>,
102 max_concurrent: usize,
103 timeout_ms: u64,
104 metrics: SharedMetricsPublisher,
105 ) -> Self {
106 Self {
107 inner,
108 semaphore: Arc::new(Semaphore::new(max_concurrent)),
109 timeout_ms: if timeout_ms > 0 {
110 Some(timeout_ms)
111 } else {
112 None
113 },
114 metrics,
115 }
116 }
117}
118
119#[async_trait]
120impl HandleResolver for RateLimitedHandleResolver {
121 async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> {
122 let permit_start = std::time::Instant::now();
123
124 // Track rate limiter queue depth
125 let available_permits = self.semaphore.available_permits();
126 self.metrics
127 .gauge(
128 "resolver.rate_limit.available_permits",
129 available_permits as u64,
130 )
131 .await;
132
133 // Acquire a permit from the semaphore, with optional timeout
134 let _permit = match self.timeout_ms {
135 Some(timeout_ms) if timeout_ms > 0 => {
136 // Apply timeout when acquiring permit
137 let duration = Duration::from_millis(timeout_ms);
138 match timeout(duration, self.semaphore.acquire()).await {
139 Ok(Ok(permit)) => {
140 let wait_ms = permit_start.elapsed().as_millis() as u64;
141 self.metrics
142 .time("resolver.rate_limit.permit_acquired", wait_ms)
143 .await;
144 permit
145 }
146 Ok(Err(e)) => {
147 // Semaphore error (e.g., closed)
148 self.metrics.incr("resolver.rate_limit.permit_error").await;
149 return Err(HandleResolverError::ResolutionFailed(format!(
150 "Failed to acquire rate limit permit: {}",
151 e
152 )));
153 }
154 Err(_) => {
155 // Timeout occurred
156 self.metrics
157 .incr("resolver.rate_limit.permit_timeout")
158 .await;
159 return Err(HandleResolverError::ResolutionFailed(format!(
160 "Rate limit permit acquisition timed out after {}ms",
161 timeout_ms
162 )));
163 }
164 }
165 }
166 _ => {
167 // No timeout configured, wait indefinitely
168 match self.semaphore.acquire().await {
169 Ok(permit) => {
170 let wait_ms = permit_start.elapsed().as_millis() as u64;
171 self.metrics
172 .time("resolver.rate_limit.permit_acquired", wait_ms)
173 .await;
174 permit
175 }
176 Err(e) => {
177 self.metrics.incr("resolver.rate_limit.permit_error").await;
178 return Err(HandleResolverError::ResolutionFailed(format!(
179 "Failed to acquire rate limit permit: {}",
180 e
181 )));
182 }
183 }
184 }
185 };
186
187 // With permit acquired, forward to inner resolver
188 self.inner.resolve(s).await
189 }
190
191 async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> {
192 // Set operations don't need rate limiting since they're typically administrative
193 // and don't involve network calls to external services
194 self.inner.set(handle, did).await
195 }
196}
197
198/// Create a rate-limited handle resolver.
199///
200/// This factory function creates a new [`RateLimitedHandleResolver`] that wraps
201/// the provided inner resolver with concurrency limiting.
202///
203/// # Arguments
204///
205/// * `inner` - The resolver to wrap with rate limiting
206/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
207/// * `metrics` - Metrics publisher for telemetry
208///
209/// # Returns
210///
211/// An `Arc<dyn HandleResolver>` that can be used wherever a handle resolver is needed.
212///
213/// # Example
214///
215/// ```no_run
216/// use std::sync::Arc;
217/// use quickdid::handle_resolver::{
218/// create_base_resolver,
219/// create_rate_limited_resolver,
220/// };
221///
222/// # async fn example() {
223/// # use atproto_identity::resolve::HickoryDnsResolver;
224/// # use reqwest::Client;
225/// # use quickdid::metrics::NoOpMetricsPublisher;
226/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
227/// # let http_client = Client::new();
228/// # let metrics = Arc::new(NoOpMetricsPublisher);
229/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone());
230/// let rate_limited = create_rate_limited_resolver(base, 10, metrics);
231/// # }
232/// ```
233pub fn create_rate_limited_resolver(
234 inner: Arc<dyn HandleResolver>,
235 max_concurrent: usize,
236 metrics: SharedMetricsPublisher,
237) -> Arc<dyn HandleResolver> {
238 Arc::new(RateLimitedHandleResolver::new(
239 inner,
240 max_concurrent,
241 metrics,
242 ))
243}
244
245/// Create a rate-limited handle resolver with timeout.
246///
247/// This factory function creates a new [`RateLimitedHandleResolver`] that wraps
248/// the provided inner resolver with concurrency limiting and timeout for permit acquisition.
249///
250/// # Arguments
251///
252/// * `inner` - The resolver to wrap with rate limiting
253/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
254/// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout)
255/// * `metrics` - Metrics publisher for telemetry
256///
257/// # Returns
258///
259/// An `Arc<dyn HandleResolver>` that can be used wherever a handle resolver is needed.
260///
261/// # Example
262///
263/// ```no_run
264/// use std::sync::Arc;
265/// use quickdid::handle_resolver::{
266/// create_base_resolver,
267/// create_rate_limited_resolver_with_timeout,
268/// };
269///
270/// # async fn example() {
271/// # use atproto_identity::resolve::HickoryDnsResolver;
272/// # use reqwest::Client;
273/// # use quickdid::metrics::NoOpMetricsPublisher;
274/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
275/// # let http_client = Client::new();
276/// # let metrics = Arc::new(NoOpMetricsPublisher);
277/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone());
278/// // Rate limit with 10 concurrent resolutions and 5 second timeout
279/// let rate_limited = create_rate_limited_resolver_with_timeout(base, 10, 5000, metrics);
280/// # }
281/// ```
282pub fn create_rate_limited_resolver_with_timeout(
283 inner: Arc<dyn HandleResolver>,
284 max_concurrent: usize,
285 timeout_ms: u64,
286 metrics: SharedMetricsPublisher,
287) -> Arc<dyn HandleResolver> {
288 Arc::new(RateLimitedHandleResolver::new_with_timeout(
289 inner,
290 max_concurrent,
291 timeout_ms,
292 metrics,
293 ))
294}