use anyhow::Result; use async_trait::async_trait; use bloomfilter::Bloom; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::fs; use tokio::sync::RwLock; #[cfg(feature = "s3")] use minio::s3::{Client as MinioClient, creds::StaticProvider, http::BaseUrl, types::S3Api}; use crate::storage::errors::ContentError; /// Trait for storing and retrieving content by CID. #[async_trait] pub trait ContentStorage: Send + Sync { /// Check if content exists for a given CID. async fn content_exists(&self, cid: &str) -> Result; /// Write content data for a given CID. async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()>; /// Read content data for a given CID. async fn read_content(&self, cid: &str) -> Result>; } pub use self::FilesystemContentStorage as FilesystemStorage; /// Parse an S3 URL in the format: s3://[key]:[secret]@hostname/bucket[/optional_prefix] /// Returns (endpoint, access_key, secret_key, bucket, prefix) #[cfg(feature = "s3")] pub fn parse_s3_url( url: &str, ) -> Result<(String, String, String, String, Option), ContentError> { if !url.starts_with("s3://") { return Err(ContentError::ConfigS3UrlInvalid { details: format!("Invalid S3 URL format: {}", url), }); } let url_without_scheme = &url[5..]; // Remove "s3://" // Split by '@' to separate credentials from hostname/path let parts: Vec<&str> = url_without_scheme.splitn(2, '@').collect(); if parts.len() != 2 { return Err(ContentError::ConfigS3UrlInvalid { details: format!("Invalid S3 URL format - missing @ separator: {}", url), }); } let credentials = parts[0]; let hostname_and_path = parts[1]; // Parse credentials: key:secret let cred_parts: Vec<&str> = credentials.splitn(2, ':').collect(); if cred_parts.len() != 2 { return Err(ContentError::ConfigS3UrlInvalid { details: format!( "Invalid S3 URL format - credentials must be key:secret: {}", url ), }); } let access_key = cred_parts[0].to_string(); let secret_key = cred_parts[1].to_string(); // Parse hostname and path: hostname/bucket[/prefix] let path_parts: Vec<&str> = hostname_and_path.splitn(2, '/').collect(); if path_parts.len() != 2 { return Err(ContentError::ConfigS3UrlInvalid { details: format!("Invalid S3 URL format - must include bucket: {}", url), }); } let hostname = path_parts[0].to_string(); let bucket_and_prefix = path_parts[1]; // Split bucket from optional prefix let bucket_parts: Vec<&str> = bucket_and_prefix.splitn(2, '/').collect(); let bucket = bucket_parts[0].to_string(); let prefix = if bucket_parts.len() > 1 && !bucket_parts[1].is_empty() { Some(bucket_parts[1].to_string()) } else { None }; let endpoint = if hostname.starts_with("http://") || hostname.starts_with("https://") { hostname } else { format!("https://{}", hostname) }; Ok((endpoint, access_key, secret_key, bucket, prefix)) } /// Local filesystem implementation of content storage. #[derive(Debug, Clone)] pub struct FilesystemContentStorage { base_dir: PathBuf, } impl FilesystemContentStorage { /// Create a new filesystem content storage with the given base directory. pub async fn new>(base_dir: P) -> Result { let base_dir = base_dir.as_ref().to_path_buf(); // Ensure the base directory exists fs::create_dir_all(&base_dir).await?; Ok(Self { base_dir }) } /// Get the file path for a given CID. /// Uses a subdirectory structure based on the first few characters of the CID /// to avoid having too many files in a single directory. fn get_content_path(&self, cid: &str) -> PathBuf { // Use first 2 characters for first level directory // and next 2 characters for second level directory let (dir1, dir2, filename) = if cid.len() >= 4 { (&cid[0..2], &cid[2..4], cid) } else if cid.len() >= 2 { (&cid[0..2], "00", cid) } else { ("00", "00", cid) }; self.base_dir.join(dir1).join(dir2).join(filename) } } #[async_trait] impl ContentStorage for FilesystemContentStorage { async fn content_exists(&self, cid: &str) -> Result { let path = self.get_content_path(cid); Ok(fs::try_exists(&path).await?) } async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { let path = self.get_content_path(cid); // Ensure parent directory exists if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } // Write content atomically by writing to a temp file first let temp_path = path.with_extension("tmp"); fs::write(&temp_path, data).await?; // Rename temp file to final path (atomic on most filesystems) fs::rename(&temp_path, &path).await?; Ok(()) } async fn read_content(&self, cid: &str) -> Result> { let path = self.get_content_path(cid); Ok(fs::read(&path).await?) } } #[cfg(feature = "s3")] /// S3-compatible object storage implementation for content storage. pub struct S3FileStorage { client: MinioClient, bucket: String, prefix: Option, } #[cfg(feature = "s3")] impl S3FileStorage { /// Create a new S3FileStorage with the given credentials and bucket information pub fn new( endpoint: String, access_key: String, secret_key: String, bucket: String, prefix: Option, ) -> Result { let base_url: BaseUrl = endpoint.parse().unwrap(); tracing::debug!(?base_url, "s3 file storage base url"); let static_provider = StaticProvider::new(&access_key, &secret_key, None); tracing::debug!(?static_provider, "s3 file storage static provider"); let client = MinioClient::new(base_url, Some(Box::new(static_provider)), None, None) .map_err(|e| ContentError::StorageFileOperationFailed { operation: format!("Failed to create S3 client: {}", e), })?; Ok(Self { client, bucket, prefix, }) } /// Get the full object key by combining prefix with path fn get_object_key(&self, path: &str) -> String { match &self.prefix { Some(prefix) => { if path.starts_with('/') { format!("/{prefix}{path}") } else { format!("/{prefix}/{path}") } } None => { if path.starts_with('/') { path.to_string() } else { format!("/{path}") } } } } } #[cfg(feature = "s3")] #[async_trait] impl ContentStorage for S3FileStorage { async fn content_exists(&self, cid: &str) -> Result { use minio::s3::error::ErrorCode; let object_key = self.get_object_key(cid); match self .client .stat_object(&self.bucket, &object_key) .send() .await { Ok(_) => Ok(true), Err(minio::s3::error::Error::S3Error(ref s3_err)) if s3_err.code == ErrorCode::NoSuchKey => { Ok(false) } Err(e) => Err(ContentError::StorageFileOperationFailed { operation: format!("Failed to check if S3 object exists: {}", e), } .into()), } } async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { use minio::s3::segmented_bytes::SegmentedBytes; let object_key = self.get_object_key(cid); let put_data = SegmentedBytes::from(bytes::Bytes::copy_from_slice(data)); self.client .put_object(&self.bucket, &object_key, put_data) .send() .await .map_err(|e| ContentError::StorageFileOperationFailed { operation: format!("Failed to write S3 object: {}", e), })?; Ok(()) } async fn read_content(&self, cid: &str) -> Result> { let object_key = self.get_object_key(cid); let response = self .client .get_object(&self.bucket, &object_key) .send() .await .map_err(|e| ContentError::StorageFileOperationFailed { operation: format!("Failed to read S3 object: {}", e), })?; let data = response .content .to_segmented_bytes() .await .map_err(|e| ContentError::StorageFileOperationFailed { operation: format!("Failed to read S3 object data: {}", e), })? .to_bytes(); Ok(data.to_vec()) } } /// A read-through cache implementation for ContentStorage that caches content by CID /// and uses a bloom filter to track CIDs that were not found in the underlying storage. pub struct CachedContentStorage { underlying_storage: Arc, /// Cache of content by CID content_cache: Arc>>>, /// Bloom filter to track CIDs that were not found in underlying storage not_found_filter: Arc>>, /// Maximum number of items to cache cache_size_limit: usize, } impl CachedContentStorage { /// Create a new cached content storage with the given underlying storage. /// /// # Arguments /// * `underlying_storage` - The underlying ContentStorage implementation to wrap /// * `cache_size_limit` - Maximum number of items to cache (default: 1000) /// * `bloom_filter_capacity` - Expected number of items in the bloom filter (default: 10000) /// * `bloom_filter_error_rate` - False positive probability for bloom filter (default: 0.01) pub fn new(underlying_storage: Arc) -> Self { Self::with_config(underlying_storage, 1000, 10000, 0.01) } /// Create a new cached content storage with custom configuration. pub fn with_config( underlying_storage: Arc, cache_size_limit: usize, bloom_filter_capacity: usize, bloom_filter_error_rate: f64, ) -> Self { let bloom_filter = Bloom::new_for_fp_rate(bloom_filter_capacity, bloom_filter_error_rate) .expect("TODO fix this"); Self { underlying_storage, content_cache: Arc::new(RwLock::new(HashMap::new())), not_found_filter: Arc::new(RwLock::new(bloom_filter)), cache_size_limit, } } /// Check if the cache is at capacity and evict items if necessary. /// Uses a simple LRU-style eviction by clearing the cache when it's full. async fn maybe_evict_cache(&self) { let cache = self.content_cache.read().await; if cache.len() >= self.cache_size_limit { drop(cache); let mut cache = self.content_cache.write().await; if cache.len() >= self.cache_size_limit { // Simple eviction strategy: clear the entire cache // In a production system, you might want to implement LRU eviction cache.clear(); } } } /// Add a CID to the not-found bloom filter async fn add_to_not_found_filter(&self, cid: &str) { let mut filter = self.not_found_filter.write().await; filter.set(&cid.to_string()); } /// Check if a CID is in the not-found bloom filter async fn is_in_not_found_filter(&self, cid: &str) -> bool { let filter = self.not_found_filter.read().await; filter.check(&cid.to_string()) } /// Add content to the cache async fn add_to_cache(&self, cid: &str, content: Vec) { self.maybe_evict_cache().await; let mut cache = self.content_cache.write().await; cache.insert(cid.to_string(), content); } /// Get content from the cache async fn get_from_cache(&self, cid: &str) -> Option> { let cache = self.content_cache.read().await; cache.get(cid).cloned() } /// Remove content from the cache (used when content is written) async fn remove_from_cache(&self, cid: &str) { let mut cache = self.content_cache.write().await; cache.remove(cid); } /// Clear the not-found filter entry for a CID (used when content is written) async fn clear_not_found_filter(&self, _cid: &str) { // Note: Bloom filters don't support removal, so we recreate the filter // This is a limitation of bloom filters - in production you might want // to use a counting bloom filter or periodically reset the filter let mut filter = self.not_found_filter.write().await; *filter = Bloom::new_for_fp_rate(10000, 0.01).expect("TODO fix this"); } } #[async_trait] impl ContentStorage for CachedContentStorage { async fn content_exists(&self, cid: &str) -> Result { // First check the cache if self.get_from_cache(cid).await.is_some() { return Ok(true); } // Check the not-found filter to avoid expensive lookups if self.is_in_not_found_filter(cid).await { return Ok(false); } // Check the underlying storage let exists = self.underlying_storage.content_exists(cid).await?; // If not found, add to the not-found filter if !exists { self.add_to_not_found_filter(cid).await; } Ok(exists) } async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { // Write to underlying storage first self.underlying_storage.write_content(cid, data).await?; // Clear any not-found filter entry self.clear_not_found_filter(cid).await; // Remove from cache (it will be cached on next read) self.remove_from_cache(cid).await; Ok(()) } async fn read_content(&self, cid: &str) -> Result> { // First check the cache if let Some(cached_content) = self.get_from_cache(cid).await { return Ok(cached_content); } // Check the not-found filter to avoid expensive lookups if self.is_in_not_found_filter(cid).await { return Err(ContentError::StorageFileOperationFailed { operation: format!("Content not found: {}", cid), } .into()); } // Read from underlying storage match self.underlying_storage.read_content(cid).await { Ok(content) => { // Cache the content self.add_to_cache(cid, content.clone()).await; Ok(content) } Err(e) => { // If it's a not-found error, add to the not-found filter if e.to_string() .starts_with("error-smokesignal-content-2 File storage operation failed:") { self.add_to_not_found_filter(cid).await; } Err(e) } } } }