//! The smokesignal.events web application
1use anyhow::Result;
2use async_trait::async_trait;
3use bloomfilter::Bloom;
4use std::collections::HashMap;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use tokio::fs;
8use tokio::sync::RwLock;
9
10#[cfg(feature = "s3")]
11use minio::s3::{Client as MinioClient, creds::StaticProvider, http::BaseUrl, types::S3Api};
12
13use crate::storage::errors::ContentError;
14
/// Trait for storing and retrieving content by CID.
///
/// Implementations must be shareable across threads (`Send + Sync`). This
/// module provides a filesystem implementation, an S3 implementation
/// (behind the `s3` feature), and a read-through caching wrapper.
#[async_trait]
pub trait ContentStorage: Send + Sync {
    /// Check if content exists for a given CID.
    ///
    /// Returns `Ok(true)`/`Ok(false)` for a successful check; `Err` means
    /// the check itself failed, not that the content is absent.
    async fn content_exists(&self, cid: &str) -> Result<bool>;

    /// Write content data for a given CID.
    async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()>;

    /// Read content data for a given CID.
    ///
    /// Returns `Err` when the content is missing or the read fails.
    async fn read_content(&self, cid: &str) -> Result<Vec<u8>>;
}
27
// Re-export the filesystem implementation under a shorter name.
pub use self::FilesystemContentStorage as FilesystemStorage;
29
/// Parse an S3 URL in the format: s3://[key]:[secret]@hostname/bucket[/optional_prefix]
/// Returns (endpoint, access_key, secret_key, bucket, prefix)
///
/// The endpoint is normalized to carry a scheme: `https://` is prepended
/// when the hostname does not already start with `http://` or `https://`.
///
/// # Errors
///
/// Returns [`ContentError::ConfigS3UrlInvalid`] when the URL is missing the
/// `s3://` scheme, the `@` separator, the `key:secret` credential pair, or
/// the bucket component.
#[cfg(feature = "s3")]
pub fn parse_s3_url(
    url: &str,
) -> Result<(String, String, String, String, Option<String>), ContentError> {
    // Strip the mandatory scheme.
    let rest = url
        .strip_prefix("s3://")
        .ok_or_else(|| ContentError::ConfigS3UrlInvalid {
            details: format!("Invalid S3 URL format: {}", url),
        })?;

    // Split credentials from hostname/path at the first '@'.
    let (credentials, hostname_and_path) =
        rest.split_once('@')
            .ok_or_else(|| ContentError::ConfigS3UrlInvalid {
                details: format!("Invalid S3 URL format - missing @ separator: {}", url),
            })?;

    // Credentials are `key:secret`; split at the first ':' so the secret may
    // itself contain ':' characters.
    let (access_key, secret_key) =
        credentials
            .split_once(':')
            .ok_or_else(|| ContentError::ConfigS3UrlInvalid {
                details: format!(
                    "Invalid S3 URL format - credentials must be key:secret: {}",
                    url
                ),
            })?;

    // Hostname and path: hostname/bucket[/prefix].
    let (hostname, bucket_and_prefix) =
        hostname_and_path
            .split_once('/')
            .ok_or_else(|| ContentError::ConfigS3UrlInvalid {
                details: format!("Invalid S3 URL format - must include bucket: {}", url),
            })?;

    // The bucket may be followed by an optional, non-empty key prefix.
    let (bucket, prefix) = match bucket_and_prefix.split_once('/') {
        Some((bucket, rest)) if !rest.is_empty() => (bucket, Some(rest.to_string())),
        Some((bucket, _)) => (bucket, None),
        None => (bucket_and_prefix, None),
    };

    // Default to https when the hostname carries no explicit scheme.
    let endpoint = if hostname.starts_with("http://") || hostname.starts_with("https://") {
        hostname.to_string()
    } else {
        format!("https://{}", hostname)
    };

    Ok((
        endpoint,
        access_key.to_string(),
        secret_key.to_string(),
        bucket.to_string(),
        prefix,
    ))
}
97
/// Local filesystem implementation of content storage.
///
/// Content files live under `base_dir` in a two-level fan-out directory
/// layout derived from the CID (see `get_content_path`).
#[derive(Debug, Clone)]
pub struct FilesystemContentStorage {
    // Root directory under which all content files are written.
    base_dir: PathBuf,
}
103
104impl FilesystemContentStorage {
105 /// Create a new filesystem content storage with the given base directory.
106 pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
107 let base_dir = base_dir.as_ref().to_path_buf();
108
109 // Ensure the base directory exists
110 fs::create_dir_all(&base_dir).await?;
111
112 Ok(Self { base_dir })
113 }
114
115 /// Get the file path for a given CID.
116 /// Uses a subdirectory structure based on the first few characters of the CID
117 /// to avoid having too many files in a single directory.
118 fn get_content_path(&self, cid: &str) -> PathBuf {
119 // Use first 2 characters for first level directory
120 // and next 2 characters for second level directory
121 let (dir1, dir2, filename) = if cid.len() >= 4 {
122 (&cid[0..2], &cid[2..4], cid)
123 } else if cid.len() >= 2 {
124 (&cid[0..2], "00", cid)
125 } else {
126 ("00", "00", cid)
127 };
128
129 self.base_dir.join(dir1).join(dir2).join(filename)
130 }
131}
132
133#[async_trait]
134impl ContentStorage for FilesystemContentStorage {
135 async fn content_exists(&self, cid: &str) -> Result<bool> {
136 let path = self.get_content_path(cid);
137 Ok(fs::try_exists(&path).await?)
138 }
139
140 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> {
141 let path = self.get_content_path(cid);
142
143 // Ensure parent directory exists
144 if let Some(parent) = path.parent() {
145 fs::create_dir_all(parent).await?;
146 }
147
148 // Write content atomically by writing to a temp file first
149 let temp_path = path.with_extension("tmp");
150 fs::write(&temp_path, data).await?;
151
152 // Rename temp file to final path (atomic on most filesystems)
153 fs::rename(&temp_path, &path).await?;
154
155 Ok(())
156 }
157
158 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> {
159 let path = self.get_content_path(cid);
160 Ok(fs::read(&path).await?)
161 }
162}
163
#[cfg(feature = "s3")]
/// S3-compatible object storage implementation for content storage.
pub struct S3FileStorage {
    // Configured minio/S3 client used for all object operations.
    client: MinioClient,
    // Bucket that holds all content objects.
    bucket: String,
    // Optional key prefix prepended to every object key.
    prefix: Option<String>,
}
171
#[cfg(feature = "s3")]
impl S3FileStorage {
    /// Create a new S3FileStorage with the given credentials and bucket information
    ///
    /// # Errors
    ///
    /// Returns an error when the endpoint cannot be parsed as a base URL or
    /// when the S3 client cannot be constructed.
    pub fn new(
        endpoint: String,
        access_key: String,
        secret_key: String,
        bucket: String,
        prefix: Option<String>,
    ) -> Result<Self> {
        // Parse the endpoint instead of unwrapping: a malformed endpoint is
        // operator-supplied configuration, not a programming bug, so it must
        // surface as an error rather than a panic.
        let base_url: BaseUrl =
            endpoint
                .parse()
                .map_err(|e| ContentError::ConfigS3UrlInvalid {
                    details: format!("Invalid S3 endpoint {}: {}", endpoint, e),
                })?;
        tracing::debug!(?base_url, "s3 file storage base url");

        let static_provider = StaticProvider::new(&access_key, &secret_key, None);
        tracing::debug!(?static_provider, "s3 file storage static provider");

        let client = MinioClient::new(base_url, Some(Box::new(static_provider)), None, None)
            .map_err(|e| ContentError::StorageFileOperationFailed {
                operation: format!("Failed to create S3 client: {}", e),
            })?;

        Ok(Self {
            client,
            bucket,
            prefix,
        })
    }

