An ATProtocol-powered blogging engine.
1use std::path::{Path, PathBuf};
2
3use crate::errors::Result;
4use async_trait::async_trait;
5use tokio::fs;
6
7#[cfg(feature = "s3")]
8use minio::s3::{Client as MinioClient, creds::StaticProvider, http::BaseUrl, types::S3Api};
9
10use super::ContentStorage;
11
12pub use self::FilesystemContentStorage as FilesystemStorage;
13
/// Mask the credentials section of a URL so it can be echoed in error messages.
///
/// Everything between the scheme separator (`://`, or the start of the string
/// when there is none) and the first following `@` is replaced with `***`.
/// A URL without an `@` has no identifiable credentials section and is
/// returned unchanged.
#[cfg(feature = "s3")]
fn redact_url_credentials(url: &str) -> String {
    // "://" is ASCII, so `idx + 3` is always a valid char boundary.
    let authority_start = url.find("://").map_or(0, |idx| idx + 3);
    match url[authority_start..].find('@') {
        Some(at) => format!(
            "{}***{}",
            &url[..authority_start],
            &url[authority_start + at..]
        ),
        None => url.to_string(),
    }
}

/// Parse an S3 URL in the format `s3://[key]:[secret]@hostname/bucket[/optional_prefix]`.
///
/// The hostname may carry an explicit `http://` or `https://` scheme, e.g.
/// `s3://key:secret@http://localhost:9000/bucket`; when it has none the
/// endpoint defaults to `https://`.
///
/// Returns `(endpoint, access_key, secret_key, bucket, prefix)`. `prefix` is
/// `None` when nothing (or only an empty string) follows the bucket.
///
/// # Errors
///
/// Returns `BlahgError::ConfigS3UrlInvalid` when the `s3://` scheme, the `@`
/// separator, the `:` between key and secret, the hostname, or the bucket is
/// missing. The credentials section of the URL is masked in the error details
/// so the secret key does not end up in logs.
#[cfg(feature = "s3")]
pub fn parse_s3_url(url: &str) -> Result<(String, String, String, String, Option<String>)> {
    // Single place that builds the error, always with the credentials masked.
    let invalid = |reason: &str| crate::errors::BlahgError::ConfigS3UrlInvalid {
        details: format!("{}: {}", reason, redact_url_credentials(url)),
    };

    let without_scheme = url
        .strip_prefix("s3://")
        .ok_or_else(|| invalid("Invalid S3 URL format"))?;

    // Credentials end at the first '@'; everything after is host and path.
    let (credentials, location) = without_scheme
        .split_once('@')
        .ok_or_else(|| invalid("Invalid S3 URL format - missing @ separator"))?;

    // The access key ends at the first ':'; the secret may itself contain ':'.
    let (access_key, secret_key) = credentials
        .split_once(':')
        .ok_or_else(|| invalid("Invalid S3 URL format - credentials must be key:secret"))?;

    // Detect an explicit endpoint scheme BEFORE splitting on '/', otherwise the
    // "//" of the scheme is mistaken for the host/bucket separator.
    let (scheme, host_and_path) = if let Some(rest) = location.strip_prefix("http://") {
        ("http://", rest)
    } else if let Some(rest) = location.strip_prefix("https://") {
        ("https://", rest)
    } else {
        ("https://", location)
    };

    let (hostname, bucket_and_prefix) = host_and_path
        .split_once('/')
        .ok_or_else(|| invalid("Invalid S3 URL format - must include bucket"))?;
    if hostname.is_empty() {
        return Err(invalid("Invalid S3 URL format - missing hostname"));
    }

    // The bucket is the first path segment; the remainder (if any) is the prefix.
    let (bucket, prefix) = match bucket_and_prefix.split_once('/') {
        Some((bucket, rest)) => (bucket, Some(rest).filter(|p| !p.is_empty())),
        None => (bucket_and_prefix, None),
    };
    if bucket.is_empty() {
        return Err(invalid("Invalid S3 URL format - must include bucket"));
    }

    Ok((
        format!("{}{}", scheme, hostname),
        access_key.to_string(),
        secret_key.to_string(),
        bucket.to_string(),
        prefix.map(str::to_string),
    ))
}
79
80/// Local filesystem implementation of content storage.
81#[derive(Debug, Clone)]
82pub struct FilesystemContentStorage {
83 base_dir: PathBuf,
84}
85
86impl FilesystemContentStorage {
87 /// Create a new filesystem content storage with the given base directory.
88 pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
89 let base_dir = base_dir.as_ref().to_path_buf();
90
91 // Ensure the base directory exists
92 fs::create_dir_all(&base_dir).await?;
93
94 Ok(Self { base_dir })
95 }
96
97 /// Get the file path for a given CID.
98 /// Uses a subdirectory structure based on the first few characters of the CID
99 /// to avoid having too many files in a single directory.
100 fn get_content_path(&self, cid: &str) -> PathBuf {
101 // Use first 2 characters for first level directory
102 // and next 2 characters for second level directory
103 let (dir1, dir2, filename) = if cid.len() >= 4 {
104 (&cid[0..2], &cid[2..4], cid)
105 } else if cid.len() >= 2 {
106 (&cid[0..2], "00", cid)
107 } else {
108 ("00", "00", cid)
109 };
110
111 self.base_dir.join(dir1).join(dir2).join(filename)
112 }
113}
114
115#[async_trait]
116impl ContentStorage for FilesystemContentStorage {
117 async fn content_exists(&self, cid: &str) -> Result<bool> {
118 let path = self.get_content_path(cid);
119 Ok(fs::try_exists(&path).await?)
120 }
121
122 async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> {
123 let path = self.get_content_path(cid);
124
125 // Ensure parent directory exists
126 if let Some(parent) = path.parent() {
127 fs::create_dir_all(parent).await?;
128 }
129
130 // Write content atomically by writing to a temp file first
131 let temp_path = path.with_extension("tmp");
132 fs::write(&temp_path, data).await?;
133
134 // Rename temp file to final path (atomic on most filesystems)
135 fs::rename(&temp_path, &path).await?;
136
137 Ok(())
138 }
139
140 async fn read_content(&self, cid: &str) -> Result<Vec<u8>> {
141 let path = self.get_content_path(cid);
142 Ok(fs::read(&path).await?)
143 }
144}
145
/// S3-compatible object storage implementation for content storage.
#[cfg(feature = "s3")]
pub struct S3FileStorage {
    /// Client used for every request against the object store.
    client: MinioClient,
    /// Bucket that holds all content objects.
    bucket: String,
    /// Optional key prefix placed in front of every object path.
    prefix: Option<String>,
}

