QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at 3ba9119de263e0d1f9d1d0c4e4ec3efddc4eaaaf 547 lines 21 kB view raw
1//! SQLite-backed caching handle resolver. 2//! 3//! This module provides a handle resolver that caches resolution results in SQLite 4//! with configurable expiration times. SQLite caching provides persistence across 5//! service restarts while remaining lightweight for single-instance deployments. 6 7use super::errors::HandleResolverError; 8use super::traits::HandleResolver; 9use crate::handle_resolution_result::HandleResolutionResult; 10use crate::metrics::SharedMetricsPublisher; 11use async_trait::async_trait; 12use metrohash::MetroHash64; 13use sqlx::{Row, SqlitePool}; 14use std::hash::Hasher as _; 15use std::sync::Arc; 16use std::time::{SystemTime, UNIX_EPOCH}; 17 18/// SQLite-backed caching handle resolver. 19/// 20/// This resolver caches handle resolution results in SQLite with a configurable TTL. 21/// Results are stored in a compact binary format using bincode serialization 22/// to minimize storage overhead. 23/// 24/// # Features 25/// 26/// - Persistent caching across service restarts 27/// - Lightweight single-file database 28/// - Configurable TTL (default: 90 days) 29/// - Compact binary storage format 30/// - Automatic schema management 31/// - Graceful fallback if SQLite is unavailable 32/// 33/// # Example 34/// 35/// ```no_run 36/// use std::sync::Arc; 37/// use sqlx::SqlitePool; 38/// use quickdid::handle_resolver::{create_base_resolver, create_sqlite_resolver, HandleResolver}; 39/// use quickdid::metrics::NoOpMetricsPublisher; 40/// 41/// # async fn example() { 42/// # use atproto_identity::resolve::HickoryDnsResolver; 43/// # use reqwest::Client; 44/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 45/// # let http_client = Client::new(); 46/// # let metrics = Arc::new(NoOpMetricsPublisher); 47/// # let base_resolver = create_base_resolver(dns_resolver, http_client, metrics.clone()); 48/// # let sqlite_pool: SqlitePool = todo!(); 49/// // Create with default 90-day TTL 50/// let resolver = create_sqlite_resolver( 51/// base_resolver, 52/// sqlite_pool, 53/// metrics 54/// ); 55/// # } 56/// ``` 57pub(super) struct SqliteHandleResolver { 58 /// Base handle resolver to perform actual resolution 59 inner: Arc<dyn HandleResolver>, 60 /// SQLite connection pool 61 pool: SqlitePool, 62 /// TTL for cache entries in seconds 63 ttl_seconds: u64, 64 /// Metrics publisher for telemetry 65 metrics: SharedMetricsPublisher, 66} 67 68impl SqliteHandleResolver { 69 /// Create a new SQLite-backed handle resolver with default 90-day TTL. 70 fn new( 71 inner: Arc<dyn HandleResolver>, 72 pool: SqlitePool, 73 metrics: SharedMetricsPublisher, 74 ) -> Self { 75 Self::with_ttl(inner, pool, 90 * 24 * 60 * 60, metrics) // 90 days default 76 } 77 78 /// Create a new SQLite-backed handle resolver with custom TTL. 79 fn with_ttl( 80 inner: Arc<dyn HandleResolver>, 81 pool: SqlitePool, 82 ttl_seconds: u64, 83 metrics: SharedMetricsPublisher, 84 ) -> Self { 85 Self { 86 inner, 87 pool, 88 ttl_seconds, 89 metrics, 90 } 91 } 92 93 /// Generate the cache key for a handle. 94 /// 95 /// Uses MetroHash64 to generate a consistent hash of the handle 96 /// for use as the primary key. This provides better key distribution 97 /// and avoids issues with special characters in handles. 98 fn make_key(&self, handle: &str) -> u64 { 99 let mut h = MetroHash64::default(); 100 h.write(handle.as_bytes()); 101 h.finish() 102 } 103 104 /// Check if a cache entry is expired. 105 fn is_expired(&self, updated_timestamp: i64) -> bool { 106 let current_timestamp = SystemTime::now() 107 .duration_since(UNIX_EPOCH) 108 .unwrap_or_default() 109 .as_secs() as i64; 110 111 (current_timestamp - updated_timestamp) > (self.ttl_seconds as i64) 112 } 113} 114 115#[async_trait] 116impl HandleResolver for SqliteHandleResolver { 117 async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> { 118 let handle = s.to_string(); 119 let key = self.make_key(&handle) as i64; // SQLite uses signed integers 120 121 // Try to get from SQLite cache first 122 let cached_result = 123 sqlx::query("SELECT result, updated FROM handle_resolution_cache WHERE key = ?1") 124 .bind(key) 125 .fetch_optional(&self.pool) 126 .await; 127 128 match cached_result { 129 Ok(Some(row)) => { 130 let cached_bytes: Vec<u8> = row.get("result"); 131 let updated_timestamp: i64 = row.get("updated"); 132 133 // Check if the entry is expired 134 if !self.is_expired(updated_timestamp) { 135 // Deserialize the cached result 136 match HandleResolutionResult::from_bytes(&cached_bytes) { 137 Ok(cached_result) => { 138 if let Some(did) = cached_result.to_did() { 139 tracing::debug!("Cache hit for handle {}: {}", handle, did); 140 self.metrics.incr("resolver.sqlite.cache_hit").await; 141 return Ok((did, cached_result.timestamp)); 142 } else { 143 tracing::debug!("Cache hit (not resolved) for handle {}", handle); 144 self.metrics 145 .incr("resolver.sqlite.cache_hit_not_resolved") 146 .await; 147 return Err(HandleResolverError::HandleNotFound); 148 } 149 } 150 Err(e) => { 151 tracing::warn!( 152 "Failed to deserialize cached result for handle {}: {}", 153 handle, 154 e 155 ); 156 self.metrics.incr("resolver.sqlite.deserialize_error").await; 157 // Fall through to re-resolve if deserialization fails 158 } 159 } 160 } else { 161 tracing::debug!("Cache entry expired for handle {}", handle); 162 self.metrics.incr("resolver.sqlite.cache_expired").await; 163 // Entry is expired, we'll re-resolve and update it 164 } 165 } 166 Ok(None) => { 167 tracing::debug!("Cache miss for handle {}, resolving...", handle); 168 self.metrics.incr("resolver.sqlite.cache_miss").await; 169 } 170 Err(e) => { 171 tracing::warn!("Failed to query SQLite cache for handle {}: {}", handle, e); 172 self.metrics.incr("resolver.sqlite.query_error").await; 173 // Fall through to resolve without caching on database error 174 } 175 } 176 177 // Not in cache or expired, resolve through inner resolver 178 let result = self.inner.resolve(s).await; 179 180 // Create and serialize resolution result 181 let resolution_result = match &result { 182 Ok((did, _timestamp)) => { 183 tracing::debug!( 184 "Caching successful resolution for handle {}: {}", 185 handle, 186 did 187 ); 188 match HandleResolutionResult::success(did) { 189 Ok(res) => res, 190 Err(e) => { 191 tracing::warn!("Failed to create resolution result: {}", e); 192 self.metrics 193 .incr("resolver.sqlite.result_create_error") 194 .await; 195 return result; 196 } 197 } 198 } 199 Err(e) => { 200 tracing::debug!("Caching failed resolution for handle {}: {}", handle, e); 201 match HandleResolutionResult::not_resolved() { 202 Ok(res) => res, 203 Err(err) => { 204 tracing::warn!("Failed to create not_resolved result: {}", err); 205 self.metrics 206 .incr("resolver.sqlite.result_create_error") 207 .await; 208 return result; 209 } 210 } 211 } 212 }; 213 214 // Serialize to bytes 215 match resolution_result.to_bytes() { 216 Ok(bytes) => { 217 let current_timestamp = SystemTime::now() 218 .duration_since(UNIX_EPOCH) 219 .unwrap_or_default() 220 .as_secs() as i64; 221 222 // Insert or update the cache entry 223 let query_result = sqlx::query( 224 r#" 225 INSERT INTO handle_resolution_cache (key, result, created, updated) 226 VALUES (?1, ?2, ?3, ?4) 227 ON CONFLICT(key) DO UPDATE SET 228 result = excluded.result, 229 updated = excluded.updated 230 "#, 231 ) 232 .bind(key) 233 .bind(&bytes) 234 .bind(current_timestamp) 235 .bind(current_timestamp) 236 .execute(&self.pool) 237 .await; 238 239 if let Err(e) = query_result { 240 tracing::warn!("Failed to cache handle resolution in SQLite: {}", e); 241 self.metrics.incr("resolver.sqlite.cache_set_error").await; 242 } else { 243 self.metrics.incr("resolver.sqlite.cache_set").await; 244 } 245 } 246 Err(e) => { 247 tracing::warn!( 248 "Failed to serialize resolution result for handle {}: {}", 249 handle, 250 e 251 ); 252 self.metrics.incr("resolver.sqlite.serialize_error").await; 253 } 254 } 255 256 result 257 } 258} 259 260/// Create a new SQLite-backed handle resolver with default 90-day TTL. 261/// 262/// # Arguments 263/// 264/// * `inner` - The underlying resolver to use for actual resolution 265/// * `pool` - SQLite connection pool 266/// * `metrics` - Metrics publisher for telemetry 267/// 268/// # Example 269/// 270/// ```no_run 271/// use std::sync::Arc; 272/// use quickdid::handle_resolver::{create_base_resolver, create_sqlite_resolver, HandleResolver}; 273/// use quickdid::sqlite_schema::create_sqlite_pool; 274/// use quickdid::metrics::NoOpMetricsPublisher; 275/// 276/// # async fn example() -> anyhow::Result<()> { 277/// # use atproto_identity::resolve::HickoryDnsResolver; 278/// # use reqwest::Client; 279/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 280/// # let http_client = Client::new(); 281/// # let metrics = Arc::new(NoOpMetricsPublisher); 282/// let base = create_base_resolver( 283/// dns_resolver, 284/// http_client, 285/// metrics.clone(), 286/// ); 287/// 288/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 289/// let resolver = create_sqlite_resolver(base, pool, metrics); 290/// let (did, timestamp) = resolver.resolve("alice.bsky.social").await.unwrap(); 291/// # Ok(()) 292/// # } 293/// ``` 294pub fn create_sqlite_resolver( 295 inner: Arc<dyn HandleResolver>, 296 pool: SqlitePool, 297 metrics: SharedMetricsPublisher, 298) -> Arc<dyn HandleResolver> { 299 Arc::new(SqliteHandleResolver::new(inner, pool, metrics)) 300} 301 302/// Create a new SQLite-backed handle resolver with custom TTL. 303/// 304/// # Arguments 305/// 306/// * `inner` - The underlying resolver to use for actual resolution 307/// * `pool` - SQLite connection pool 308/// * `ttl_seconds` - TTL for cache entries in seconds 309/// * `metrics` - Metrics publisher for telemetry 310pub fn create_sqlite_resolver_with_ttl( 311 inner: Arc<dyn HandleResolver>, 312 pool: SqlitePool, 313 ttl_seconds: u64, 314 metrics: SharedMetricsPublisher, 315) -> Arc<dyn HandleResolver> { 316 Arc::new(SqliteHandleResolver::with_ttl( 317 inner, 318 pool, 319 ttl_seconds, 320 metrics, 321 )) 322} 323 324#[cfg(test)] 325mod tests { 326 use super::*; 327 328 // Mock handle resolver for testing 329 #[derive(Clone)] 330 struct MockHandleResolver { 331 should_fail: bool, 332 expected_did: String, 333 } 334 335 #[async_trait] 336 impl HandleResolver for MockHandleResolver { 337 async fn resolve(&self, _handle: &str) -> Result<(String, u64), HandleResolverError> { 338 if self.should_fail { 339 Err(HandleResolverError::MockResolutionFailure) 340 } else { 341 let timestamp = std::time::SystemTime::now() 342 .duration_since(std::time::UNIX_EPOCH) 343 .unwrap_or_default() 344 .as_secs(); 345 Ok((self.expected_did.clone(), timestamp)) 346 } 347 } 348 } 349 350 #[tokio::test] 351 async fn test_sqlite_handle_resolver_cache_hit() { 352 // Create in-memory SQLite database for testing 353 let pool = SqlitePool::connect("sqlite::memory:") 354 .await 355 .expect("Failed to connect to in-memory SQLite"); 356 357 // Create the schema 358 crate::sqlite_schema::create_schema(&pool) 359 .await 360 .expect("Failed to create schema"); 361 362 // Create mock resolver 363 let mock_resolver = Arc::new(MockHandleResolver { 364 should_fail: false, 365 expected_did: "did:plc:testuser123".to_string(), 366 }); 367 368 // Create metrics publisher 369 let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher); 370 371 // Create SQLite-backed resolver 372 let sqlite_resolver = 373 SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics); 374 375 let test_handle = "alice.bsky.social"; 376 let expected_key = sqlite_resolver.make_key(test_handle) as i64; 377 378 // Verify database is empty initially 379 let initial_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 380 .fetch_one(&pool) 381 .await 382 .expect("Failed to query initial count"); 383 assert_eq!(initial_count, 0); 384 385 // First resolution - should call inner resolver and cache the result 386 let (result1, _timestamp1) = sqlite_resolver.resolve(test_handle).await.unwrap(); 387 assert_eq!(result1, "did:plc:testuser123"); 388 389 // Verify record was inserted 390 let count_after_first: i64 = 391 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 392 .fetch_one(&pool) 393 .await 394 .expect("Failed to query count after first resolution"); 395 assert_eq!(count_after_first, 1); 396 397 // Verify the cached record has correct key and non-empty result 398 let cached_record = sqlx::query( 399 "SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1", 400 ) 401 .bind(expected_key) 402 .fetch_one(&pool) 403 .await 404 .expect("Failed to fetch cached record"); 405 406 let cached_key: i64 = cached_record.get("key"); 407 let cached_result: Vec<u8> = cached_record.get("result"); 408 let cached_created: i64 = cached_record.get("created"); 409 let cached_updated: i64 = cached_record.get("updated"); 410 411 assert_eq!(cached_key, expected_key); 412 assert!( 413 !cached_result.is_empty(), 414 "Cached result should not be empty" 415 ); 416 assert!(cached_created > 0, "Created timestamp should be positive"); 417 assert!(cached_updated > 0, "Updated timestamp should be positive"); 418 assert_eq!( 419 cached_created, cached_updated, 420 "Created and updated should be equal on first insert" 421 ); 422 423 // Verify we can deserialize the cached result 424 let resolution_result = 425 crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result) 426 .expect("Failed to deserialize cached result"); 427 let cached_did = resolution_result.to_did().expect("Should have a DID"); 428 assert_eq!(cached_did, "did:plc:testuser123"); 429 430 // Second resolution - should hit cache (no additional database insert) 431 let (result2, _timestamp2) = sqlite_resolver.resolve(test_handle).await.unwrap(); 432 assert_eq!(result2, "did:plc:testuser123"); 433 434 // Verify count hasn't changed (cache hit, no new insert) 435 let count_after_second: i64 = 436 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 437 .fetch_one(&pool) 438 .await 439 .expect("Failed to query count after second resolution"); 440 assert_eq!(count_after_second, 1); 441 } 442 443 #[tokio::test] 444 async fn test_sqlite_handle_resolver_cache_error() { 445 // Create in-memory SQLite database for testing 446 let pool = SqlitePool::connect("sqlite::memory:") 447 .await 448 .expect("Failed to connect to in-memory SQLite"); 449 450 // Create the schema 451 crate::sqlite_schema::create_schema(&pool) 452 .await 453 .expect("Failed to create schema"); 454 455 // Create mock resolver that fails 456 let mock_resolver = Arc::new(MockHandleResolver { 457 should_fail: true, 458 expected_did: String::new(), 459 }); 460 461 // Create metrics publisher 462 let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher); 463 464 // Create SQLite-backed resolver 465 let sqlite_resolver = 466 SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics); 467 468 let test_handle = "error.bsky.social"; 469 let expected_key = sqlite_resolver.make_key(test_handle) as i64; 470 471 // Verify database is empty initially 472 let initial_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 473 .fetch_one(&pool) 474 .await 475 .expect("Failed to query initial count"); 476 assert_eq!(initial_count, 0); 477 478 // First resolution - should fail and cache the failure 479 let result1 = sqlite_resolver.resolve(test_handle).await; 480 assert!(result1.is_err()); 481 482 // Match the specific error type we expect 483 match result1 { 484 Err(HandleResolverError::MockResolutionFailure) => {} 485 other => panic!("Expected MockResolutionFailure, got {:?}", other), 486 } 487 488 // Verify the failure was cached 489 let count_after_first: i64 = 490 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 491 .fetch_one(&pool) 492 .await 493 .expect("Failed to query count after first resolution"); 494 assert_eq!(count_after_first, 1); 495 496 // Verify the cached error record 497 let cached_record = sqlx::query( 498 "SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1", 499 ) 500 .bind(expected_key) 501 .fetch_one(&pool) 502 .await 503 .expect("Failed to fetch cached error record"); 504 505 let cached_key: i64 = cached_record.get("key"); 506 let cached_result: Vec<u8> = cached_record.get("result"); 507 let cached_created: i64 = cached_record.get("created"); 508 let cached_updated: i64 = cached_record.get("updated"); 509 510 assert_eq!(cached_key, expected_key); 511 assert!( 512 !cached_result.is_empty(), 513 "Cached error result should not be empty" 514 ); 515 assert!(cached_created > 0, "Created timestamp should be positive"); 516 assert!(cached_updated > 0, "Updated timestamp should be positive"); 517 assert_eq!( 518 cached_created, cached_updated, 519 "Created and updated should be equal on first insert" 520 ); 521 522 // Verify we can deserialize the cached error result 523 let resolution_result = 524 crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result) 525 .expect("Failed to deserialize cached error result"); 526 let cached_did = resolution_result.to_did(); 527 assert!(cached_did.is_none(), "Error result should have no DID"); 528 529 // Second resolution - should hit cache with error (no additional database operations) 530 let result2 = sqlite_resolver.resolve(test_handle).await; 531 assert!(result2.is_err()); 532 533 // Match the specific error type we expect from cache 534 match result2 { 535 Err(HandleResolverError::HandleNotFound) => {} // Cache returns HandleNotFound for "not resolved" 536 other => panic!("Expected HandleNotFound from cache, got {:?}", other), 537 } 538 539 // Verify count hasn't changed (cache hit, no new operations) 540 let count_after_second: i64 = 541 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 542 .fetch_one(&pool) 543 .await 544 .expect("Failed to query count after second resolution"); 545 assert_eq!(count_after_second, 1); 546 } 547}