    /// Get the full object key by combining prefix with path.
    ///
    /// The returned key always starts with '/'; when a prefix is configured
    /// it is inserted between the leading '/' and the path.
    fn get_object_key(&self, path: &str) -> String {
        match &self.prefix {
            Some(prefix) if path.starts_with('/') => format!("/{prefix}{path}"),
            Some(prefix) => format!("/{prefix}/{path}"),
            None if path.starts_with('/') => path.to_string(),
            None => format!("/{path}"),
        }
    }
}
220
#[cfg(feature = "s3")]
#[async_trait]
impl ContentStorage for S3FileStorage {
    /// Probe the object with a stat call; a `NoSuchKey` error means the
    /// content is absent, any other error is surfaced to the caller.
    async fn content_exists(&self, cid: &str) -> Result<bool> {
        use minio::s3::error::ErrorCode;

        let key = self.get_object_key(cid);
        let stat = self.client.stat_object(&self.bucket, &key).send().await;

        match stat {
            Ok(_) => Ok(true),
            Err(minio::s3::error::Error::S3Error(ref err))
                if err.code == ErrorCode::NoSuchKey =>
            {
                Ok(false)
            }
            Err(e) => Err(ContentError::StorageFileOperationFailed {
                operation: format!("Failed to check if S3 object exists: {}", e),
            }
            .into()),
        }
    }