#[cfg(feature = "s3")]
impl S3FileStorage {
    /// Create a new S3FileStorage with the given credentials and bucket information.
    ///
    /// # Errors
    ///
    /// Returns `BlahgError::ConfigS3UrlInvalid` when `endpoint` cannot be
    /// parsed as a base URL, and `BlahgError::StorageFileOperationFailed`
    /// when the client cannot be constructed.
    pub fn new(
        endpoint: String,
        access_key: String,
        secret_key: String,
        bucket: String,
        prefix: Option<String>,
    ) -> Result<Self> {
        // The endpoint comes from configuration, so a malformed value must be
        // reported as an error rather than panic.
        let base_url = endpoint.parse::<BaseUrl>().map_err(|e| {
            crate::errors::BlahgError::ConfigS3UrlInvalid {
                details: format!("Invalid S3 endpoint {}: {}", endpoint, e),
            }
        })?;
        tracing::debug!(?base_url, "s3 file storage base url");

        // Deliberately not logged: the provider's debug output would put the
        // secret key into the logs.
        let static_provider = StaticProvider::new(&access_key, &secret_key, None);

        let client = MinioClient::new(base_url, Some(Box::new(static_provider)), None, None)
            .map_err(|e| crate::errors::BlahgError::StorageFileOperationFailed {
                operation: format!("Failed to create S3 client: {}", e),
            })?;

        Ok(Self {
            client,
            bucket,
            prefix,
        })
    }

