A decentralized event management and credentialing system built on atproto.
at main 431 lines 14 kB view raw
1//! Caching identity resolver implementation. 2//! 3//! Provides a three-layer caching strategy for DID document resolution: 4//! 1. In-memory LRU cache (fastest) 5//! 2. Database storage (persistent) 6//! 3. Base resolver (network fetch) 7 8use anyhow::Result; 9use async_trait::async_trait; 10use atproto_identity::{model::Document, resolve::IdentityResolver, storage::DidDocumentStorage}; 11use chrono::{Duration, Utc}; 12use lru::LruCache; 13use std::num::NonZeroUsize; 14use std::sync::Arc; 15use tokio::sync::RwLock; 16use tracing::warn; 17 18/// Configuration for the caching identity resolver 19#[derive(Clone, Debug)] 20pub struct CacheConfig { 21 /// Maximum number of entries in the in-memory cache 22 pub memory_cache_size: usize, 23 /// TTL for in-memory cache entries (in seconds) 24 pub memory_ttl_seconds: i64, 25} 26 27impl Default for CacheConfig { 28 fn default() -> Self { 29 Self { 30 memory_cache_size: 1000, 31 memory_ttl_seconds: 300, 32 } 33 } 34} 35 36/// Entry in the memory cache with timestamp 37#[derive(Clone)] 38struct CachedDocument { 39 document: Document, 40 cached_at: chrono::DateTime<Utc>, 41} 42 43impl CachedDocument { 44 fn is_expired(&self, ttl_seconds: i64) -> bool { 45 let age = Utc::now() - self.cached_at; 46 age > Duration::seconds(ttl_seconds) 47 } 48} 49 50/// A caching identity resolver that uses multiple layers of caching. 51/// 52/// Resolution order: 53/// 1. Check in-memory LRU cache 54/// 2. Check database storage 55/// 3. Resolve using base resolver 56/// 4. Update both caches with the result 57pub struct CachingIdentityResolver<R, S> 58where 59 R: IdentityResolver + 'static, 60 S: DidDocumentStorage + 'static, 61{ 62 /// The base resolver for actual DID resolution 63 base_resolver: Arc<R>, 64 /// Database storage for persistent caching 65 storage: Arc<S>, 66 /// In-memory LRU cache 67 memory_cache: Arc<RwLock<LruCache<String, CachedDocument>>>, 68 /// Cache configuration 69 config: CacheConfig, 70} 71 72impl<R, S> CachingIdentityResolver<R, S> 73where 74 R: IdentityResolver + 'static, 75 S: DidDocumentStorage + 'static, 76{ 77 /// Creates a new caching identity resolver with default configuration 78 pub fn new(base_resolver: Arc<R>, storage: Arc<S>) -> Self { 79 Self::with_config(base_resolver, storage, CacheConfig::default()) 80 } 81 82 /// Creates a new caching identity resolver with custom configuration 83 pub fn with_config(base_resolver: Arc<R>, storage: Arc<S>, config: CacheConfig) -> Self { 84 let cache_size = NonZeroUsize::new(config.memory_cache_size.max(1)) 85 .expect("Cache size must be at least 1"); 86 87 Self { 88 base_resolver, 89 storage, 90 memory_cache: Arc::new(RwLock::new(LruCache::new(cache_size))), 91 config, 92 } 93 } 94 95 /// Normalize the subject to a consistent format for caching 96 fn normalize_subject(subject: &str) -> String { 97 // Convert handles to lowercase, keep DIDs as-is 98 if subject.starts_with("did:") { 99 subject.to_string() 100 } else { 101 subject.to_lowercase() 102 } 103 } 104 105 /// Try to get a document from the in-memory cache 106 async fn get_from_memory(&self, subject: &str) -> Option<Document> { 107 let normalized = Self::normalize_subject(subject); 108 let mut cache = self.memory_cache.write().await; 109 110 if let Some(cached) = cache.get(&normalized) { 111 if !cached.is_expired(self.config.memory_ttl_seconds) { 112 return Some(cached.document.clone()); 113 } else { 114 // Remove expired entry 115 cache.pop(&normalized); 116 } 117 } 118 119 None 120 } 121 122 /// Try to get a document from database storage 123 async fn get_from_storage(&self, subject: &str) -> Option<Document> { 124 // First resolve the subject to a DID if it's a handle 125 // This is tricky because we need the DID to query storage 126 // For now, we'll only check storage if the subject is already a DID 127 if !subject.starts_with("did:") { 128 return None; 129 } 130 131 match self.storage.as_ref().get_document_by_did(subject).await { 132 Ok(Some(document)) => { 133 134 // Check if the database entry is still fresh enough 135 // Note: The storage implementation doesn't provide timestamps, 136 // so we'll trust it for now. In a real implementation, you'd 137 // want to add timestamp tracking to the storage layer. 138 Some(document) 139 } 140 Ok(None) => { 141 None 142 } 143 Err(_) => { 144 warn!("Failed to query database cache"); 145 None 146 } 147 } 148 } 149 150 /// Store a document in both memory and database caches 151 async fn store_in_caches(&self, subject: &str, document: Document) { 152 let normalized = Self::normalize_subject(subject); 153 154 // Store in memory cache 155 { 156 let mut cache = self.memory_cache.write().await; 157 cache.put( 158 normalized.clone(), 159 CachedDocument { 160 document: document.clone(), 161 cached_at: Utc::now(), 162 }, 163 ); 164 } 165 166 // Store in database 167 if let Err(e) = self.storage.as_ref().store_document(document.clone()).await { 168 warn!("Failed to store document in database cache: {}", e); 169 } else { 170 } 171 172 // Also store by handle if the subject was a handle 173 if !subject.starts_with("did:") { 174 // Store the handle -> document mapping in memory 175 let mut cache = self.memory_cache.write().await; 176 cache.put( 177 Self::normalize_subject(subject), 178 CachedDocument { 179 document: document.clone(), 180 cached_at: Utc::now(), 181 }, 182 ); 183 } 184 } 185 186 /// Refresh a stale entry in the background 187 #[allow(dead_code)] 188 async fn background_refresh(&self, subject: String) { 189 let resolver = self.base_resolver.clone(); 190 let storage = self.storage.clone(); 191 let memory_cache = self.memory_cache.clone(); 192 193 tokio::spawn(async move { 194 195 match resolver.resolve(&subject).await { 196 Ok(document) => { 197 // Update memory cache 198 let normalized = Self::normalize_subject(&subject); 199 let mut cache = memory_cache.write().await; 200 cache.put( 201 normalized, 202 CachedDocument { 203 document: document.clone(), 204 cached_at: Utc::now(), 205 }, 206 ); 207 208 // Update database 209 if let Err(e) = storage.as_ref().store_document(document).await { 210 warn!("Failed to update database during background refresh: {}", e); 211 } 212 } 213 Err(_) => { 214 warn!("Background refresh failed"); 215 } 216 } 217 }); 218 } 219 220 /// Clear all caches 221 pub async fn clear_caches(&self) -> Result<()> { 222 // Clear memory cache 223 let mut cache = self.memory_cache.write().await; 224 cache.clear(); 225 226 // Note: Database clearing would need to be implemented in the storage layer 227 // For now, we just clear the memory cache 228 229 Ok(()) 230 } 231 232 /// Get cache statistics 233 pub async fn cache_stats(&self) -> CacheStats { 234 let cache = self.memory_cache.read().await; 235 CacheStats { 236 memory_entries: cache.len(), 237 memory_capacity: cache.cap().get(), 238 } 239 } 240} 241 242#[async_trait] 243impl<R, S> IdentityResolver for CachingIdentityResolver<R, S> 244where 245 R: IdentityResolver + 'static, 246 S: DidDocumentStorage + 'static, 247{ 248 async fn resolve(&self, subject: &str) -> Result<Document> { 249 // 1. Check memory cache 250 if let Some(document) = self.get_from_memory(subject).await { 251 return Ok(document); 252 } 253 254 // 2. Check database storage 255 if let Some(document) = self.get_from_storage(subject).await { 256 // Store in memory cache for faster future access 257 self.store_in_caches(subject, document.clone()).await; 258 return Ok(document); 259 } 260 261 // 3. Resolve using base resolver 262 let document = self.base_resolver.resolve(subject).await?; 263 264 // 4. Store in both caches 265 self.store_in_caches(subject, document.clone()).await; 266 267 Ok(document) 268 } 269} 270 271/// Statistics about the cache 272#[derive(Debug, Clone)] 273pub struct CacheStats { 274 pub memory_entries: usize, 275 pub memory_capacity: usize, 276} 277 278#[cfg(test)] 279mod tests { 280 use super::*; 281 use std::collections::HashMap; 282 283 /// Mock storage for testing 284 struct MockStorage { 285 documents: Arc<RwLock<HashMap<String, Document>>>, 286 } 287 288 impl MockStorage { 289 fn new() -> Self { 290 Self { 291 documents: Arc::new(RwLock::new(HashMap::new())), 292 } 293 } 294 } 295 296 #[async_trait] 297 impl DidDocumentStorage for MockStorage { 298 async fn get_document_by_did(&self, did: &str) -> Result<Option<Document>> { 299 let docs = self.documents.read().await; 300 Ok(docs.get(did).cloned()) 301 } 302 303 async fn store_document(&self, document: Document) -> Result<()> { 304 let mut docs = self.documents.write().await; 305 docs.insert(document.id.clone(), document); 306 Ok(()) 307 } 308 309 async fn delete_document_by_did(&self, did: &str) -> Result<()> { 310 let mut docs = self.documents.write().await; 311 docs.remove(did); 312 Ok(()) 313 } 314 } 315 316 /// Mock resolver for testing 317 struct MockResolver { 318 documents: Arc<RwLock<HashMap<String, Document>>>, 319 call_count: Arc<RwLock<usize>>, 320 } 321 322 impl MockResolver { 323 fn new() -> Self { 324 Self { 325 documents: Arc::new(RwLock::new(HashMap::new())), 326 call_count: Arc::new(RwLock::new(0)), 327 } 328 } 329 330 async fn add_document(&self, did: String, document: Document) { 331 let mut docs = self.documents.write().await; 332 docs.insert(did, document); 333 } 334 335 async fn get_call_count(&self) -> usize { 336 *self.call_count.read().await 337 } 338 } 339 340 #[async_trait] 341 impl IdentityResolver for MockResolver { 342 async fn resolve(&self, subject: &str) -> Result<Document> { 343 let mut count = self.call_count.write().await; 344 *count += 1; 345 346 let docs = self.documents.read().await; 347 docs.get(subject) 348 .cloned() 349 .ok_or_else(|| anyhow::anyhow!("Document not found")) 350 } 351 } 352 353 #[tokio::test] 354 async fn test_caching_resolver_memory_cache() { 355 let base_resolver = Arc::new(MockResolver::new()); 356 let storage = Arc::new(MockStorage::new()); 357 358 let test_did = "did:plc:test123"; 359 let test_doc = Document { 360 id: test_did.to_string(), 361 also_known_as: vec!["test.bsky.social".to_string()], 362 verification_method: vec![], 363 service: vec![], 364 context: vec![], 365 extra: Default::default(), 366 }; 367 368 base_resolver 369 .add_document(test_did.to_string(), test_doc.clone()) 370 .await; 371 372 let config = CacheConfig { 373 memory_cache_size: 10, 374 memory_ttl_seconds: 60, 375 }; 376 377 let caching_resolver = 378 CachingIdentityResolver::with_config(base_resolver.clone(), storage, config); 379 380 // First resolve should hit the base resolver 381 let result1 = caching_resolver.resolve(test_did).await.unwrap(); 382 assert_eq!(result1.id, test_did.to_string()); 383 assert_eq!(base_resolver.get_call_count().await, 1); 384 385 // Second resolve should hit the memory cache 386 let result2 = caching_resolver.resolve(test_did).await.unwrap(); 387 assert_eq!(result2.id, test_did.to_string()); 388 assert_eq!(base_resolver.get_call_count().await, 1); // No additional call 389 390 // Verify cache stats 391 let stats = caching_resolver.cache_stats().await; 392 assert_eq!(stats.memory_entries, 1); 393 assert_eq!(stats.memory_capacity, 10); 394 } 395 396 #[tokio::test] 397 async fn test_handle_normalization() { 398 let base_resolver = Arc::new(MockResolver::new()); 399 let storage = Arc::new(MockStorage::new()); 400 401 let test_handle = "Test.BSKY.Social"; 402 let normalized_handle = "test.bsky.social"; 403 let test_doc = Document { 404 id: "did:plc:test123".to_string(), 405 also_known_as: vec![normalized_handle.to_string()], 406 verification_method: vec![], 407 service: vec![], 408 context: vec![], 409 extra: Default::default(), 410 }; 411 412 base_resolver 413 .add_document(test_handle.to_string(), test_doc.clone()) 414 .await; 415 base_resolver 416 .add_document(normalized_handle.to_string(), test_doc.clone()) 417 .await; 418 419 let caching_resolver = CachingIdentityResolver::new(base_resolver.clone(), storage); 420 421 // Resolve with uppercase handle 422 let result1 = caching_resolver.resolve(test_handle).await.unwrap(); 423 assert_eq!(base_resolver.get_call_count().await, 1); 424 425 // Resolve with lowercase handle - should hit cache 426 let result2 = caching_resolver.resolve(normalized_handle).await.unwrap(); 427 assert_eq!(base_resolver.get_call_count().await, 1); // No additional call 428 429 assert_eq!(result1.id, result2.id); 430 } 431}