    /// Upload the content bytes under the CID-derived object key.
    async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> {
        use minio::s3::segmented_bytes::SegmentedBytes;

        let key = self.get_object_key(cid);
        // Copy the payload into an owned buffer the client can send.
        let body = SegmentedBytes::from(bytes::Bytes::copy_from_slice(data));

        let upload = self
            .client
            .put_object(&self.bucket, &key, body)
            .send()
            .await;
        upload.map_err(|e| ContentError::StorageFileOperationFailed {
            operation: format!("Failed to write S3 object: {}", e),
        })?;

        Ok(())
    }

    /// Download the object for the CID and collect its body into a `Vec`.
    async fn read_content(&self, cid: &str) -> Result<Vec<u8>> {
        let key = self.get_object_key(cid);

        let response = self
            .client
            .get_object(&self.bucket, &key)
            .send()
            .await
            .map_err(|e| ContentError::StorageFileOperationFailed {
                operation: format!("Failed to read S3 object: {}", e),
            })?;

        // Drain the streamed body before converting it to contiguous bytes.
        let body = response
            .content
            .to_segmented_bytes()
            .await
            .map_err(|e| ContentError::StorageFileOperationFailed {
                operation: format!("Failed to read S3 object data: {}", e),
            })?;

        Ok(body.to_bytes().to_vec())
    }
}
290
291/// A read-through cache implementation for ContentStorage that caches content by CID
292/// and uses a bloom filter to track CIDs that were not found in the underlying storage.
293pub struct CachedContentStorage<T: ContentStorage> {
294 underlying_storage: Arc<T>,
295 /// Cache of content by CID
296 content_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
297 /// Bloom filter to track CIDs that were not found in underlying storage
298 not_found_filter: Arc<RwLock<Bloom<String>>>,
299 /// Maximum number of items to cache
300 cache_size_limit: usize,
301}
302
303impl<T: ContentStorage> CachedContentStorage<T> {
304 /// Create a new cached content storage with the given underlying storage.
305 ///
306 /// # Arguments
307 /// * `underlying_storage` - The underlying ContentStorage implementation to wrap
308 /// * `cache_size_limit` - Maximum number of items to cache (default: 1000)
309 /// * `bloom_filter_capacity` - Expected number of items in the bloom filter (default: 10000)
310 /// * `bloom_filter_error_rate` - False positive probability for bloom filter (default: 0.01)
311 pub fn new(underlying_storage: Arc<T>) -> Self {
312 Self::with_config(underlying_storage, 1000, 10000, 0.01)
313 }
314
315 /// Create a new cached content storage with custom configuration.
316 pub fn with_config(
317 underlying_storage: Arc<T>,
318 cache_size_limit: usize,
319 bloom_filter_capacity: usize,
320 bloom_filter_error_rate: f64,
321 ) -> Self {
322 let bloom_filter = Bloom::new_for_fp_rate(bloom_filter_capacity, bloom_filter_error_rate)
323 .expect("TODO fix this");
324
325 Self {
326 underlying_storage,
327 content_cache: Arc::new(RwLock::new(HashMap::new())),
328 not_found_filter: Arc::new(RwLock::new(bloom_filter)),
329 cache_size_limit,
330 }
331 }
332
333 /// Check if the cache is at capacity and evict items if necessary.
334 /// Uses a simple LRU-style eviction by clearing the cache when it's full.
335 async fn maybe_evict_cache(&self) {
336 let cache = self.content_cache.read().await;
337 if cache.len() >= self.cache_size_limit {
338 drop(cache);
339 let mut cache = self.content_cache.write().await;
340 if cache.len() >= self.cache_size_limit {
341 // Simple eviction strategy: clear the entire cache
342 // In a production system, you might want to implement LRU eviction
343 cache.clear();
344 }
345 }
346 }
347
348 /// Add a CID to the not-found bloom filter
349 async fn add_to_not_found_filter(&self, cid: &str) {
350 let mut filter = self.not_found_filter.write().await;
351 filter.set(&cid.to_string());
352 }
353
354 /// Check if a CID is in the not-found bloom filter
355 async fn is_in_not_found_filter(&self, cid: &str) -> bool {
356 let filter = self.not_found_filter.read().await;
357 filter.check(&cid.to_string())
358 }
359
360 /// Add content to the cache
361 async fn add_to_cache(&self, cid: &str, content: Vec<u8>) {
362 self.maybe_evict_cache().await;
363 let mut cache = self.content_cache.write().await;
364 cache.insert(cid.to_string(), content);
365 }
366
367 /// Get content from the cache
368 async fn get_from_cache(&self, cid: &str) -> Option<Vec<u8>> {
369 let cache = self.content_cache.read().await;
370 cache.get(cid).cloned()
371 }
372
373 /// Remove content from the cache (used when content is written)
374 async fn remove_from_cache(&self, cid: &str) {
375 let mut cache = self.content_cache.write().await;
376 cache.remove(cid);
377 }
378
379 /// Clear the not-found filter entry for a CID (used when content is written)
380 async fn clear_not_found_filter(&self, _cid: &str) {
381 // Note: Bloom filters don't support removal, so we recreate the filter
382 // This is a limitation of bloom filters - in production you might want
383 // to use a counting bloom filter or periodically reset the filter
384 let mut filter = self.not_found_filter.write().await;
385 *filter = Bloom::new_for_fp_rate(10000, 0.01).expect("TODO fix this");
386 }
387}
388
389#[async_trait]
390impl<T: ContentStorage> ContentStorage for CachedContentStorage<T> {
391 async fn content_exists(&self, cid: &str) -> Result<bool> {
392 // First check the cache
393 if self.get_from_cache(cid).await.is_some() {
394 return Ok(true);
395 }
396
397 // Check the not-found filter to avoid expensive lookups
398 if self.is_in_not_found_filter(cid).await {
399 return Ok(false);
400 }
401
402 // Check the underlying storage
403 let exists = self.underlying_storage.content_exists(cid).await?;
404
405 // If not found, add to the not-found filter
406 if !exists {
407 self.add_to_not_found_filter(cid).await;
408 }
409
410 Ok(exists)
411 }
412
413 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> {
414 // Write to underlying storage first
415 self.underlying_storage.write_content(cid, data).await?;
416
417 // Clear any not-found filter entry
418 self.clear_not_found_filter(cid).await;
419
420 // Remove from cache (it will be cached on next read)
421 self.remove_from_cache(cid).await;
422
423 Ok(())
424 }
425
426 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> {
427 // First check the cache
428 if let Some(cached_content) = self.get_from_cache(cid).await {
429 return Ok(cached_content);
430 }
431
432 // Check the not-found filter to avoid expensive lookups
433 if self.is_in_not_found_filter(cid).await {
434 return Err(ContentError::StorageFileOperationFailed {
435 operation: format!("Content not found: {}", cid),
436 }
437 .into());
438 }
439
440 // Read from underlying storage
441 match self.underlying_storage.read_content(cid).await {
442 Ok(content) => {
443 // Cache the content
444 self.add_to_cache(cid, content.clone()).await;
445 Ok(content)
446 }
447 Err(e) => {
448 // If it's a not-found error, add to the not-found filter
449 if e.to_string()
450 .starts_with("error-smokesignal-content-2 File storage operation failed:")
451 {
452 self.add_to_not_found_filter(cid).await;
453 }
454 Err(e)
455 }
456 }
457 }
458}