//! An ATProtocol-powered blogging engine.
1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::errors::Result;
5use async_trait::async_trait;
6use bloomfilter::Bloom;
7use tokio::sync::RwLock;
8
9use super::{ContentStorage, Identity, IdentityStorage, Post, PostReference, PostStorage, Storage};
10
/// A caching layer for post storage that keeps posts in memory.
pub struct CachedPostStorage<T: Storage> {
    /// The wrapped storage backend that actually persists posts.
    underlying_storage: Arc<T>,
    /// In-memory cache of all posts keyed by AT-URI; `None` until first populated.
    post_cache: Arc<RwLock<Option<HashMap<String, Post>>>>,
}
16
17impl<T: Storage> CachedPostStorage<T> {
18 /// Create a new cached post storage with the given underlying storage.
19 pub fn new(underlying_storage: Arc<T>) -> Self {
20 Self {
21 underlying_storage,
22 post_cache: Arc::new(RwLock::new(None)),
23 }
24 }
25
26 async fn ensure_cache_populated(&self) -> Result<()> {
27 let cache = self.post_cache.read().await;
28 if cache.is_some() {
29 return Ok(());
30 }
31 drop(cache);
32
33 let mut cache = self.post_cache.write().await;
34 if cache.is_some() {
35 return Ok(());
36 }
37
38 let posts = self.underlying_storage.get_posts().await?;
39 let mut post_map = HashMap::new();
40
41 for post in posts {
42 post_map.insert(post.aturi.clone(), post);
43 }
44
45 *cache = Some(post_map);
46 Ok(())
47 }
48
49 async fn refresh_cache(&self) -> Result<()> {
50 let mut cache = self.post_cache.write().await;
51 let posts = self.underlying_storage.get_posts().await?;
52 let mut post_map = HashMap::new();
53
54 for post in posts {
55 post_map.insert(post.aturi.clone(), post);
56 }
57
58 *cache = Some(post_map);
59 Ok(())
60 }
61}
62
63#[async_trait]
64impl<T: Storage> PostStorage for CachedPostStorage<T> {
65 async fn upsert_post(&self, post: &Post) -> Result<()> {
66 self.underlying_storage.upsert_post(post).await?;
67 self.refresh_cache().await?;
68 Ok(())
69 }
70
71 async fn get_post(&self, aturi: &str) -> Result<Option<Post>> {
72 self.ensure_cache_populated().await?;
73
74 let cache = self.post_cache.read().await;
75 if let Some(ref post_map) = *cache {
76 if let Some(post) = post_map.get(aturi) {
77 return Ok(Some(post.clone()));
78 }
79 }
80
81 Ok(None)
82 }
83
84 async fn get_posts(&self) -> Result<Vec<Post>> {
85 self.ensure_cache_populated().await?;
86
87 let cache = self.post_cache.read().await;
88 if let Some(ref post_map) = *cache {
89 let mut posts: Vec<Post> = post_map.values().cloned().collect();
90 posts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
91 return Ok(posts);
92 }
93
94 Ok(Vec::new())
95 }
96
97 async fn delete_post(&self, aturi: &str) -> Result<Option<Post>> {
98 let result = self.underlying_storage.delete_post(aturi).await?;
99 self.refresh_cache().await?;
100 Ok(result)
101 }
102
103 async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool> {
104 self.underlying_storage
105 .upsert_post_reference(post_reference)
106 .await
107 }
108
109 async fn delete_post_reference(&self, aturi: &str) -> Result<()> {
110 self.underlying_storage.delete_post_reference(aturi).await
111 }
112
113 async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>> {
114 self.underlying_storage
115 .get_post_reference_count(post_aturi)
116 .await
117 }
118
119 async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>> {
120 self.underlying_storage
121 .get_post_references_for_post(post_aturi)
122 .await
123 }
124
125 async fn get_post_references_for_post_for_collection(
126 &self,
127 post_aturi: &str,
128 collection: &str,
129 ) -> Result<Vec<PostReference>> {
130 self.underlying_storage
131 .get_post_references_for_post_for_collection(post_aturi, collection)
132 .await
133 }
134}
135
136#[async_trait]
137impl<T: Storage> IdentityStorage for CachedPostStorage<T> {
138 async fn upsert_identity(&self, identity: &Identity) -> Result<()> {
139 self.underlying_storage.upsert_identity(identity).await
140 }
141
142 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> {
143 self.underlying_storage.get_identity_by_did(did).await
144 }
145
146 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> {
147 self.underlying_storage.get_identity_by_handle(handle).await
148 }
149
150 async fn delete_identity(&self, aturi: &str) -> Result<Option<Identity>> {
151 self.underlying_storage.delete_identity(aturi).await
152 }
153}
154
155#[async_trait]
156impl<T: Storage> Storage for CachedPostStorage<T> {
157 async fn migrate(&self) -> Result<()> {
158 self.underlying_storage.migrate().await
159 }
160}
161
/// A read-through cache implementation for ContentStorage that caches content by CID
/// and uses a bloom filter to track CIDs that were not found in the underlying storage.
///
/// NOTE(review): a bloom filter admits false positives, so a CID that does
/// exist can occasionally be reported as not found until a write resets the
/// filter — confirm this trade-off is acceptable for callers.
pub struct CachedContentStorage<T: ContentStorage> {
    /// The wrapped ContentStorage backend reads/writes are delegated to.
    underlying_storage: Arc<T>,
    /// Cache of content by CID
    content_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    /// Bloom filter to track CIDs that were not found in underlying storage
    not_found_filter: Arc<RwLock<Bloom<String>>>,
    /// Maximum number of items to cache
    cache_size_limit: usize,
}
173
174impl<T: ContentStorage> CachedContentStorage<T> {
175 /// Create a new cached content storage with the given underlying storage.
176 ///
177 /// # Arguments
178 /// * `underlying_storage` - The underlying ContentStorage implementation to wrap
179 /// * `cache_size_limit` - Maximum number of items to cache (default: 1000)
180 /// * `bloom_filter_capacity` - Expected number of items in the bloom filter (default: 10000)
181 /// * `bloom_filter_error_rate` - False positive probability for bloom filter (default: 0.01)
182 pub fn new(underlying_storage: Arc<T>) -> Self {
183 Self::with_config(underlying_storage, 1000, 10000, 0.01)
184 }
185
186 /// Create a new cached content storage with custom configuration.
187 pub fn with_config(
188 underlying_storage: Arc<T>,
189 cache_size_limit: usize,
190 bloom_filter_capacity: usize,
191 bloom_filter_error_rate: f64,
192 ) -> Self {
193 let bloom_filter = Bloom::new_for_fp_rate(bloom_filter_capacity, bloom_filter_error_rate);
194
195 Self {
196 underlying_storage,
197 content_cache: Arc::new(RwLock::new(HashMap::new())),
198 not_found_filter: Arc::new(RwLock::new(bloom_filter)),
199 cache_size_limit,
200 }
201 }
202
203 /// Check if the cache is at capacity and evict items if necessary.
204 /// Uses a simple LRU-style eviction by clearing the cache when it's full.
205 async fn maybe_evict_cache(&self) {
206 let cache = self.content_cache.read().await;
207 if cache.len() >= self.cache_size_limit {
208 drop(cache);
209 let mut cache = self.content_cache.write().await;
210 if cache.len() >= self.cache_size_limit {
211 // Simple eviction strategy: clear the entire cache
212 // In a production system, you might want to implement LRU eviction
213 cache.clear();
214 }
215 }
216 }
217
218 /// Add a CID to the not-found bloom filter
219 async fn add_to_not_found_filter(&self, cid: &str) {
220 let mut filter = self.not_found_filter.write().await;
221 filter.set(&cid.to_string());
222 }
223
224 /// Check if a CID is in the not-found bloom filter
225 async fn is_in_not_found_filter(&self, cid: &str) -> bool {
226 let filter = self.not_found_filter.read().await;
227 filter.check(&cid.to_string())
228 }
229
230 /// Add content to the cache
231 async fn add_to_cache(&self, cid: &str, content: Vec<u8>) {
232 self.maybe_evict_cache().await;
233 let mut cache = self.content_cache.write().await;
234 cache.insert(cid.to_string(), content);
235 }
236
237 /// Get content from the cache
238 async fn get_from_cache(&self, cid: &str) -> Option<Vec<u8>> {
239 let cache = self.content_cache.read().await;
240 cache.get(cid).cloned()
241 }
242
243 /// Remove content from the cache (used when content is written)
244 async fn remove_from_cache(&self, cid: &str) {
245 let mut cache = self.content_cache.write().await;
246 cache.remove(cid);
247 }
248
249 /// Clear the not-found filter entry for a CID (used when content is written)
250 async fn clear_not_found_filter(&self, _cid: &str) {
251 // Note: Bloom filters don't support removal, so we recreate the filter
252 // This is a limitation of bloom filters - in production you might want
253 // to use a counting bloom filter or periodically reset the filter
254 let mut filter = self.not_found_filter.write().await;
255 *filter = Bloom::new_for_fp_rate(10000, 0.01);
256 }
257}
258
#[async_trait]
impl<T: ContentStorage> ContentStorage for CachedContentStorage<T> {
    /// Check whether content exists, consulting the positive cache and the
    /// not-found bloom filter before touching the underlying storage.
    ///
    /// NOTE(review): a bloom-filter false positive makes this return `false`
    /// for content that actually exists in the backend — verify callers can
    /// tolerate that until the next `write_content` resets the filter.
    async fn content_exists(&self, cid: &str) -> Result<bool> {
        // First check the cache
        if self.get_from_cache(cid).await.is_some() {
            return Ok(true);
        }

        // Check the not-found filter to avoid expensive lookups
        if self.is_in_not_found_filter(cid).await {
            return Ok(false);
        }

        // Check the underlying storage
        let exists = self.underlying_storage.content_exists(cid).await?;

        // If not found, add to the not-found filter
        if !exists {
            self.add_to_not_found_filter(cid).await;
        }

        Ok(exists)
    }

