// From the smokesignal.events web application source (branch `main`).
// Original listing: 458 lines, 16 kB.
1use anyhow::Result; 2use async_trait::async_trait; 3use bloomfilter::Bloom; 4use std::collections::HashMap; 5use std::path::{Path, PathBuf}; 6use std::sync::Arc; 7use tokio::fs; 8use tokio::sync::RwLock; 9 10#[cfg(feature = "s3")] 11use minio::s3::{Client as MinioClient, creds::StaticProvider, http::BaseUrl, types::S3Api}; 12 13use crate::storage::errors::ContentError; 14 15/// Trait for storing and retrieving content by CID. 16#[async_trait] 17pub trait ContentStorage: Send + Sync { 18 /// Check if content exists for a given CID. 19 async fn content_exists(&self, cid: &str) -> Result<bool>; 20 21 /// Write content data for a given CID. 22 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()>; 23 24 /// Read content data for a given CID. 25 async fn read_content(&self, cid: &str) -> Result<Vec<u8>>; 26} 27 28pub use self::FilesystemContentStorage as FilesystemStorage; 29 30/// Parse an S3 URL in the format: s3://[key]:[secret]@hostname/bucket[/optional_prefix] 31/// Returns (endpoint, access_key, secret_key, bucket, prefix) 32#[cfg(feature = "s3")] 33pub fn parse_s3_url( 34 url: &str, 35) -> Result<(String, String, String, String, Option<String>), ContentError> { 36 if !url.starts_with("s3://") { 37 return Err(ContentError::ConfigS3UrlInvalid { 38 details: format!("Invalid S3 URL format: {}", url), 39 }); 40 } 41 42 let url_without_scheme = &url[5..]; // Remove "s3://" 43 44 // Split by '@' to separate credentials from hostname/path 45 let parts: Vec<&str> = url_without_scheme.splitn(2, '@').collect(); 46 if parts.len() != 2 { 47 return Err(ContentError::ConfigS3UrlInvalid { 48 details: format!("Invalid S3 URL format - missing @ separator: {}", url), 49 }); 50 } 51 52 let credentials = parts[0]; 53 let hostname_and_path = parts[1]; 54 55 // Parse credentials: key:secret 56 let cred_parts: Vec<&str> = credentials.splitn(2, ':').collect(); 57 if cred_parts.len() != 2 { 58 return Err(ContentError::ConfigS3UrlInvalid { 59 details: format!( 60 "Invalid S3 
URL format - credentials must be key:secret: {}", 61 url 62 ), 63 }); 64 } 65 66 let access_key = cred_parts[0].to_string(); 67 let secret_key = cred_parts[1].to_string(); 68 69 // Parse hostname and path: hostname/bucket[/prefix] 70 let path_parts: Vec<&str> = hostname_and_path.splitn(2, '/').collect(); 71 if path_parts.len() != 2 { 72 return Err(ContentError::ConfigS3UrlInvalid { 73 details: format!("Invalid S3 URL format - must include bucket: {}", url), 74 }); 75 } 76 77 let hostname = path_parts[0].to_string(); 78 let bucket_and_prefix = path_parts[1]; 79 80 // Split bucket from optional prefix 81 let bucket_parts: Vec<&str> = bucket_and_prefix.splitn(2, '/').collect(); 82 let bucket = bucket_parts[0].to_string(); 83 let prefix = if bucket_parts.len() > 1 && !bucket_parts[1].is_empty() { 84 Some(bucket_parts[1].to_string()) 85 } else { 86 None 87 }; 88 89 let endpoint = if hostname.starts_with("http://") || hostname.starts_with("https://") { 90 hostname 91 } else { 92 format!("https://{}", hostname) 93 }; 94 95 Ok((endpoint, access_key, secret_key, bucket, prefix)) 96} 97 98/// Local filesystem implementation of content storage. 99#[derive(Debug, Clone)] 100pub struct FilesystemContentStorage { 101 base_dir: PathBuf, 102} 103 104impl FilesystemContentStorage { 105 /// Create a new filesystem content storage with the given base directory. 106 pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> { 107 let base_dir = base_dir.as_ref().to_path_buf(); 108 109 // Ensure the base directory exists 110 fs::create_dir_all(&base_dir).await?; 111 112 Ok(Self { base_dir }) 113 } 114 115 /// Get the file path for a given CID. 116 /// Uses a subdirectory structure based on the first few characters of the CID 117 /// to avoid having too many files in a single directory. 
118 fn get_content_path(&self, cid: &str) -> PathBuf { 119 // Use first 2 characters for first level directory 120 // and next 2 characters for second level directory 121 let (dir1, dir2, filename) = if cid.len() >= 4 { 122 (&cid[0..2], &cid[2..4], cid) 123 } else if cid.len() >= 2 { 124 (&cid[0..2], "00", cid) 125 } else { 126 ("00", "00", cid) 127 }; 128 129 self.base_dir.join(dir1).join(dir2).join(filename) 130 } 131} 132 133#[async_trait] 134impl ContentStorage for FilesystemContentStorage { 135 async fn content_exists(&self, cid: &str) -> Result<bool> { 136 let path = self.get_content_path(cid); 137 Ok(fs::try_exists(&path).await?) 138 } 139 140 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { 141 let path = self.get_content_path(cid); 142 143 // Ensure parent directory exists 144 if let Some(parent) = path.parent() { 145 fs::create_dir_all(parent).await?; 146 } 147 148 // Write content atomically by writing to a temp file first 149 let temp_path = path.with_extension("tmp"); 150 fs::write(&temp_path, data).await?; 151 152 // Rename temp file to final path (atomic on most filesystems) 153 fs::rename(&temp_path, &path).await?; 154 155 Ok(()) 156 } 157 158 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> { 159 let path = self.get_content_path(cid); 160 Ok(fs::read(&path).await?) 161 } 162} 163 164#[cfg(feature = "s3")] 165/// S3-compatible object storage implementation for content storage. 
166pub struct S3FileStorage { 167 client: MinioClient, 168 bucket: String, 169 prefix: Option<String>, 170} 171 172#[cfg(feature = "s3")] 173impl S3FileStorage { 174 /// Create a new S3FileStorage with the given credentials and bucket information 175 pub fn new( 176 endpoint: String, 177 access_key: String, 178 secret_key: String, 179 bucket: String, 180 prefix: Option<String>, 181 ) -> Result<Self> { 182 let base_url: BaseUrl = endpoint.parse().unwrap(); 183 tracing::debug!(?base_url, "s3 file storage base url"); 184 185 let static_provider = StaticProvider::new(&access_key, &secret_key, None); 186 tracing::debug!(?static_provider, "s3 file storage static provider"); 187 188 let client = MinioClient::new(base_url, Some(Box::new(static_provider)), None, None) 189 .map_err(|e| ContentError::StorageFileOperationFailed { 190 operation: format!("Failed to create S3 client: {}", e), 191 })?; 192 193 Ok(Self { 194 client, 195 bucket, 196 prefix, 197 }) 198 } 199 200 /// Get the full object key by combining prefix with path 201 fn get_object_key(&self, path: &str) -> String { 202 match &self.prefix { 203 Some(prefix) => { 204 if path.starts_with('/') { 205 format!("/{prefix}{path}") 206 } else { 207 format!("/{prefix}/{path}") 208 } 209 } 210 None => { 211 if path.starts_with('/') { 212 path.to_string() 213 } else { 214 format!("/{path}") 215 } 216 } 217 } 218 } 219} 220 221#[cfg(feature = "s3")] 222#[async_trait] 223impl ContentStorage for S3FileStorage { 224 async fn content_exists(&self, cid: &str) -> Result<bool> { 225 use minio::s3::error::ErrorCode; 226 227 let object_key = self.get_object_key(cid); 228 229 match self 230 .client 231 .stat_object(&self.bucket, &object_key) 232 .send() 233 .await 234 { 235 Ok(_) => Ok(true), 236 Err(minio::s3::error::Error::S3Error(ref s3_err)) 237 if s3_err.code == ErrorCode::NoSuchKey => 238 { 239 Ok(false) 240 } 241 Err(e) => Err(ContentError::StorageFileOperationFailed { 242 operation: format!("Failed to check if S3 object 
exists: {}", e), 243 } 244 .into()), 245 } 246 } 247 248 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { 249 use minio::s3::segmented_bytes::SegmentedBytes; 250 251 let object_key = self.get_object_key(cid); 252 253 let put_data = SegmentedBytes::from(bytes::Bytes::copy_from_slice(data)); 254 255 self.client 256 .put_object(&self.bucket, &object_key, put_data) 257 .send() 258 .await 259 .map_err(|e| ContentError::StorageFileOperationFailed { 260 operation: format!("Failed to write S3 object: {}", e), 261 })?; 262 263 Ok(()) 264 } 265 266 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> { 267 let object_key = self.get_object_key(cid); 268 269 let response = self 270 .client 271 .get_object(&self.bucket, &object_key) 272 .send() 273 .await 274 .map_err(|e| ContentError::StorageFileOperationFailed { 275 operation: format!("Failed to read S3 object: {}", e), 276 })?; 277 278 let data = response 279 .content 280 .to_segmented_bytes() 281 .await 282 .map_err(|e| ContentError::StorageFileOperationFailed { 283 operation: format!("Failed to read S3 object data: {}", e), 284 })? 285 .to_bytes(); 286 287 Ok(data.to_vec()) 288 } 289} 290 291/// A read-through cache implementation for ContentStorage that caches content by CID 292/// and uses a bloom filter to track CIDs that were not found in the underlying storage. 293pub struct CachedContentStorage<T: ContentStorage> { 294 underlying_storage: Arc<T>, 295 /// Cache of content by CID 296 content_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>, 297 /// Bloom filter to track CIDs that were not found in underlying storage 298 not_found_filter: Arc<RwLock<Bloom<String>>>, 299 /// Maximum number of items to cache 300 cache_size_limit: usize, 301} 302 303impl<T: ContentStorage> CachedContentStorage<T> { 304 /// Create a new cached content storage with the given underlying storage. 
305 /// 306 /// # Arguments 307 /// * `underlying_storage` - The underlying ContentStorage implementation to wrap 308 /// * `cache_size_limit` - Maximum number of items to cache (default: 1000) 309 /// * `bloom_filter_capacity` - Expected number of items in the bloom filter (default: 10000) 310 /// * `bloom_filter_error_rate` - False positive probability for bloom filter (default: 0.01) 311 pub fn new(underlying_storage: Arc<T>) -> Self { 312 Self::with_config(underlying_storage, 1000, 10000, 0.01) 313 } 314 315 /// Create a new cached content storage with custom configuration. 316 pub fn with_config( 317 underlying_storage: Arc<T>, 318 cache_size_limit: usize, 319 bloom_filter_capacity: usize, 320 bloom_filter_error_rate: f64, 321 ) -> Self { 322 let bloom_filter = Bloom::new_for_fp_rate(bloom_filter_capacity, bloom_filter_error_rate) 323 .expect("TODO fix this"); 324 325 Self { 326 underlying_storage, 327 content_cache: Arc::new(RwLock::new(HashMap::new())), 328 not_found_filter: Arc::new(RwLock::new(bloom_filter)), 329 cache_size_limit, 330 } 331 } 332 333 /// Check if the cache is at capacity and evict items if necessary. 334 /// Uses a simple LRU-style eviction by clearing the cache when it's full. 
335 async fn maybe_evict_cache(&self) { 336 let cache = self.content_cache.read().await; 337 if cache.len() >= self.cache_size_limit { 338 drop(cache); 339 let mut cache = self.content_cache.write().await; 340 if cache.len() >= self.cache_size_limit { 341 // Simple eviction strategy: clear the entire cache 342 // In a production system, you might want to implement LRU eviction 343 cache.clear(); 344 } 345 } 346 } 347 348 /// Add a CID to the not-found bloom filter 349 async fn add_to_not_found_filter(&self, cid: &str) { 350 let mut filter = self.not_found_filter.write().await; 351 filter.set(&cid.to_string()); 352 } 353 354 /// Check if a CID is in the not-found bloom filter 355 async fn is_in_not_found_filter(&self, cid: &str) -> bool { 356 let filter = self.not_found_filter.read().await; 357 filter.check(&cid.to_string()) 358 } 359 360 /// Add content to the cache 361 async fn add_to_cache(&self, cid: &str, content: Vec<u8>) { 362 self.maybe_evict_cache().await; 363 let mut cache = self.content_cache.write().await; 364 cache.insert(cid.to_string(), content); 365 } 366 367 /// Get content from the cache 368 async fn get_from_cache(&self, cid: &str) -> Option<Vec<u8>> { 369 let cache = self.content_cache.read().await; 370 cache.get(cid).cloned() 371 } 372 373 /// Remove content from the cache (used when content is written) 374 async fn remove_from_cache(&self, cid: &str) { 375 let mut cache = self.content_cache.write().await; 376 cache.remove(cid); 377 } 378 379 /// Clear the not-found filter entry for a CID (used when content is written) 380 async fn clear_not_found_filter(&self, _cid: &str) { 381 // Note: Bloom filters don't support removal, so we recreate the filter 382 // This is a limitation of bloom filters - in production you might want 383 // to use a counting bloom filter or periodically reset the filter 384 let mut filter = self.not_found_filter.write().await; 385 *filter = Bloom::new_for_fp_rate(10000, 0.01).expect("TODO fix this"); 386 } 387} 388 
389#[async_trait] 390impl<T: ContentStorage> ContentStorage for CachedContentStorage<T> { 391 async fn content_exists(&self, cid: &str) -> Result<bool> { 392 // First check the cache 393 if self.get_from_cache(cid).await.is_some() { 394 return Ok(true); 395 } 396 397 // Check the not-found filter to avoid expensive lookups 398 if self.is_in_not_found_filter(cid).await { 399 return Ok(false); 400 } 401 402 // Check the underlying storage 403 let exists = self.underlying_storage.content_exists(cid).await?; 404 405 // If not found, add to the not-found filter 406 if !exists { 407 self.add_to_not_found_filter(cid).await; 408 } 409 410 Ok(exists) 411 } 412 413 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { 414 // Write to underlying storage first 415 self.underlying_storage.write_content(cid, data).await?; 416 417 // Clear any not-found filter entry 418 self.clear_not_found_filter(cid).await; 419 420 // Remove from cache (it will be cached on next read) 421 self.remove_from_cache(cid).await; 422 423 Ok(()) 424 } 425 426 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> { 427 // First check the cache 428 if let Some(cached_content) = self.get_from_cache(cid).await { 429 return Ok(cached_content); 430 } 431 432 // Check the not-found filter to avoid expensive lookups 433 if self.is_in_not_found_filter(cid).await { 434 return Err(ContentError::StorageFileOperationFailed { 435 operation: format!("Content not found: {}", cid), 436 } 437 .into()); 438 } 439 440 // Read from underlying storage 441 match self.underlying_storage.read_content(cid).await { 442 Ok(content) => { 443 // Cache the content 444 self.add_to_cache(cid, content.clone()).await; 445 Ok(content) 446 } 447 Err(e) => { 448 // If it's a not-found error, add to the not-found filter 449 if e.to_string() 450 .starts_with("error-smokesignal-content-2 File storage operation failed:") 451 { 452 self.add_to_not_found_filter(cid).await; 453 } 454 Err(e) 455 } 456 } 457 } 458}