A decentralized event management and credentialing system built on atproto.
1//! Caching identity resolver implementation.
2//!
3//! Provides a three-layer caching strategy for DID document resolution:
4//! 1. In-memory LRU cache (fastest)
5//! 2. Database storage (persistent)
6//! 3. Base resolver (network fetch)
7
8use anyhow::Result;
9use async_trait::async_trait;
10use atproto_identity::{model::Document, resolve::IdentityResolver, storage::DidDocumentStorage};
11use chrono::{Duration, Utc};
12use lru::LruCache;
13use std::num::NonZeroUsize;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::warn;
17
/// Settings that control the in-memory layer of the caching identity resolver.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Upper bound on the number of documents kept in the in-memory LRU cache.
    pub memory_cache_size: usize,
    /// Lifetime of an in-memory entry, in seconds, before it counts as expired.
    pub memory_ttl_seconds: i64,
}
26
27impl Default for CacheConfig {
28 fn default() -> Self {
29 Self {
30 memory_cache_size: 1000,
31 memory_ttl_seconds: 300,
32 }
33 }
34}
35
36/// Entry in the memory cache with timestamp
37#[derive(Clone)]
38struct CachedDocument {
39 document: Document,
40 cached_at: chrono::DateTime<Utc>,
41}
42
43impl CachedDocument {
44 fn is_expired(&self, ttl_seconds: i64) -> bool {
45 let age = Utc::now() - self.cached_at;
46 age > Duration::seconds(ttl_seconds)
47 }
48}
49
50/// A caching identity resolver that uses multiple layers of caching.
51///
52/// Resolution order:
53/// 1. Check in-memory LRU cache
54/// 2. Check database storage
55/// 3. Resolve using base resolver
56/// 4. Update both caches with the result
57pub struct CachingIdentityResolver<R, S>
58where
59 R: IdentityResolver + 'static,
60 S: DidDocumentStorage + 'static,
61{
62 /// The base resolver for actual DID resolution
63 base_resolver: Arc<R>,
64 /// Database storage for persistent caching
65 storage: Arc<S>,
66 /// In-memory LRU cache
67 memory_cache: Arc<RwLock<LruCache<String, CachedDocument>>>,
68 /// Cache configuration
69 config: CacheConfig,
70}
71
72impl<R, S> CachingIdentityResolver<R, S>
73where
74 R: IdentityResolver + 'static,
75 S: DidDocumentStorage + 'static,
76{
77 /// Creates a new caching identity resolver with default configuration
78 pub fn new(base_resolver: Arc<R>, storage: Arc<S>) -> Self {
79 Self::with_config(base_resolver, storage, CacheConfig::default())
80 }
81
82 /// Creates a new caching identity resolver with custom configuration
83 pub fn with_config(base_resolver: Arc<R>, storage: Arc<S>, config: CacheConfig) -> Self {
84 let cache_size = NonZeroUsize::new(config.memory_cache_size.max(1))
85 .expect("Cache size must be at least 1");
86
87 Self {
88 base_resolver,
89 storage,
90 memory_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
91 config,
92 }
93 }
94
/// Produces the cache key for `subject`.
///
/// Subjects beginning with `did:` are returned unchanged; anything else is
/// treated as a handle and lowercased.
fn normalize_subject(subject: &str) -> String {
    let is_did = subject.starts_with("did:");
    if is_did {
        return subject.to_owned();
    }
    subject.to_lowercase()
}
104
105 /// Try to get a document from the in-memory cache
106 async fn get_from_memory(&self, subject: &str) -> Option<Document> {
107 let normalized = Self::normalize_subject(subject);
108 let mut cache = self.memory_cache.write().await;
109
110 if let Some(cached) = cache.get(&normalized) {
111 if !cached.is_expired(self.config.memory_ttl_seconds) {
112 return Some(cached.document.clone());
113 } else {
114 // Remove expired entry
115 cache.pop(&normalized);
116 }
117 }
118
119 None
120 }
121
122 /// Try to get a document from database storage
123 async fn get_from_storage(&self, subject: &str) -> Option<Document> {
124 // First resolve the subject to a DID if it's a handle
125 // This is tricky because we need the DID to query storage
126 // For now, we'll only check storage if the subject is already a DID
127 if !subject.starts_with("did:") {
128 return None;
129 }
130
131 match self.storage.as_ref().get_document_by_did(subject).await {
132 Ok(Some(document)) => {
133
134 // Check if the database entry is still fresh enough
135 // Note: The storage implementation doesn't provide timestamps,
136 // so we'll trust it for now. In a real implementation, you'd
137 // want to add timestamp tracking to the storage layer.
138 Some(document)
139 }
140 Ok(None) => {
141 None
142 }
143 Err(_) => {
144 warn!("Failed to query database cache");
145 None
146 }
147 }
148 }
149
150 /// Store a document in both memory and database caches
151 async fn store_in_caches(&self, subject: &str, document: Document) {
152 let normalized = Self::normalize_subject(subject);
153
154 // Store in memory cache
155 {
156 let mut cache = self.memory_cache.write().await;
157 cache.put(
158 normalized.clone(),
159 CachedDocument {
160 document: document.clone(),
161 cached_at: Utc::now(),
162 },
163 );
164 }
165
166 // Store in database
167 if let Err(e) = self.storage.as_ref().store_document(document.clone()).await {
168 warn!("Failed to store document in database cache: {}", e);
169 } else {
170 }
171
172 // Also store by handle if the subject was a handle
173 if !subject.starts_with("did:") {
174 // Store the handle -> document mapping in memory
175 let mut cache = self.memory_cache.write().await;
176 cache.put(
177 Self::normalize_subject(subject),
178 CachedDocument {
179 document: document.clone(),
180 cached_at: Utc::now(),
181 },
182 );
183 }
184 }
185
186 /// Refresh a stale entry in the background
187 #[allow(dead_code)]
188 async fn background_refresh(&self, subject: String) {
189 let resolver = self.base_resolver.clone();
190 let storage = self.storage.clone();
191 let memory_cache = self.memory_cache.clone();
192
193 tokio::spawn(async move {
194
195 match resolver.resolve(&subject).await {
196 Ok(document) => {
197 // Update memory cache
198 let normalized = Self::normalize_subject(&subject);
199 let mut cache = memory_cache.write().await;
200 cache.put(
201 normalized,
202 CachedDocument {
203 document: document.clone(),
204 cached_at: Utc::now(),
205 },
206 );
207
208 // Update database
209 if let Err(e) = storage.as_ref().store_document(document).await {
210 warn!("Failed to update database during background refresh: {}", e);
211 }
212 }
213 Err(_) => {
214 warn!("Background refresh failed");
215 }
216 }
217 });
218 }
219
220 /// Clear all caches
221 pub async fn clear_caches(&self) -> Result<()> {
222 // Clear memory cache
223 let mut cache = self.memory_cache.write().await;
224 cache.clear();
225
226 // Note: Database clearing would need to be implemented in the storage layer
227 // For now, we just clear the memory cache
228
229 Ok(())
230 }
231
232 /// Get cache statistics
233 pub async fn cache_stats(&self) -> CacheStats {
234 let cache = self.memory_cache.read().await;
235 CacheStats {
236 memory_entries: cache.len(),
237 memory_capacity: cache.cap().get(),
238 }
239 }
240}
241
242#[async_trait]
243impl<R, S> IdentityResolver for CachingIdentityResolver<R, S>
244where
245 R: IdentityResolver + 'static,
246 S: DidDocumentStorage + 'static,
247{
248 async fn resolve(&self, subject: &str) -> Result<Document> {
249 // 1. Check memory cache
250 if let Some(document) = self.get_from_memory(subject).await {
251 return Ok(document);
252 }
253
254 // 2. Check database storage
255 if let Some(document) = self.get_from_storage(subject).await {
256 // Store in memory cache for faster future access
257 self.store_in_caches(subject, document.clone()).await;
258 return Ok(document);
259 }
260
261 // 3. Resolve using base resolver
262 let document = self.base_resolver.resolve(subject).await?;
263
264 // 4. Store in both caches
265 self.store_in_caches(subject, document.clone()).await;
266
267 Ok(document)
268 }
269}
270
/// Snapshot of the in-memory cache's occupancy.
#[derive(Clone, Debug)]
pub struct CacheStats {
    /// Number of entries currently held in the in-memory cache.
    pub memory_entries: usize,
    /// Maximum number of entries the in-memory cache can hold.
    pub memory_capacity: usize,
}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281 use std::collections::HashMap;
282
283 /// Mock storage for testing
284 struct MockStorage {
285 documents: Arc<RwLock<HashMap<String, Document>>>,
286 }
287
288 impl MockStorage {
289 fn new() -> Self {
290 Self {
291 documents: Arc::new(RwLock::new(HashMap::new())),
292 }
293 }
294 }
295
296 #[async_trait]
297 impl DidDocumentStorage for MockStorage {
298 async fn get_document_by_did(&self, did: &str) -> Result<Option<Document>> {
299 let docs = self.documents.read().await;
300 Ok(docs.get(did).cloned())
301 }
302
303 async fn store_document(&self, document: Document) -> Result<()> {
304 let mut docs = self.documents.write().await;
305 docs.insert(document.id.clone(), document);
306 Ok(())
307 }
308
309 async fn delete_document_by_did(&self, did: &str) -> Result<()> {
310 let mut docs = self.documents.write().await;
311 docs.remove(did);
312 Ok(())
313 }
314 }
315
316 /// Mock resolver for testing
317 struct MockResolver {
318 documents: Arc<RwLock<HashMap<String, Document>>>,
319 call_count: Arc<RwLock<usize>>,
320 }
321
322 impl MockResolver {
323 fn new() -> Self {
324 Self {
325 documents: Arc::new(RwLock::new(HashMap::new())),
326 call_count: Arc::new(RwLock::new(0)),
327 }
328 }
329
330 async fn add_document(&self, did: String, document: Document) {
331 let mut docs = self.documents.write().await;
332 docs.insert(did, document);
333 }
334
335 async fn get_call_count(&self) -> usize {
336 *self.call_count.read().await
337 }
338 }
339
340 #[async_trait]
341 impl IdentityResolver for MockResolver {
342 async fn resolve(&self, subject: &str) -> Result<Document> {
343 let mut count = self.call_count.write().await;
344 *count += 1;
345
346 let docs = self.documents.read().await;
347 docs.get(subject)
348 .cloned()
349 .ok_or_else(|| anyhow::anyhow!("Document not found"))
350 }
351 }
352
353 #[tokio::test]
354 async fn test_caching_resolver_memory_cache() {
355 let base_resolver = Arc::new(MockResolver::new());
356 let storage = Arc::new(MockStorage::new());
357
358 let test_did = "did:plc:test123";
359 let test_doc = Document {
360 id: test_did.to_string(),
361 also_known_as: vec!["test.bsky.social".to_string()],
362 verification_method: vec![],
363 service: vec![],
364 context: vec![],
365 extra: Default::default(),
366 };
367
368 base_resolver
369 .add_document(test_did.to_string(), test_doc.clone())
370 .await;
371
372 let config = CacheConfig {
373 memory_cache_size: 10,
374 memory_ttl_seconds: 60,
375 };
376
377 let caching_resolver =
378 CachingIdentityResolver::with_config(base_resolver.clone(), storage, config);
379
380 // First resolve should hit the base resolver
381 let result1 = caching_resolver.resolve(test_did).await.unwrap();
382 assert_eq!(result1.id, test_did.to_string());
383 assert_eq!(base_resolver.get_call_count().await, 1);
384
385 // Second resolve should hit the memory cache
386 let result2 = caching_resolver.resolve(test_did).await.unwrap();
387 assert_eq!(result2.id, test_did.to_string());
388 assert_eq!(base_resolver.get_call_count().await, 1); // No additional call
389
390 // Verify cache stats
391 let stats = caching_resolver.cache_stats().await;
392 assert_eq!(stats.memory_entries, 1);
393 assert_eq!(stats.memory_capacity, 10);
394 }
395
396 #[tokio::test]
397 async fn test_handle_normalization() {
398 let base_resolver = Arc::new(MockResolver::new());
399 let storage = Arc::new(MockStorage::new());
400
401 let test_handle = "Test.BSKY.Social";
402 let normalized_handle = "test.bsky.social";
403 let test_doc = Document {
404 id: "did:plc:test123".to_string(),
405 also_known_as: vec![normalized_handle.to_string()],
406 verification_method: vec![],
407 service: vec![],
408 context: vec![],
409 extra: Default::default(),
410 };
411
412 base_resolver
413 .add_document(test_handle.to_string(), test_doc.clone())
414 .await;
415 base_resolver
416 .add_document(normalized_handle.to_string(), test_doc.clone())
417 .await;
418
419 let caching_resolver = CachingIdentityResolver::new(base_resolver.clone(), storage);
420
421 // Resolve with uppercase handle
422 let result1 = caching_resolver.resolve(test_handle).await.unwrap();
423 assert_eq!(base_resolver.get_call_count().await, 1);
424
425 // Resolve with lowercase handle - should hit cache
426 let result2 = caching_resolver.resolve(normalized_handle).await.unwrap();
427 assert_eq!(base_resolver.get_call_count().await, 1); // No additional call
428
429 assert_eq!(result1.id, result2.id);
430 }
431}