    /// Write content through to the underlying storage, then invalidate both
    /// the not-found filter and any stale cached copy for this CID.
    ///
    /// The backend write happens first so the caches are only invalidated
    /// once the data is durably stored.
    async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> {
        // Write to underlying storage first
        self.underlying_storage.write_content(cid, data).await?;

        // Clear any not-found filter entry
        self.clear_not_found_filter(cid).await;

        // Remove from cache (it will be cached on next read)
        self.remove_from_cache(cid).await;

        Ok(())
    }

    /// Read content for a CID: cache first, then the not-found filter, then
    /// the underlying storage (caching the result on success).
    ///
    /// # Errors
    /// Returns `StorageFileOperationFailed` when the filter reports the CID
    /// as previously not found, or propagates the backend's error.
    async fn read_content(&self, cid: &str) -> Result<Vec<u8>> {
        // First check the cache
        if let Some(cached_content) = self.get_from_cache(cid).await {
            return Ok(cached_content);
        }

        // Check the not-found filter to avoid expensive lookups
        if self.is_in_not_found_filter(cid).await {
            return Err(crate::errors::BlahgError::StorageFileOperationFailed {
                operation: format!("Content not found: {}", cid),
            });
        }

        // Read from underlying storage
        match self.underlying_storage.read_content(cid).await {
            Ok(content) => {
                // Cache the content
                self.add_to_cache(cid, content.clone()).await;
                Ok(content)
            }
            Err(e) => {
                // If it's a not-found error, add to the not-found filter
                // NOTE(review): StorageFileOperationFailed may also cover
                // transient failures (e.g. permissions), which would poison
                // the filter until the next write — confirm the error
                // variant is specific to "not found".
                if matches!(
                    e,
                    crate::errors::BlahgError::StorageFileOperationFailed { .. }
                ) {
                    self.add_to_not_found_filter(cid).await;
                }
                Err(e)
            }
        }
    }
}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::content::FilesystemContentStorage;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Exercises the read-through cache end to end against a filesystem
    /// backend: initial miss, write-through, existence check, cached
    /// re-read, and negative-lookup filtering via the bloom filter.
    #[tokio::test]
    async fn test_cached_content_storage() -> Result<()> {
        let dir = TempDir::new()?;
        let backend = Arc::new(FilesystemContentStorage::new(dir.path()).await?);
        let storage = CachedContentStorage::new(backend.clone());

        let cid = "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi";
        let payload = b"Hello, cached world!";

        // Nothing has been written yet, so the lookup must miss.
        assert!(!storage.content_exists(cid).await?);

        // Write through to the backend, after which the content is visible.
        storage.write_content(cid, payload).await?;
        assert!(storage.content_exists(cid).await?);

        // The first read populates the cache ...
        let first_read = storage.read_content(cid).await?;
        assert_eq!(first_read, payload);

        // ... and the second read is served from it, with identical bytes.
        let second_read = storage.read_content(cid).await?;
        assert_eq!(second_read, payload);

        // A CID that was never written should be remembered as absent:
        // the first probe consults the backend, the second is answered by
        // the not-found bloom filter without a backend round-trip.
        let missing_cid =
            "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi_nonexistent";
        assert!(!storage.content_exists(missing_cid).await?);
        assert!(!storage.content_exists(missing_cid).await?);

        Ok(())
    }
}