An ATProtocol-powered blogging engine.
at main 327 lines 10 kB view raw
1use std::path::{Path, PathBuf}; 2 3use crate::errors::Result; 4use async_trait::async_trait; 5use tokio::fs; 6 7#[cfg(feature = "s3")] 8use minio::s3::{Client as MinioClient, creds::StaticProvider, http::BaseUrl, types::S3Api}; 9 10use super::ContentStorage; 11 12pub use self::FilesystemContentStorage as FilesystemStorage; 13 14/// Parse an S3 URL in the format: s3://[key]:[secret]@hostname/bucket[/optional_prefix] 15/// Returns (endpoint, access_key, secret_key, bucket, prefix) 16#[cfg(feature = "s3")] 17pub fn parse_s3_url(url: &str) -> Result<(String, String, String, String, Option<String>)> { 18 if !url.starts_with("s3://") { 19 return Err(crate::errors::BlahgError::ConfigS3UrlInvalid { 20 details: format!("Invalid S3 URL format: {}", url), 21 }); 22 } 23 24 let url_without_scheme = &url[5..]; // Remove "s3://" 25 26 // Split by '@' to separate credentials from hostname/path 27 let parts: Vec<&str> = url_without_scheme.splitn(2, '@').collect(); 28 if parts.len() != 2 { 29 return Err(crate::errors::BlahgError::ConfigS3UrlInvalid { 30 details: format!("Invalid S3 URL format - missing @ separator: {}", url), 31 }); 32 } 33 34 let credentials = parts[0]; 35 let hostname_and_path = parts[1]; 36 37 // Parse credentials: key:secret 38 let cred_parts: Vec<&str> = credentials.splitn(2, ':').collect(); 39 if cred_parts.len() != 2 { 40 return Err(crate::errors::BlahgError::ConfigS3UrlInvalid { 41 details: format!( 42 "Invalid S3 URL format - credentials must be key:secret: {}", 43 url 44 ), 45 }); 46 } 47 48 let access_key = cred_parts[0].to_string(); 49 let secret_key = cred_parts[1].to_string(); 50 51 // Parse hostname and path: hostname/bucket[/prefix] 52 let path_parts: Vec<&str> = hostname_and_path.splitn(2, '/').collect(); 53 if path_parts.len() != 2 { 54 return Err(crate::errors::BlahgError::ConfigS3UrlInvalid { 55 details: format!("Invalid S3 URL format - must include bucket: {}", url), 56 }); 57 } 58 59 let hostname = path_parts[0].to_string(); 60 let 
bucket_and_prefix = path_parts[1]; 61 62 // Split bucket from optional prefix 63 let bucket_parts: Vec<&str> = bucket_and_prefix.splitn(2, '/').collect(); 64 let bucket = bucket_parts[0].to_string(); 65 let prefix = if bucket_parts.len() > 1 && !bucket_parts[1].is_empty() { 66 Some(bucket_parts[1].to_string()) 67 } else { 68 None 69 }; 70 71 let endpoint = if hostname.starts_with("http://") || hostname.starts_with("https://") { 72 hostname 73 } else { 74 format!("https://{}", hostname) 75 }; 76 77 Ok((endpoint, access_key, secret_key, bucket, prefix)) 78} 79 80/// Local filesystem implementation of content storage. 81#[derive(Debug, Clone)] 82pub struct FilesystemContentStorage { 83 base_dir: PathBuf, 84} 85 86impl FilesystemContentStorage { 87 /// Create a new filesystem content storage with the given base directory. 88 pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> { 89 let base_dir = base_dir.as_ref().to_path_buf(); 90 91 // Ensure the base directory exists 92 fs::create_dir_all(&base_dir).await?; 93 94 Ok(Self { base_dir }) 95 } 96 97 /// Get the file path for a given CID. 98 /// Uses a subdirectory structure based on the first few characters of the CID 99 /// to avoid having too many files in a single directory. 100 fn get_content_path(&self, cid: &str) -> PathBuf { 101 // Use first 2 characters for first level directory 102 // and next 2 characters for second level directory 103 let (dir1, dir2, filename) = if cid.len() >= 4 { 104 (&cid[0..2], &cid[2..4], cid) 105 } else if cid.len() >= 2 { 106 (&cid[0..2], "00", cid) 107 } else { 108 ("00", "00", cid) 109 }; 110 111 self.base_dir.join(dir1).join(dir2).join(filename) 112 } 113} 114 115#[async_trait] 116impl ContentStorage for FilesystemContentStorage { 117 async fn content_exists(&self, cid: &str) -> Result<bool> { 118 let path = self.get_content_path(cid); 119 Ok(fs::try_exists(&path).await?) 
120 } 121 122 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { 123 let path = self.get_content_path(cid); 124 125 // Ensure parent directory exists 126 if let Some(parent) = path.parent() { 127 fs::create_dir_all(parent).await?; 128 } 129 130 // Write content atomically by writing to a temp file first 131 let temp_path = path.with_extension("tmp"); 132 fs::write(&temp_path, data).await?; 133 134 // Rename temp file to final path (atomic on most filesystems) 135 fs::rename(&temp_path, &path).await?; 136 137 Ok(()) 138 } 139 140 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> { 141 let path = self.get_content_path(cid); 142 Ok(fs::read(&path).await?) 143 } 144} 145 146#[cfg(feature = "s3")] 147/// S3-compatible object storage implementation for content storage. 148pub struct S3FileStorage { 149 client: MinioClient, 150 bucket: String, 151 prefix: Option<String>, 152} 153 154#[cfg(feature = "s3")] 155impl S3FileStorage { 156 /// Create a new S3FileStorage with the given credentials and bucket information 157 pub fn new( 158 endpoint: String, 159 access_key: String, 160 secret_key: String, 161 bucket: String, 162 prefix: Option<String>, 163 ) -> Result<Self> { 164 let base_url: BaseUrl = endpoint.parse().unwrap(); 165 tracing::debug!(?base_url, "s3 file storage base url"); 166 167 let static_provider = StaticProvider::new(&access_key, &secret_key, None); 168 tracing::debug!(?static_provider, "s3 file storage static provider"); 169 170 let client = MinioClient::new(base_url, Some(Box::new(static_provider)), None, None) 171 .map_err(|e| crate::errors::BlahgError::StorageFileOperationFailed { 172 operation: format!("Failed to create S3 client: {}", e), 173 })?; 174 175 Ok(Self { 176 client, 177 bucket, 178 prefix, 179 }) 180 } 181 182 /// Get the full object key by combining prefix with path 183 fn get_object_key(&self, path: &str) -> String { 184 match &self.prefix { 185 Some(prefix) => { 186 if path.starts_with('/') { 187 
format!("/{prefix}{path}") 188 } else { 189 format!("/{prefix}/{path}") 190 } 191 } 192 None => { 193 if path.starts_with('/') { 194 path.to_string() 195 } else { 196 format!("/{path}") 197 } 198 } 199 } 200 } 201} 202 203#[cfg(feature = "s3")] 204#[async_trait] 205impl ContentStorage for S3FileStorage { 206 async fn content_exists(&self, cid: &str) -> Result<bool> { 207 use minio::s3::error::ErrorCode; 208 209 let object_key = self.get_object_key(cid); 210 211 match self 212 .client 213 .stat_object(&self.bucket, &object_key) 214 .send() 215 .await 216 { 217 Ok(_) => Ok(true), 218 Err(minio::s3::error::Error::S3Error(ref s3_err)) 219 if s3_err.code == ErrorCode::NoSuchKey => 220 { 221 Ok(false) 222 } 223 Err(e) => Err(crate::errors::BlahgError::StorageFileOperationFailed { 224 operation: format!("Failed to check if S3 object exists: {}", e), 225 }), 226 } 227 } 228 229 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> { 230 use minio::s3::segmented_bytes::SegmentedBytes; 231 232 let object_key = self.get_object_key(cid); 233 234 let put_data = SegmentedBytes::from(bytes::Bytes::copy_from_slice(data)); 235 236 self.client 237 .put_object(&self.bucket, &object_key, put_data) 238 .send() 239 .await 240 .map_err(|e| crate::errors::BlahgError::StorageFileOperationFailed { 241 operation: format!("Failed to write S3 object: {}", e), 242 })?; 243 244 Ok(()) 245 } 246 247 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> { 248 let object_key = self.get_object_key(cid); 249 250 let response = self 251 .client 252 .get_object(&self.bucket, &object_key) 253 .send() 254 .await 255 .map_err(|e| crate::errors::BlahgError::StorageFileOperationFailed { 256 operation: format!("Failed to read S3 object: {}", e), 257 })?; 258 259 let data = response 260 .content 261 .to_segmented_bytes() 262 .await 263 .map_err(|e| crate::errors::BlahgError::StorageFileOperationFailed { 264 operation: format!("Failed to read S3 object data: {}", e), 265 })? 
266 .to_bytes(); 267 268 Ok(data.to_vec()) 269 } 270} 271 272#[cfg(test)] 273mod tests { 274 use super::*; 275 use tempfile::TempDir; 276 277 #[tokio::test] 278 async fn test_filesystem_content_storage() -> Result<()> { 279 let temp_dir = TempDir::new()?; 280 let storage = FilesystemContentStorage::new(temp_dir.path()).await?; 281 282 let cid = "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"; 283 let data = b"Hello, world!"; 284 285 // Test content doesn't exist initially 286 assert!(!storage.content_exists(cid).await?); 287 288 // Write content 289 storage.write_content(cid, data).await?; 290 291 // Test content exists after writing 292 assert!(storage.content_exists(cid).await?); 293 294 // Read content and verify 295 let read_data = storage.read_content(cid).await?; 296 assert_eq!(read_data, data); 297 298 Ok(()) 299 } 300 301 #[tokio::test] 302 async fn test_filesystem_path_structure() { 303 let temp_dir = TempDir::new().unwrap(); 304 let storage = FilesystemContentStorage::new(temp_dir.path()) 305 .await 306 .unwrap(); 307 308 // Test normal CID 309 let path = 310 storage.get_content_path("bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"); 311 assert_eq!( 312 path.file_name().unwrap(), 313 "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 314 ); 315 assert!(path.parent().unwrap().ends_with("ba/fy")); 316 317 // Test short CID 318 let path = storage.get_content_path("ab"); 319 assert_eq!(path.file_name().unwrap(), "ab"); 320 assert!(path.parent().unwrap().ends_with("ab/00")); 321 322 // Test very short CID 323 let path = storage.get_content_path("a"); 324 assert_eq!(path.file_name().unwrap(), "a"); 325 assert!(path.parent().unwrap().ends_with("00/00")); 326 } 327}