The smokesignal.events web application
1//! Caching identity resolver implementation.
2//!
3//! Provides a three-layer caching strategy for DID document resolution:
4//! 1. In-memory LRU cache (fastest)
5//! 2. Database storage (persistent)
6//! 3. Base resolver (network fetch)
7
8use anyhow::Result;
9use async_trait::async_trait;
10use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage};
11use chrono::{Duration, Utc};
12
13// TODO: Use a different library because lru uses unsafe.
14use lru::LruCache;
15
16use std::num::NonZeroUsize;
17use std::sync::Arc;
18use tokio::sync::RwLock;
19use tracing::warn;
20
/// Tunable settings for the in-memory layer of the caching identity resolver.
#[derive(Clone, Debug)]
pub struct CacheConfig {
    /// Upper bound on the number of documents kept in the in-memory LRU cache.
    pub memory_cache_size: usize,
    /// Lifetime of an in-memory entry, expressed in seconds.
    pub memory_ttl_seconds: i64,
}
29
30impl Default for CacheConfig {
31 fn default() -> Self {
32 Self {
33 memory_cache_size: 1000,
34 memory_ttl_seconds: 300,
35 }
36 }
37}
38
39/// Entry in the memory cache with timestamp
40#[derive(Clone)]
41struct CachedDocument {
42 document: Document,
43 cached_at: chrono::DateTime<Utc>,
44}
45
46impl CachedDocument {
47 fn is_expired(&self, ttl_seconds: i64) -> bool {
48 let age = Utc::now() - self.cached_at;
49 age > Duration::seconds(ttl_seconds)
50 }
51}
52
53/// A caching identity resolver that uses multiple layers of caching.
54///
55/// Resolution order:
56/// 1. Check in-memory LRU cache
57/// 2. Check database storage
58/// 3. Resolve using base resolver
59/// 4. Update both caches with the result
60pub struct CachingIdentityResolver<R, S>
61where
62 R: IdentityResolver + 'static,
63 S: DidDocumentStorage + 'static,
64{
65 /// The base resolver for actual DID resolution
66 base_resolver: Arc<R>,
67 /// Database storage for persistent caching
68 storage: Arc<S>,
69 /// In-memory LRU cache
70 memory_cache: Arc<RwLock<LruCache<String, CachedDocument>>>,
71
72 /// Cache configuration
73 config: CacheConfig,
74}
75
76impl<R, S> CachingIdentityResolver<R, S>
77where
78 R: IdentityResolver + 'static,
79 S: DidDocumentStorage + 'static,
80{
81 /// Creates a new caching identity resolver with default configuration
82 pub fn new(base_resolver: Arc<R>, storage: Arc<S>) -> Self {
83 Self::with_config(base_resolver, storage, CacheConfig::default())
84 }
85
86 /// Creates a new caching identity resolver with custom configuration
87 pub fn with_config(base_resolver: Arc<R>, storage: Arc<S>, config: CacheConfig) -> Self {
88 let cache_size = NonZeroUsize::new(config.memory_cache_size.max(1))
89 .expect("Cache size must be at least 1");
90
91 Self {
92 base_resolver,
93 storage,
94 memory_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
95 config,
96 }
97 }
98
99 /// Normalize the subject to a consistent format for caching
100 fn normalize_subject(subject: &str) -> String {
101 // Convert handles to lowercase, keep DIDs as-is
102 if subject.starts_with("did:") {
103 subject.to_string()
104 } else {
105 subject.to_lowercase()
106 }
107 }
108
109 /// Try to get a document from the in-memory cache
110 async fn get_from_memory(&self, subject: &str) -> Option<Document> {
111 let normalized = Self::normalize_subject(subject);
112 let mut cache = self.memory_cache.write().await;
113
114 if let Some(cached) = cache.get(&normalized) {
115 if !cached.is_expired(self.config.memory_ttl_seconds) {
116 return Some(cached.document.clone());
117 } else {
118 // Remove expired entry
119 cache.pop(&normalized);
120 }
121 }
122
123 None
124 }
125
126 /// Try to get a document from database storage
127 async fn get_from_storage(&self, subject: &str) -> Option<Document> {
128 // First resolve the subject to a DID if it's a handle
129 // This is tricky because we need the DID to query storage
130 // For now, we'll only check storage if the subject is already a DID
131 if !subject.starts_with("did:") {
132 return None;
133 }
134
135 match self.storage.as_ref().get_document_by_did(subject).await {
136 Ok(Some(document)) => {
137 // Check if the database entry is still fresh enough
138 // Note: The storage implementation doesn't provide timestamps,
139 // so we'll trust it for now. In a real implementation, you'd
140 // want to add timestamp tracking to the storage layer.
141 Some(document)
142 }
143 Ok(None) => None,
144 Err(_) => {
145 warn!("Failed to query database cache");
146 None
147 }
148 }
149 }
150
151 /// Store a document in both memory and database caches
152 async fn store_in_caches(&self, subject: &str, document: Document) {
153 let normalized = Self::normalize_subject(subject);
154
155 // Store in memory cache
156 {
157 let mut cache = self.memory_cache.write().await;
158 cache.put(
159 normalized.clone(),
160 CachedDocument {
161 document: document.clone(),
162 cached_at: Utc::now(),
163 },
164 );
165 }
166
167 // Store in database
168 if let Err(e) = self.storage.as_ref().store_document(document.clone()).await {
169 warn!("Failed to store document in database cache: {}", e);
170 }
171
172 // Also store by handle if the subject was a handle
173 if !subject.starts_with("did:") {
174 // Store the handle -> document mapping in memory
175 let mut cache = self.memory_cache.write().await;
176 cache.put(
177 Self::normalize_subject(subject),
178 CachedDocument {
179 document: document.clone(),
180 cached_at: Utc::now(),
181 },
182 );
183 }
184 }
185
186 /// Refresh a stale entry in the background
187 #[allow(dead_code)]
188 async fn background_refresh(&self, subject: String) {
189 let resolver = self.base_resolver.clone();
190 let storage = self.storage.clone();
191 let memory_cache = self.memory_cache.clone();
192
193 tokio::spawn(async move {
194 match resolver.resolve(&subject).await {
195 Ok(document) => {
196 // Update memory cache
197 let normalized = Self::normalize_subject(&subject);
198 let mut cache = memory_cache.write().await;
199 cache.put(
200 normalized,
201 CachedDocument {
202 document: document.clone(),
203 cached_at: Utc::now(),
204 },
205 );
206
207 // Update database
208 if let Err(e) = storage.as_ref().store_document(document).await {
209 warn!("Failed to update database during background refresh: {}", e);
210 }
211 }
212 Err(_) => {
213 warn!("Background refresh failed");
214 }
215 }
216 });
217 }
218
219 /// Clear all caches
220 pub async fn clear_caches(&self) -> Result<()> {
221 // Clear memory cache
222 let mut cache = self.memory_cache.write().await;
223 cache.clear();
224
225 // Note: Database clearing would need to be implemented in the storage layer
226 // For now, we just clear the memory cache
227
228 Ok(())
229 }
230
231 /// Get cache statistics
232 pub async fn cache_stats(&self) -> CacheStats {
233 let cache = self.memory_cache.read().await;
234 CacheStats {
235 memory_entries: cache.len(),
236 memory_capacity: cache.cap().get(),
237 }
238 }
239}
240
241#[async_trait]
242impl<R, S> IdentityResolver for CachingIdentityResolver<R, S>
243where
244 R: IdentityResolver + 'static,
245 S: DidDocumentStorage + 'static,
246{
247 async fn resolve(&self, subject: &str) -> Result<Document> {
248 // 1. Check memory cache
249 if let Some(document) = self.get_from_memory(subject).await {
250 return Ok(document);
251 }
252
253 // 2. Check database storage
254 if let Some(document) = self.get_from_storage(subject).await {
255 // Store in memory cache for faster future access
256 self.store_in_caches(subject, document.clone()).await;
257 return Ok(document);
258 }
259
260 // 3. Resolve using base resolver
261 let document = self.base_resolver.resolve(subject).await?;
262
263 // 4. Store in both caches
264 self.store_in_caches(subject, document.clone()).await;
265
266 Ok(document)
267 }
268}
269
/// Snapshot of the in-memory cache's occupancy.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries currently held in the memory cache.
    pub memory_entries: usize,
    /// Maximum number of entries the memory cache can hold.
    pub memory_capacity: usize,
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// In-memory stand-in for the persistent storage layer, keyed by DID.
    struct MockStorage {
        documents: Arc<RwLock<HashMap<String, Document>>>,
    }

    impl MockStorage {
        fn new() -> Self {
            let documents = Arc::new(RwLock::new(HashMap::new()));
            Self { documents }
        }
    }

    #[async_trait]
    impl DidDocumentStorage for MockStorage {
        async fn get_document_by_did(&self, did: &str) -> Result<Option<Document>> {
            let stored = self.documents.read().await;
            let found = stored.get(did).cloned();
            Ok(found)
        }

        async fn store_document(&self, document: Document) -> Result<()> {
            let key = document.id.clone();
            let mut stored = self.documents.write().await;
            stored.insert(key, document);
            Ok(())
        }

        async fn delete_document_by_did(&self, did: &str) -> Result<()> {
            let mut stored = self.documents.write().await;
            stored.remove(did);
            Ok(())
        }
    }

    /// Base resolver double that serves pre-registered documents and counts
    /// how many times `resolve` was invoked.
    struct MockResolver {
        documents: Arc<RwLock<HashMap<String, Document>>>,
        call_count: Arc<RwLock<usize>>,
    }

    impl MockResolver {
        fn new() -> Self {
            let documents = Arc::new(RwLock::new(HashMap::new()));
            let call_count = Arc::new(RwLock::new(0));
            Self {
                documents,
                call_count,
            }
        }

        /// Registers `document` as the answer for the subject `did`.
        async fn add_document(&self, did: String, document: Document) {
            let mut known = self.documents.write().await;
            known.insert(did, document);
        }

        /// Number of `resolve` calls made so far.
        async fn get_call_count(&self) -> usize {
            let calls = self.call_count.read().await;
            *calls
        }
    }

    #[async_trait]
    impl IdentityResolver for MockResolver {
        async fn resolve(&self, subject: &str) -> Result<Document> {
            let mut calls = self.call_count.write().await;
            *calls += 1;

            let known = self.documents.read().await;
            match known.get(subject) {
                Some(document) => Ok(document.clone()),
                None => Err(anyhow::anyhow!("Document not found")),
            }
        }
    }

    /// Builds a minimal document with the given DID and a single alias.
    fn sample_document(did: &str, handle: &str) -> Document {
        Document {
            id: did.to_string(),
            also_known_as: vec![handle.to_string()],
            verification_method: vec![],
            service: vec![],
            context: vec![],
            extra: Default::default(),
        }
    }

    #[tokio::test]
    async fn test_caching_resolver_memory_cache() {
        let test_did = "did:plc:test123";
        let test_doc = sample_document(test_did, "test.bsky.social");

        let base_resolver = Arc::new(MockResolver::new());
        let storage = Arc::new(MockStorage::new());
        base_resolver
            .add_document(test_did.to_string(), test_doc.clone())
            .await;

        let caching_resolver = CachingIdentityResolver::with_config(
            base_resolver.clone(),
            storage,
            CacheConfig {
                memory_cache_size: 10,
                memory_ttl_seconds: 60,
            },
        );

        // A cold lookup has to go through the base resolver.
        let first = caching_resolver.resolve(test_did).await.unwrap();
        assert_eq!(first.id, test_did.to_string());
        assert_eq!(base_resolver.get_call_count().await, 1);

        // The repeat lookup is answered from memory: the call count is unchanged.
        let second = caching_resolver.resolve(test_did).await.unwrap();
        assert_eq!(second.id, test_did.to_string());
        assert_eq!(base_resolver.get_call_count().await, 1);

        // Exactly one entry lives in a cache sized for ten.
        let stats = caching_resolver.cache_stats().await;
        assert_eq!(stats.memory_entries, 1);
        assert_eq!(stats.memory_capacity, 10);
    }

    #[tokio::test]
    async fn test_handle_normalization() {
        let test_handle = "Test.BSKY.Social";
        let normalized_handle = "test.bsky.social";
        let test_doc = sample_document("did:plc:test123", normalized_handle);

        let base_resolver = Arc::new(MockResolver::new());
        let storage = Arc::new(MockStorage::new());

        // Register the document under both spellings of the handle.
        for spelling in [test_handle, normalized_handle] {
            base_resolver
                .add_document(spelling.to_string(), test_doc.clone())
                .await;
        }

        let caching_resolver = CachingIdentityResolver::new(base_resolver.clone(), storage);

        // The mixed-case handle misses the cache and reaches the base resolver.
        let from_mixed_case = caching_resolver.resolve(test_handle).await.unwrap();
        assert_eq!(base_resolver.get_call_count().await, 1);

        // The lowercase handle shares the cache entry, so no further base call.
        let from_lowercase = caching_resolver.resolve(normalized_handle).await.unwrap();
        assert_eq!(base_resolver.get_call_count().await, 1);

        assert_eq!(from_mixed_case.id, from_lowercase.id);
    }
}