The smokesignal.events web application
at main 430 lines 14 kB view raw
1//! Caching identity resolver implementation. 2//! 3//! Provides a three-layer caching strategy for DID document resolution: 4//! 1. In-memory LRU cache (fastest) 5//! 2. Database storage (persistent) 6//! 3. Base resolver (network fetch) 7 8use anyhow::Result; 9use async_trait::async_trait; 10use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage}; 11use chrono::{Duration, Utc}; 12 13// TODO: Use a different library because lru uses unsafe. 14use lru::LruCache; 15 16use std::num::NonZeroUsize; 17use std::sync::Arc; 18use tokio::sync::RwLock; 19use tracing::warn; 20 21/// Configuration for the caching identity resolver 22#[derive(Clone, Debug)] 23pub struct CacheConfig { 24 /// Maximum number of entries in the in-memory cache 25 pub memory_cache_size: usize, 26 /// TTL for in-memory cache entries (in seconds) 27 pub memory_ttl_seconds: i64, 28} 29 30impl Default for CacheConfig { 31 fn default() -> Self { 32 Self { 33 memory_cache_size: 1000, 34 memory_ttl_seconds: 300, 35 } 36 } 37} 38 39/// Entry in the memory cache with timestamp 40#[derive(Clone)] 41struct CachedDocument { 42 document: Document, 43 cached_at: chrono::DateTime<Utc>, 44} 45 46impl CachedDocument { 47 fn is_expired(&self, ttl_seconds: i64) -> bool { 48 let age = Utc::now() - self.cached_at; 49 age > Duration::seconds(ttl_seconds) 50 } 51} 52 53/// A caching identity resolver that uses multiple layers of caching. 54/// 55/// Resolution order: 56/// 1. Check in-memory LRU cache 57/// 2. Check database storage 58/// 3. Resolve using base resolver 59/// 4. Update both caches with the result 60pub struct CachingIdentityResolver<R, S> 61where 62 R: IdentityResolver + 'static, 63 S: DidDocumentStorage + 'static, 64{ 65 /// The base resolver for actual DID resolution 66 base_resolver: Arc<R>, 67 /// Database storage for persistent caching 68 storage: Arc<S>, 69 /// In-memory LRU cache 70 memory_cache: Arc<RwLock<LruCache<String, CachedDocument>>>, 71 72 /// Cache configuration 73 config: CacheConfig, 74} 75 76impl<R, S> CachingIdentityResolver<R, S> 77where 78 R: IdentityResolver + 'static, 79 S: DidDocumentStorage + 'static, 80{ 81 /// Creates a new caching identity resolver with default configuration 82 pub fn new(base_resolver: Arc<R>, storage: Arc<S>) -> Self { 83 Self::with_config(base_resolver, storage, CacheConfig::default()) 84 } 85 86 /// Creates a new caching identity resolver with custom configuration 87 pub fn with_config(base_resolver: Arc<R>, storage: Arc<S>, config: CacheConfig) -> Self { 88 let cache_size = NonZeroUsize::new(config.memory_cache_size.max(1)) 89 .expect("Cache size must be at least 1"); 90 91 Self { 92 base_resolver, 93 storage, 94 memory_cache: Arc::new(RwLock::new(LruCache::new(cache_size))), 95 config, 96 } 97 } 98 99 /// Normalize the subject to a consistent format for caching 100 fn normalize_subject(subject: &str) -> String { 101 // Convert handles to lowercase, keep DIDs as-is 102 if subject.starts_with("did:") { 103 subject.to_string() 104 } else { 105 subject.to_lowercase() 106 } 107 } 108 109 /// Try to get a document from the in-memory cache 110 async fn get_from_memory(&self, subject: &str) -> Option<Document> { 111 let normalized = Self::normalize_subject(subject); 112 let mut cache = self.memory_cache.write().await; 113 114 if let Some(cached) = cache.get(&normalized) { 115 if !cached.is_expired(self.config.memory_ttl_seconds) { 116 return Some(cached.document.clone()); 117 } else { 118 // Remove expired entry 119 cache.pop(&normalized); 120 } 121 } 122 123 None 124 } 125 126 /// Try to get a document from database storage 127 async fn get_from_storage(&self, subject: &str) -> Option<Document> { 128 // First resolve the subject to a DID if it's a handle 129 // This is tricky because we need the DID to query storage 130 // For now, we'll only check storage if the subject is already a DID 131 if !subject.starts_with("did:") { 132 return None; 133 } 134 135 match self.storage.as_ref().get_document_by_did(subject).await { 136 Ok(Some(document)) => { 137 // Check if the database entry is still fresh enough 138 // Note: The storage implementation doesn't provide timestamps, 139 // so we'll trust it for now. In a real implementation, you'd 140 // want to add timestamp tracking to the storage layer. 141 Some(document) 142 } 143 Ok(None) => None, 144 Err(_) => { 145 warn!("Failed to query database cache"); 146 None 147 } 148 } 149 } 150 151 /// Store a document in both memory and database caches 152 async fn store_in_caches(&self, subject: &str, document: Document) { 153 let normalized = Self::normalize_subject(subject); 154 155 // Store in memory cache 156 { 157 let mut cache = self.memory_cache.write().await; 158 cache.put( 159 normalized.clone(), 160 CachedDocument { 161 document: document.clone(), 162 cached_at: Utc::now(), 163 }, 164 ); 165 } 166 167 // Store in database 168 if let Err(e) = self.storage.as_ref().store_document(document.clone()).await { 169 warn!("Failed to store document in database cache: {}", e); 170 } 171 172 // Also store by handle if the subject was a handle 173 if !subject.starts_with("did:") { 174 // Store the handle -> document mapping in memory 175 let mut cache = self.memory_cache.write().await; 176 cache.put( 177 Self::normalize_subject(subject), 178 CachedDocument { 179 document: document.clone(), 180 cached_at: Utc::now(), 181 }, 182 ); 183 } 184 } 185 186 /// Refresh a stale entry in the background 187 #[allow(dead_code)] 188 async fn background_refresh(&self, subject: String) { 189 let resolver = self.base_resolver.clone(); 190 let storage = self.storage.clone(); 191 let memory_cache = self.memory_cache.clone(); 192 193 tokio::spawn(async move { 194 match resolver.resolve(&subject).await { 195 Ok(document) => { 196 // Update memory cache 197 let normalized = Self::normalize_subject(&subject); 198 let mut cache = memory_cache.write().await; 199 cache.put( 200 normalized, 201 CachedDocument { 202 document: document.clone(), 203 cached_at: Utc::now(), 204 }, 205 ); 206 207 // Update database 208 if let Err(e) = storage.as_ref().store_document(document).await { 209 warn!("Failed to update database during background refresh: {}", e); 210 } 211 } 212 Err(_) => { 213 warn!("Background refresh failed"); 214 } 215 } 216 }); 217 } 218 219 /// Clear all caches 220 pub async fn clear_caches(&self) -> Result<()> { 221 // Clear memory cache 222 let mut cache = self.memory_cache.write().await; 223 cache.clear(); 224 225 // Note: Database clearing would need to be implemented in the storage layer 226 // For now, we just clear the memory cache 227 228 Ok(()) 229 } 230 231 /// Get cache statistics 232 pub async fn cache_stats(&self) -> CacheStats { 233 let cache = self.memory_cache.read().await; 234 CacheStats { 235 memory_entries: cache.len(), 236 memory_capacity: cache.cap().get(), 237 } 238 } 239} 240 241#[async_trait] 242impl<R, S> IdentityResolver for CachingIdentityResolver<R, S> 243where 244 R: IdentityResolver + 'static, 245 S: DidDocumentStorage + 'static, 246{ 247 async fn resolve(&self, subject: &str) -> Result<Document> { 248 // 1. Check memory cache 249 if let Some(document) = self.get_from_memory(subject).await { 250 return Ok(document); 251 } 252 253 // 2. Check database storage 254 if let Some(document) = self.get_from_storage(subject).await { 255 // Store in memory cache for faster future access 256 self.store_in_caches(subject, document.clone()).await; 257 return Ok(document); 258 } 259 260 // 3. Resolve using base resolver 261 let document = self.base_resolver.resolve(subject).await?; 262 263 // 4. Store in both caches 264 self.store_in_caches(subject, document.clone()).await; 265 266 Ok(document) 267 } 268} 269 270/// Statistics about the cache 271#[derive(Debug, Clone)] 272pub struct CacheStats { 273 pub memory_entries: usize, 274 pub memory_capacity: usize, 275} 276 277#[cfg(test)] 278mod tests { 279 use super::*; 280 use std::collections::HashMap; 281 282 /// Mock storage for testing 283 struct MockStorage { 284 documents: Arc<RwLock<HashMap<String, Document>>>, 285 } 286 287 impl MockStorage { 288 fn new() -> Self { 289 Self { 290 documents: Arc::new(RwLock::new(HashMap::new())), 291 } 292 } 293 } 294 295 #[async_trait] 296 impl DidDocumentStorage for MockStorage { 297 async fn get_document_by_did(&self, did: &str) -> Result<Option<Document>> { 298 let docs = self.documents.read().await; 299 Ok(docs.get(did).cloned()) 300 } 301 302 async fn store_document(&self, document: Document) -> Result<()> { 303 let mut docs = self.documents.write().await; 304 docs.insert(document.id.clone(), document); 305 Ok(()) 306 } 307 308 async fn delete_document_by_did(&self, did: &str) -> Result<()> { 309 let mut docs = self.documents.write().await; 310 docs.remove(did); 311 Ok(()) 312 } 313 } 314 315 /// Mock resolver for testing 316 struct MockResolver { 317 documents: Arc<RwLock<HashMap<String, Document>>>, 318 call_count: Arc<RwLock<usize>>, 319 } 320 321 impl MockResolver { 322 fn new() -> Self { 323 Self { 324 documents: Arc::new(RwLock::new(HashMap::new())), 325 call_count: Arc::new(RwLock::new(0)), 326 } 327 } 328 329 async fn add_document(&self, did: String, document: Document) { 330 let mut docs = self.documents.write().await; 331 docs.insert(did, document); 332 } 333 334 async fn get_call_count(&self) -> usize { 335 *self.call_count.read().await 336 } 337 } 338 339 #[async_trait] 340 impl IdentityResolver for MockResolver { 341 async fn resolve(&self, subject: &str) -> Result<Document> { 342 let mut count = self.call_count.write().await; 343 *count += 1; 344 345 let docs = self.documents.read().await; 346 docs.get(subject) 347 .cloned() 348 .ok_or_else(|| anyhow::anyhow!("Document not found")) 349 } 350 } 351 352 #[tokio::test] 353 async fn test_caching_resolver_memory_cache() { 354 let base_resolver = Arc::new(MockResolver::new()); 355 let storage = Arc::new(MockStorage::new()); 356 357 let test_did = "did:plc:test123"; 358 let test_doc = Document { 359 id: test_did.to_string(), 360 also_known_as: vec!["test.bsky.social".to_string()], 361 verification_method: vec![], 362 service: vec![], 363 context: vec![], 364 extra: Default::default(), 365 }; 366 367 base_resolver 368 .add_document(test_did.to_string(), test_doc.clone()) 369 .await; 370 371 let config = CacheConfig { 372 memory_cache_size: 10, 373 memory_ttl_seconds: 60, 374 }; 375 376 let caching_resolver = 377 CachingIdentityResolver::with_config(base_resolver.clone(), storage, config); 378 379 // First resolve should hit the base resolver 380 let result1 = caching_resolver.resolve(test_did).await.unwrap(); 381 assert_eq!(result1.id, test_did.to_string()); 382 assert_eq!(base_resolver.get_call_count().await, 1); 383 384 // Second resolve should hit the memory cache 385 let result2 = caching_resolver.resolve(test_did).await.unwrap(); 386 assert_eq!(result2.id, test_did.to_string()); 387 assert_eq!(base_resolver.get_call_count().await, 1); // No additional call 388 389 // Verify cache stats 390 let stats = caching_resolver.cache_stats().await; 391 assert_eq!(stats.memory_entries, 1); 392 assert_eq!(stats.memory_capacity, 10); 393 } 394 395 #[tokio::test] 396 async fn test_handle_normalization() { 397 let base_resolver = Arc::new(MockResolver::new()); 398 let storage = Arc::new(MockStorage::new()); 399 400 let test_handle = "Test.BSKY.Social"; 401 let normalized_handle = "test.bsky.social"; 402 let test_doc = Document { 403 id: "did:plc:test123".to_string(), 404 also_known_as: vec![normalized_handle.to_string()], 405 verification_method: vec![], 406 service: vec![], 407 context: vec![], 408 extra: Default::default(), 409 }; 410 411 base_resolver 412 .add_document(test_handle.to_string(), test_doc.clone()) 413 .await; 414 base_resolver 415 .add_document(normalized_handle.to_string(), test_doc.clone()) 416 .await; 417 418 let caching_resolver = CachingIdentityResolver::new(base_resolver.clone(), storage); 419 420 // Resolve with uppercase handle 421 let result1 = caching_resolver.resolve(test_handle).await.unwrap(); 422 assert_eq!(base_resolver.get_call_count().await, 1); 423 424 // Resolve with lowercase handle - should hit cache 425 let result2 = caching_resolver.resolve(normalized_handle).await.unwrap(); 426 assert_eq!(base_resolver.get_call_count().await, 1); // No additional call 427 428 assert_eq!(result1.id, result2.id); 429 } 430}