Alternative ATProto PDS implementation
1//! File system implementation of blob storage
2//! Based on the S3 implementation but using local file system instead
3use anyhow::Result;
4use axum::body::Bytes;
5use cidv10::Cid;
6use rsky_common::get_random_str;
7use rsky_repo::error::BlobError;
8use std::path::PathBuf;
9use std::str::FromStr;
10use tokio::fs as async_fs;
11use tokio::io::AsyncWriteExt;
12use tracing::{debug, error, warn};
13
14/// ByteStream implementation for blob data
15pub struct ByteStream {
16 pub bytes: Bytes,
17}
18
19impl ByteStream {
20 /// Create a new ByteStream with the given bytes
21 pub const fn new(bytes: Bytes) -> Self {
22 Self { bytes }
23 }
24
25 /// Collect the bytes from the stream
26 pub async fn collect(self) -> Result<Bytes> {
27 Ok(self.bytes)
28 }
29}
30
/// Source and destination for a single blob relocation on disk.
struct MoveObject {
    /// Current location of the blob file.
    from: PathBuf,
    /// Location the blob file should end up at.
    to: PathBuf,
}
36
/// Blob store that keeps one actor's blobs as plain files under a base directory.
pub struct BlobStoreFs {
    /// DID of the actor whose blobs this store manages; used as a path segment.
    pub did: String,
    /// Root directory beneath which the tmp, blocks, and quarantine trees live.
    pub base_dir: PathBuf,
}
44
45impl BlobStoreFs {
46 /// Create a new file system blob store for the given DID and base directory
47 pub const fn new(did: String, base_dir: PathBuf) -> Self {
48 Self { base_dir, did }
49 }
50
51 /// Create a factory function for blob stores
52 pub fn creator(base_dir: PathBuf) -> Box<dyn Fn(String) -> Self> {
53 let base_dir_clone = base_dir;
54 Box::new(move |did: String| Self::new(did, base_dir_clone.clone()))
55 }
56
57 /// Generate a random key for temporary storage
58 fn gen_key(&self) -> String {
59 get_random_str()
60 }
61
62 /// Get path to the temporary blob storage
63 fn get_tmp_path(&self, key: &str) -> PathBuf {
64 self.base_dir.join("tmp").join(&self.did).join(key)
65 }
66
67 /// Get path to the stored blob with appropriate sharding
68 fn get_stored_path(&self, cid: Cid) -> PathBuf {
69 let cid_str = cid.to_string();
70
71 // Create two-level sharded structure based on CID
72 // First 10 chars for level 1, next 10 chars for level 2
73 let first_level = if cid_str.len() >= 10 {
74 &cid_str[0..10]
75 } else {
76 "short"
77 };
78
79 let second_level = if cid_str.len() >= 20 {
80 &cid_str[10..20]
81 } else {
82 "short"
83 };
84
85 self.base_dir
86 .join("blocks")
87 .join(&self.did)
88 .join(first_level)
89 .join(second_level)
90 .join(&cid_str)
91 }
92
93 /// Get path to the quarantined blob
94 fn get_quarantined_path(&self, cid: Cid) -> PathBuf {
95 let cid_str = cid.to_string();
96 self.base_dir
97 .join("quarantine")
98 .join(&self.did)
99 .join(&cid_str)
100 }
101
102 /// Store a blob temporarily
103 pub async fn put_temp(&self, bytes: Bytes) -> Result<String> {
104 let key = self.gen_key();
105 let temp_path = self.get_tmp_path(&key);
106
107 // Ensure the directory exists
108 if let Some(parent) = temp_path.parent() {
109 async_fs::create_dir_all(parent).await?;
110 }
111
112 // Write the temporary blob
113 let mut file = async_fs::File::create(&temp_path).await?;
114 file.write_all(&bytes).await?;
115 file.flush().await?;
116
117 debug!("Stored temp blob at: {:?}", temp_path);
118 Ok(key)
119 }
120
121 /// Make a temporary blob permanent by moving it to the blob store
122 pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> {
123 let already_has = self.has_stored(cid).await?;
124
125 if !already_has {
126 // Move the temporary blob to permanent storage
127 self.move_object(MoveObject {
128 from: self.get_tmp_path(&key),
129 to: self.get_stored_path(cid),
130 })
131 .await?;
132 debug!("Moved temp blob to permanent: {} -> {}", key, cid);
133 } else {
134 // Already saved, so just delete the temp
135 let temp_path = self.get_tmp_path(&key);
136 if temp_path.exists() {
137 async_fs::remove_file(temp_path).await?;
138 debug!("Deleted temp blob as permanent already exists: {}", key);
139 }
140 }
141
142 Ok(())
143 }
144
145 /// Store a blob directly as permanent
146 pub async fn put_permanent(&self, cid: Cid, bytes: Bytes) -> Result<()> {
147 let target_path = self.get_stored_path(cid);
148
149 // Ensure the directory exists
150 if let Some(parent) = target_path.parent() {
151 async_fs::create_dir_all(parent).await?;
152 }
153
154 // Write the blob
155 let mut file = async_fs::File::create(&target_path).await?;
156 file.write_all(&bytes).await?;
157 file.flush().await?;
158
159 debug!("Stored permanent blob: {}", cid);
160 Ok(())
161 }
162
163 /// Quarantine a blob by moving it to the quarantine area
164 pub async fn quarantine(&self, cid: Cid) -> Result<()> {
165 self.move_object(MoveObject {
166 from: self.get_stored_path(cid),
167 to: self.get_quarantined_path(cid),
168 })
169 .await?;
170
171 debug!("Quarantined blob: {}", cid);
172 Ok(())
173 }
174
175 /// Unquarantine a blob by moving it back to regular storage
176 pub async fn unquarantine(&self, cid: Cid) -> Result<()> {
177 self.move_object(MoveObject {
178 from: self.get_quarantined_path(cid),
179 to: self.get_stored_path(cid),
180 })
181 .await?;
182
183 debug!("Unquarantined blob: {}", cid);
184 Ok(())
185 }
186
187 /// Get a blob as a stream
188 async fn get_object(&self, cid: Cid) -> Result<ByteStream> {
189 let blob_path = self.get_stored_path(cid);
190
191 match async_fs::read(&blob_path).await {
192 Ok(bytes) => Ok(ByteStream::new(Bytes::from(bytes))),
193 Err(e) => {
194 error!("Failed to read blob at path {:?}: {}", blob_path, e);
195 Err(anyhow::Error::new(BlobError::BlobNotFoundError))
196 }
197 }
198 }
199
200 /// Get blob bytes
201 pub async fn get_bytes(&self, cid: Cid) -> Result<Bytes> {
202 let stream = self.get_object(cid).await?;
203 stream.collect().await
204 }
205
206 /// Get a blob as a stream
207 pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> {
208 self.get_object(cid).await
209 }
210
211 /// Delete a blob by CID string
212 pub async fn delete(&self, cid_str: String) -> Result<()> {
213 match Cid::from_str(&cid_str) {
214 Ok(cid) => self.delete_path(self.get_stored_path(cid)).await,
215 Err(e) => {
216 warn!("Invalid CID: {} - {}", cid_str, e);
217 Err(anyhow::anyhow!("Invalid CID: {}", e))
218 }
219 }
220 }
221
222 /// Delete multiple blobs by CID
223 pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
224 let mut futures = Vec::with_capacity(cids.len());
225
226 for cid in cids {
227 futures.push(self.delete_path(self.get_stored_path(cid)));
228 }
229
230 // Execute all delete operations concurrently
231 let results = futures::future::join_all(futures).await;
232
233 // Count errors but don't fail the operation
234 let error_count = results.iter().filter(|r| r.is_err()).count();
235 if error_count > 0 {
236 warn!(
237 "{} errors occurred while deleting {} blobs",
238 error_count,
239 results.len()
240 );
241 }
242
243 Ok(())
244 }
245
246 /// Check if a blob is stored in the regular storage
247 pub async fn has_stored(&self, cid: Cid) -> Result<bool> {
248 let blob_path = self.get_stored_path(cid);
249 Ok(blob_path.exists())
250 }
251
252 /// Check if a temporary blob exists
253 pub async fn has_temp(&self, key: String) -> Result<bool> {
254 let temp_path = self.get_tmp_path(&key);
255 Ok(temp_path.exists())
256 }
257
258 /// Helper function to delete a file at the given path
259 async fn delete_path(&self, path: PathBuf) -> Result<()> {
260 if path.exists() {
261 async_fs::remove_file(&path).await?;
262 debug!("Deleted file at: {:?}", path);
263 Ok(())
264 } else {
265 Err(anyhow::Error::new(BlobError::BlobNotFoundError))
266 }
267 }
268
269 /// Move a blob from one path to another
270 async fn move_object(&self, mov: MoveObject) -> Result<()> {
271 // Ensure the source exists
272 if !mov.from.exists() {
273 return Err(anyhow::Error::new(BlobError::BlobNotFoundError));
274 }
275
276 // Ensure the target directory exists
277 if let Some(parent) = mov.to.parent() {
278 async_fs::create_dir_all(parent).await?;
279 }
280
281 // Move the file
282 async_fs::rename(&mov.from, &mov.to).await?;
283
284 debug!("Moved blob: {:?} -> {:?}", mov.from, mov.to);
285 Ok(())
286 }
287}