    /// Get the full object key by combining the optional prefix with `path`.
    ///
    /// The key always starts with `/`. Exactly one leading `/` of `path` is
    /// absorbed into the separator, so `"a"` and `"/a"` map to the same key.
    fn get_object_key(&self, path: &str) -> String {
        // Drop at most one leading slash; the separator is re-added below.
        let relative = path.strip_prefix('/').unwrap_or(path);
        match &self.prefix {
            Some(prefix) => format!("/{prefix}/{relative}"),
            None => format!("/{relative}"),
        }
    }
}
202
#[cfg(feature = "s3")]
#[async_trait]
impl ContentStorage for S3FileStorage {
    /// Report whether an object for `cid` exists in the bucket.
    ///
    /// A `NoSuchKey` response means "absent"; every other failure is returned
    /// as `BlahgError::StorageFileOperationFailed`.
    async fn content_exists(&self, cid: &str) -> Result<bool> {
        use minio::s3::error::ErrorCode;

        let key = self.get_object_key(cid);
        let outcome = self.client.stat_object(&self.bucket, &key).send().await;

        match outcome {
            Ok(_) => Ok(true),
            // The store answered, but there is no object under this key.
            Err(minio::s3::error::Error::S3Error(ref detail))
                if detail.code == ErrorCode::NoSuchKey =>
            {
                Ok(false)
            }
            Err(other) => Err(crate::errors::BlahgError::StorageFileOperationFailed {
                operation: format!("Failed to check if S3 object exists: {}", other),
            }),
        }
    }

    /// Upload `data` as the object for `cid`.
    ///
    /// Failures are returned as `BlahgError::StorageFileOperationFailed`.
    async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()> {
        use minio::s3::segmented_bytes::SegmentedBytes;

        let key = self.get_object_key(cid);

        // The upload needs an owned buffer, so the slice is copied once here.
        let payload = SegmentedBytes::from(bytes::Bytes::copy_from_slice(data));

        let uploaded = self
            .client
            .put_object(&self.bucket, &key, payload)
            .send()
            .await;

        match uploaded {
            Ok(_) => Ok(()),
            Err(e) => Err(crate::errors::BlahgError::StorageFileOperationFailed {
                operation: format!("Failed to write S3 object: {}", e),
            }),
        }
    }

    /// Download the object for `cid` and return its bytes.
    ///
    /// Failures while requesting the object or while collecting its body are
    /// returned as `BlahgError::StorageFileOperationFailed`.
    async fn read_content(&self, cid: &str) -> Result<Vec<u8>> {
        let key = self.get_object_key(cid);

        let fetched = self.client.get_object(&self.bucket, &key).send().await;
        let response = match fetched {
            Ok(response) => response,
            Err(e) => {
                return Err(crate::errors::BlahgError::StorageFileOperationFailed {
                    operation: format!("Failed to read S3 object: {}", e),
                });
            }
        };

        // Collect the whole body before flattening it into one byte vector.
        let segments = response
            .content
            .to_segmented_bytes()
            .await
            .map_err(|e| crate::errors::BlahgError::StorageFileOperationFailed {
                operation: format!("Failed to read S3 object data: {}", e),
            })?;

        Ok(segments.to_bytes().to_vec())
    }
}
271
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Content is absent before a write, present afterwards, and reads back
    /// byte-for-byte.
    #[tokio::test]
    async fn test_filesystem_content_storage() -> Result<()> {
        let scratch = TempDir::new()?;
        let store = FilesystemContentStorage::new(scratch.path()).await?;

        let cid = "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi";
        let payload = b"Hello, world!";

        // Nothing has been stored yet.
        let present_before = store.content_exists(cid).await?;
        assert!(!present_before);

        store.write_content(cid, payload).await?;

        // The write must be visible to an existence check.
        let present_after = store.content_exists(cid).await?;
        assert!(present_after);

        // And the stored bytes must match what was written.
        let round_tripped = store.read_content(cid).await?;
        assert_eq!(round_tripped, payload);

        Ok(())
    }

    /// The on-disk layout uses the CID as file name below two directory
    /// levels, padding with "00" for CIDs that are too short.
    #[tokio::test]
    async fn test_filesystem_path_structure() {
        let scratch = TempDir::new().unwrap();
        let store = FilesystemContentStorage::new(scratch.path())
            .await
            .unwrap();

        // (cid, expected trailing directories): normal, short, very short.
        let cases = [
            (
                "bafyreigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
                "ba/fy",
            ),
            ("ab", "ab/00"),
            ("a", "00/00"),
        ];

        for &(cid, expected_dirs) in &cases {
            let path = store.get_content_path(cid);
            assert_eq!(path.file_name().unwrap(), cid);
            assert!(path.parent().unwrap().ends_with(expected_dirs));
        }
    }
}
327}