An AT Protocol–powered blogging engine.
at main 376 lines 13 kB view raw
1use std::collections::HashMap; 2use std::sync::Arc; 3 4use crate::errors::Result; 5use async_trait::async_trait; 6use bloomfilter::Bloom; 7use tokio::sync::RwLock; 8 9use super::{ContentStorage, Identity, IdentityStorage, Post, PostReference, PostStorage, Storage}; 10 11/// A caching layer for post storage that keeps posts in memory. 12pub struct CachedPostStorage<T: Storage> { 13 underlying_storage: Arc<T>, 14 post_cache: Arc<RwLock<Option<HashMap<String, Post>>>>, 15} 16 17impl<T: Storage> CachedPostStorage<T> { 18 /// Create a new cached post storage with the given underlying storage. 19 pub fn new(underlying_storage: Arc<T>) -> Self { 20 Self { 21 underlying_storage, 22 post_cache: Arc::new(RwLock::new(None)), 23 } 24 } 25 26 async fn ensure_cache_populated(&self) -> Result<()> { 27 let cache = self.post_cache.read().await; 28 if cache.is_some() { 29 return Ok(()); 30 } 31 drop(cache); 32 33 let mut cache = self.post_cache.write().await; 34 if cache.is_some() { 35 return Ok(()); 36 } 37 38 let posts = self.underlying_storage.get_posts().await?; 39 let mut post_map = HashMap::new(); 40 41 for post in posts { 42 post_map.insert(post.aturi.clone(), post); 43 } 44 45 *cache = Some(post_map); 46 Ok(()) 47 } 48 49 async fn refresh_cache(&self) -> Result<()> { 50 let mut cache = self.post_cache.write().await; 51 let posts = self.underlying_storage.get_posts().await?; 52 let mut post_map = HashMap::new(); 53 54 for post in posts { 55 post_map.insert(post.aturi.clone(), post); 56 } 57 58 *cache = Some(post_map); 59 Ok(()) 60 } 61} 62 63#[async_trait] 64impl<T: Storage> PostStorage for CachedPostStorage<T> { 65 async fn upsert_post(&self, post: &Post) -> Result<()> { 66 self.underlying_storage.upsert_post(post).await?; 67 self.refresh_cache().await?; 68 Ok(()) 69 } 70 71 async fn get_post(&self, aturi: &str) -> Result<Option<Post>> { 72 self.ensure_cache_populated().await?; 73 74 let cache = self.post_cache.read().await; 75 if let Some(ref post_map) = *cache { 76 if let 
Some(post) = post_map.get(aturi) { 77 return Ok(Some(post.clone())); 78 } 79 } 80 81 Ok(None) 82 } 83 84 async fn get_posts(&self) -> Result<Vec<Post>> { 85 self.ensure_cache_populated().await?; 86 87 let cache = self.post_cache.read().await; 88 if let Some(ref post_map) = *cache { 89 let mut posts: Vec<Post> = post_map.values().cloned().collect(); 90 posts.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 91 return Ok(posts); 92 } 93 94 Ok(Vec::new()) 95 } 96 97 async fn delete_post(&self, aturi: &str) -> Result<Option<Post>> { 98 let result = self.underlying_storage.delete_post(aturi).await?; 99 self.refresh_cache().await?; 100 Ok(result) 101 } 102 103 async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool> { 104 self.underlying_storage 105 .upsert_post_reference(post_reference) 106 .await 107 } 108 109 async fn delete_post_reference(&self, aturi: &str) -> Result<()> { 110 self.underlying_storage.delete_post_reference(aturi).await 111 } 112 113 async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>> { 114 self.underlying_storage 115 .get_post_reference_count(post_aturi) 116 .await 117 } 118 119 async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>> { 120 self.underlying_storage 121 .get_post_references_for_post(post_aturi) 122 .await 123 } 124 125 async fn get_post_references_for_post_for_collection( 126 &self, 127 post_aturi: &str, 128 collection: &str, 129 ) -> Result<Vec<PostReference>> { 130 self.underlying_storage 131 .get_post_references_for_post_for_collection(post_aturi, collection) 132 .await 133 } 134} 135 136#[async_trait] 137impl<T: Storage> IdentityStorage for CachedPostStorage<T> { 138 async fn upsert_identity(&self, identity: &Identity) -> Result<()> { 139 self.underlying_storage.upsert_identity(identity).await 140 } 141 142 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> { 143 
self.underlying_storage.get_identity_by_did(did).await 144 } 145 146 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> { 147 self.underlying_storage.get_identity_by_handle(handle).await 148 } 149 150 async fn delete_identity(&self, aturi: &str) -> Result<Option<Identity>> { 151 self.underlying_storage.delete_identity(aturi).await 152 } 153} 154 155#[async_trait] 156impl<T: Storage> Storage for CachedPostStorage<T> { 157 async fn migrate(&self) -> Result<()> { 158 self.underlying_storage.migrate().await 159 } 160} 161 162/// A read-through cache implementation for ContentStorage that caches content by CID 163/// and uses a bloom filter to track CIDs that were not found in the underlying storage. 164pub struct CachedContentStorage<T: ContentStorage> { 165 underlying_storage: Arc<T>, 166 /// Cache of content by CID 167 content_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>, 168 /// Bloom filter to track CIDs that were not found in underlying storage 169 not_found_filter: Arc<RwLock<Bloom<String>>>, 170 /// Maximum number of items to cache 171 cache_size_limit: usize, 172} 173 174impl<T: ContentStorage> CachedContentStorage<T> { 175 /// Create a new cached content storage with the given underlying storage. 176 /// 177 /// # Arguments 178 /// * `underlying_storage` - The underlying ContentStorage implementation to wrap 179 /// * `cache_size_limit` - Maximum number of items to cache (default: 1000) 180 /// * `bloom_filter_capacity` - Expected number of items in the bloom filter (default: 10000) 181 /// * `bloom_filter_error_rate` - False positive probability for bloom filter (default: 0.01) 182 pub fn new(underlying_storage: Arc<T>) -> Self { 183 Self::with_config(underlying_storage, 1000, 10000, 0.01) 184 } 185 186 /// Create a new cached content storage with custom configuration. 
187 pub fn with_config( 188 underlying_storage: Arc<T>, 189 cache_size_limit: usize, 190 bloom_filter_capacity: usize, 191 bloom_filter_error_rate: f64, 192 ) -> Self { 193 let bloom_filter = Bloom::new_for_fp_rate(bloom_filter_capacity, bloom_filter_error_rate); 194 195 Self { 196 underlying_storage, 197 content_cache: Arc::new(RwLock::new(HashMap::new())), 198 not_found_filter: Arc::new(RwLock::new(bloom_filter)), 199 cache_size_limit, 200 } 201 } 202 203 /// Check if the cache is at capacity and evict items if necessary. 204 /// Uses a simple LRU-style eviction by clearing the cache when it's full. 205 async fn maybe_evict_cache(&self) { 206 let cache = self.content_cache.read().await; 207 if cache.len() >= self.cache_size_limit { 208 drop(cache); 209 let mut cache = self.content_cache.write().await; 210 if cache.len() >= self.cache_size_limit { 211 // Simple eviction strategy: clear the entire cache 212 // In a production system, you might want to implement LRU eviction 213 cache.clear(); 214 } 215 } 216 } 217 218 /// Add a CID to the not-found bloom filter 219 async fn add_to_not_found_filter(&self, cid: &str) { 220 let mut filter = self.not_found_filter.write().await; 221 filter.set(&cid.to_string()); 222 } 223 224 /// Check if a CID is in the not-found bloom filter 225 async fn is_in_not_found_filter(&self, cid: &str) -> bool { 226 let filter = self.not_found_filter.read().await; 227 filter.check(&cid.to_string()) 228 } 229 230 /// Add content to the cache 231 async fn add_to_cache(&self, cid: &str, content: Vec<u8>) { 232 self.maybe_evict_cache().await; 233 let mut cache = self.content_cache.write().await; 234 cache.insert(cid.to_string(), content); 235 } 236 237 /// Get content from the cache 238 async fn get_from_cache(&self, cid: &str) -> Option<Vec<u8>> { 239 let cache = self.content_cache.read().await; 240 cache.get(cid).cloned() 241 } 242 243 /// Remove content from the cache (used when content is written) 244 async fn remove_from_cache(&self, cid: 
&str) { 245 let mut cache = self.content_cache.write().await; 246 cache.remove(cid); 247 } 248 249 /// Clear the not-found filter entry for a CID (used when content is written) 250 async fn clear_not_found_filter(&self, _cid: &str) { 251 // Note: Bloom filters don't support removal, so we recreate the filter 252 // This is a limitation of bloom filters - in production you might want 253 // to use a counting bloom filter or periodically reset the filter 254 let mut filter = self.not_found_filter.write().await; 255 *filter = Bloom::new_for_fp_rate(10000, 0.01); 256 } 257} 258 259#[async_trait] 260impl<T: ContentStorage> ContentStorage for CachedContentStorage<T> { 261 async fn content_exists(&self, cid: &str) -> Result<bool> { 262 // First check the cache 263 if self.get_from_cache(cid).await.is_some() { 264 return Ok(true); 265 } 266 267 // Check the not-found filter to avoid expensive lookups 268 if self.is_in_not_found_filter(cid).await { 269 return Ok(false); 270 } 271 272 // Check the underlying storage 273 let exists = self.underlying_storage.content_exists(cid).await?; 274 275 // If not found, add to the not-found filter 276 if !exists { 277 self.add_to_not_found_filter(cid).await; 278 } 279 280 Ok(exists) 281 } 282 283 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { 284 // Write to underlying storage first 285 self.underlying_storage.write_content(cid, data).await?; 286 287 // Clear any not-found filter entry 288 self.clear_not_found_filter(cid).await; 289 290 // Remove from cache (it will be cached on next read) 291 self.remove_from_cache(cid).await; 292 293 Ok(()) 294 } 295 296 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> { 297 // First check the cache 298 if let Some(cached_content) = self.get_from_cache(cid).await { 299 return Ok(cached_content); 300 } 301 302 // Check the not-found filter to avoid expensive lookups 303 if self.is_in_not_found_filter(cid).await { 304 return 
Err(crate::errors::BlahgError::StorageFileOperationFailed { 305 operation: format!("Content not found: {}", cid), 306 }); 307 } 308 309 // Read from underlying storage 310 match self.underlying_storage.read_content(cid).await { 311 Ok(content) => { 312 // Cache the content 313 self.add_to_cache(cid, content.clone()).await; 314 Ok(content) 315 } 316 Err(e) => { 317 // If it's a not-found error, add to the not-found filter 318 if matches!( 319 e, 320 crate::errors::BlahgError::StorageFileOperationFailed { .. } 321 ) { 322 self.add_to_not_found_filter(cid).await; 323 } 324 Err(e) 325 } 326 } 327 } 328} 329 330#[cfg(test)] 331mod tests { 332 use super::*; 333 use crate::storage::content::FilesystemContentStorage; 334 use std::sync::Arc; 335 use tempfile::TempDir; 336 337 #[tokio::test] 338 async fn test_cached_content_storage() -> Result<()> { 339 let temp_dir = TempDir::new()?; 340 let filesystem_storage = Arc::new(FilesystemContentStorage::new(temp_dir.path()).await?); 341 342 let cached_storage = CachedContentStorage::new(filesystem_storage.clone()); 343 344 let cid = "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"; 345 let data = b"Hello, cached world!"; 346 347 // Test content doesn't exist initially 348 assert!(!cached_storage.content_exists(cid).await?); 349 350 // Write content 351 cached_storage.write_content(cid, data).await?; 352 353 // Test content exists after writing 354 assert!(cached_storage.content_exists(cid).await?); 355 356 // Read content and verify - this should cache it 357 let read_data = cached_storage.read_content(cid).await?; 358 assert_eq!(read_data, data); 359 360 // Read again - this should come from cache 361 let read_data_cached = cached_storage.read_content(cid).await?; 362 assert_eq!(read_data_cached, data); 363 364 // Test not-found filtering 365 let nonexistent_cid = 366 "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi_nonexistent"; 367 368 // First check should query underlying storage 369 
assert!(!cached_storage.content_exists(nonexistent_cid).await?); 370 371 // Second check should use bloom filter and return false without querying underlying storage 372 assert!(!cached_storage.content_exists(nonexistent_cid).await?); 373 374 Ok(()) 375 } 376}