QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 294 lines 10 kB view raw
1//! Rate-limited handle resolver implementation. 2//! 3//! This module provides a handle resolver wrapper that limits concurrent 4//! resolution requests using a semaphore to implement basic rate limiting. 5 6use super::errors::HandleResolverError; 7use super::traits::HandleResolver; 8use crate::metrics::SharedMetricsPublisher; 9use async_trait::async_trait; 10use std::sync::Arc; 11use std::time::Duration; 12use tokio::sync::Semaphore; 13use tokio::time::timeout; 14 15/// Rate-limited handle resolver that constrains concurrent resolutions. 16/// 17/// This resolver wraps an inner resolver and uses a semaphore to limit 18/// the number of concurrent resolution requests. This provides basic 19/// rate limiting and protects upstream services from being overwhelmed. 20/// 21/// # Architecture 22/// 23/// The rate limiter should be placed between the base resolver and any 24/// caching layers: 25/// ```text 26/// Request -> Cache -> RateLimited -> Base -> DNS/HTTP 27/// ``` 28/// 29/// # Example 30/// 31/// ```no_run 32/// use std::sync::Arc; 33/// use quickdid::handle_resolver::{ 34/// create_base_resolver, 35/// create_rate_limited_resolver, 36/// HandleResolver, 37/// }; 38/// use quickdid::metrics::NoOpMetricsPublisher; 39/// 40/// # async fn example() { 41/// # use atproto_identity::resolve::HickoryDnsResolver; 42/// # use reqwest::Client; 43/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 44/// # let http_client = Client::new(); 45/// # let metrics = Arc::new(NoOpMetricsPublisher); 46/// // Create base resolver 47/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone()); 48/// 49/// // Wrap with rate limiting (max 10 concurrent resolutions) 50/// let rate_limited = create_rate_limited_resolver(base, 10, metrics); 51/// 52/// // Use the rate-limited resolver 53/// let (did, timestamp) = rate_limited.resolve("alice.bsky.social").await.unwrap(); 54/// # } 55/// ``` 56pub(super) struct RateLimitedHandleResolver { 57 /// Inner resolver that performs actual resolution. 58 inner: Arc<dyn HandleResolver>, 59 60 /// Semaphore for limiting concurrent resolutions. 61 semaphore: Arc<Semaphore>, 62 63 /// Optional timeout for acquiring permits (in milliseconds). 64 /// When None or 0, no timeout is applied. 65 timeout_ms: Option<u64>, 66 67 /// Metrics publisher for telemetry. 68 metrics: SharedMetricsPublisher, 69} 70 71impl RateLimitedHandleResolver { 72 /// Create a new rate-limited resolver. 73 /// 74 /// # Arguments 75 /// 76 /// * `inner` - The inner resolver to wrap 77 /// * `max_concurrent` - Maximum number of concurrent resolutions allowed 78 /// * `metrics` - Metrics publisher for telemetry 79 pub fn new( 80 inner: Arc<dyn HandleResolver>, 81 max_concurrent: usize, 82 metrics: SharedMetricsPublisher, 83 ) -> Self { 84 Self { 85 inner, 86 semaphore: Arc::new(Semaphore::new(max_concurrent)), 87 timeout_ms: None, 88 metrics, 89 } 90 } 91 92 /// Create a new rate-limited resolver with timeout. 93 /// 94 /// # Arguments 95 /// 96 /// * `inner` - The inner resolver to wrap 97 /// * `max_concurrent` - Maximum number of concurrent resolutions allowed 98 /// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout) 99 /// * `metrics` - Metrics publisher for telemetry 100 pub fn new_with_timeout( 101 inner: Arc<dyn HandleResolver>, 102 max_concurrent: usize, 103 timeout_ms: u64, 104 metrics: SharedMetricsPublisher, 105 ) -> Self { 106 Self { 107 inner, 108 semaphore: Arc::new(Semaphore::new(max_concurrent)), 109 timeout_ms: if timeout_ms > 0 { 110 Some(timeout_ms) 111 } else { 112 None 113 }, 114 metrics, 115 } 116 } 117} 118 119#[async_trait] 120impl HandleResolver for RateLimitedHandleResolver { 121 async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> { 122 let permit_start = std::time::Instant::now(); 123 124 // Track rate limiter queue depth 125 let available_permits = self.semaphore.available_permits(); 126 self.metrics 127 .gauge( 128 "resolver.rate_limit.available_permits", 129 available_permits as u64, 130 ) 131 .await; 132 133 // Acquire a permit from the semaphore, with optional timeout 134 let _permit = match self.timeout_ms { 135 Some(timeout_ms) if timeout_ms > 0 => { 136 // Apply timeout when acquiring permit 137 let duration = Duration::from_millis(timeout_ms); 138 match timeout(duration, self.semaphore.acquire()).await { 139 Ok(Ok(permit)) => { 140 let wait_ms = permit_start.elapsed().as_millis() as u64; 141 self.metrics 142 .time("resolver.rate_limit.permit_acquired", wait_ms) 143 .await; 144 permit 145 } 146 Ok(Err(e)) => { 147 // Semaphore error (e.g., closed) 148 self.metrics.incr("resolver.rate_limit.permit_error").await; 149 return Err(HandleResolverError::ResolutionFailed(format!( 150 "Failed to acquire rate limit permit: {}", 151 e 152 ))); 153 } 154 Err(_) => { 155 // Timeout occurred 156 self.metrics 157 .incr("resolver.rate_limit.permit_timeout") 158 .await; 159 return Err(HandleResolverError::ResolutionFailed(format!( 160 "Rate limit permit acquisition timed out after {}ms", 161 timeout_ms 162 ))); 163 } 164 } 165 } 166 _ => { 167 // No timeout configured, wait indefinitely 168 match self.semaphore.acquire().await { 169 Ok(permit) => { 170 let wait_ms = permit_start.elapsed().as_millis() as u64; 171 self.metrics 172 .time("resolver.rate_limit.permit_acquired", wait_ms) 173 .await; 174 permit 175 } 176 Err(e) => { 177 self.metrics.incr("resolver.rate_limit.permit_error").await; 178 return Err(HandleResolverError::ResolutionFailed(format!( 179 "Failed to acquire rate limit permit: {}", 180 e 181 ))); 182 } 183 } 184 } 185 }; 186 187 // With permit acquired, forward to inner resolver 188 self.inner.resolve(s).await 189 } 190 191 async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> { 192 // Set operations don't need rate limiting since they're typically administrative 193 // and don't involve network calls to external services 194 self.inner.set(handle, did).await 195 } 196} 197 198/// Create a rate-limited handle resolver. 199/// 200/// This factory function creates a new [`RateLimitedHandleResolver`] that wraps 201/// the provided inner resolver with concurrency limiting. 202/// 203/// # Arguments 204/// 205/// * `inner` - The resolver to wrap with rate limiting 206/// * `max_concurrent` - Maximum number of concurrent resolutions allowed 207/// * `metrics` - Metrics publisher for telemetry 208/// 209/// # Returns 210/// 211/// An `Arc<dyn HandleResolver>` that can be used wherever a handle resolver is needed. 212/// 213/// # Example 214/// 215/// ```no_run 216/// use std::sync::Arc; 217/// use quickdid::handle_resolver::{ 218/// create_base_resolver, 219/// create_rate_limited_resolver, 220/// }; 221/// 222/// # async fn example() { 223/// # use atproto_identity::resolve::HickoryDnsResolver; 224/// # use reqwest::Client; 225/// # use quickdid::metrics::NoOpMetricsPublisher; 226/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 227/// # let http_client = Client::new(); 228/// # let metrics = Arc::new(NoOpMetricsPublisher); 229/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone()); 230/// let rate_limited = create_rate_limited_resolver(base, 10, metrics); 231/// # } 232/// ``` 233pub fn create_rate_limited_resolver( 234 inner: Arc<dyn HandleResolver>, 235 max_concurrent: usize, 236 metrics: SharedMetricsPublisher, 237) -> Arc<dyn HandleResolver> { 238 Arc::new(RateLimitedHandleResolver::new( 239 inner, 240 max_concurrent, 241 metrics, 242 )) 243} 244 245/// Create a rate-limited handle resolver with timeout. 246/// 247/// This factory function creates a new [`RateLimitedHandleResolver`] that wraps 248/// the provided inner resolver with concurrency limiting and timeout for permit acquisition. 249/// 250/// # Arguments 251/// 252/// * `inner` - The resolver to wrap with rate limiting 253/// * `max_concurrent` - Maximum number of concurrent resolutions allowed 254/// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout) 255/// * `metrics` - Metrics publisher for telemetry 256/// 257/// # Returns 258/// 259/// An `Arc<dyn HandleResolver>` that can be used wherever a handle resolver is needed. 260/// 261/// # Example 262/// 263/// ```no_run 264/// use std::sync::Arc; 265/// use quickdid::handle_resolver::{ 266/// create_base_resolver, 267/// create_rate_limited_resolver_with_timeout, 268/// }; 269/// 270/// # async fn example() { 271/// # use atproto_identity::resolve::HickoryDnsResolver; 272/// # use reqwest::Client; 273/// # use quickdid::metrics::NoOpMetricsPublisher; 274/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 275/// # let http_client = Client::new(); 276/// # let metrics = Arc::new(NoOpMetricsPublisher); 277/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone()); 278/// // Rate limit with 10 concurrent resolutions and 5 second timeout 279/// let rate_limited = create_rate_limited_resolver_with_timeout(base, 10, 5000, metrics); 280/// # } 281/// ``` 282pub fn create_rate_limited_resolver_with_timeout( 283 inner: Arc<dyn HandleResolver>, 284 max_concurrent: usize, 285 timeout_ms: u64, 286 metrics: SharedMetricsPublisher, 287) -> Arc<dyn HandleResolver> { 288 Arc::new(RateLimitedHandleResolver::new_with_timeout( 289 inner, 290 max_concurrent, 291 timeout_ms, 292 metrics, 293 )) 294}