Alternative ATProto PDS implementation
at oauth 287 lines 8.7 kB view raw
1//! File system implementation of blob storage 2//! Based on the S3 implementation but using local file system instead 3use anyhow::Result; 4use axum::body::Bytes; 5use cidv10::Cid; 6use rsky_common::get_random_str; 7use rsky_repo::error::BlobError; 8use std::path::PathBuf; 9use std::str::FromStr; 10use tokio::fs as async_fs; 11use tokio::io::AsyncWriteExt; 12use tracing::{debug, error, warn}; 13 14/// ByteStream implementation for blob data 15pub struct ByteStream { 16 pub bytes: Bytes, 17} 18 19impl ByteStream { 20 /// Create a new ByteStream with the given bytes 21 pub const fn new(bytes: Bytes) -> Self { 22 Self { bytes } 23 } 24 25 /// Collect the bytes from the stream 26 pub async fn collect(self) -> Result<Bytes> { 27 Ok(self.bytes) 28 } 29} 30 31/// Path information for moving a blob 32struct MoveObject { 33 from: PathBuf, 34 to: PathBuf, 35} 36 37/// File system implementation of blob storage 38pub struct BlobStoreFs { 39 /// Base directory for storing blobs 40 pub base_dir: PathBuf, 41 /// DID of the actor 42 pub did: String, 43} 44 45impl BlobStoreFs { 46 /// Create a new file system blob store for the given DID and base directory 47 pub const fn new(did: String, base_dir: PathBuf) -> Self { 48 Self { base_dir, did } 49 } 50 51 /// Create a factory function for blob stores 52 pub fn creator(base_dir: PathBuf) -> Box<dyn Fn(String) -> Self> { 53 let base_dir_clone = base_dir; 54 Box::new(move |did: String| Self::new(did, base_dir_clone.clone())) 55 } 56 57 /// Generate a random key for temporary storage 58 fn gen_key(&self) -> String { 59 get_random_str() 60 } 61 62 /// Get path to the temporary blob storage 63 fn get_tmp_path(&self, key: &str) -> PathBuf { 64 self.base_dir.join("tmp").join(&self.did).join(key) 65 } 66 67 /// Get path to the stored blob with appropriate sharding 68 fn get_stored_path(&self, cid: Cid) -> PathBuf { 69 let cid_str = cid.to_string(); 70 71 // Create two-level sharded structure based on CID 72 // First 10 chars for level 1, next 10 chars for level 2 73 let first_level = if cid_str.len() >= 10 { 74 &cid_str[0..10] 75 } else { 76 "short" 77 }; 78 79 let second_level = if cid_str.len() >= 20 { 80 &cid_str[10..20] 81 } else { 82 "short" 83 }; 84 85 self.base_dir 86 .join("blocks") 87 .join(&self.did) 88 .join(first_level) 89 .join(second_level) 90 .join(&cid_str) 91 } 92 93 /// Get path to the quarantined blob 94 fn get_quarantined_path(&self, cid: Cid) -> PathBuf { 95 let cid_str = cid.to_string(); 96 self.base_dir 97 .join("quarantine") 98 .join(&self.did) 99 .join(&cid_str) 100 } 101 102 /// Store a blob temporarily 103 pub async fn put_temp(&self, bytes: Bytes) -> Result<String> { 104 let key = self.gen_key(); 105 let temp_path = self.get_tmp_path(&key); 106 107 // Ensure the directory exists 108 if let Some(parent) = temp_path.parent() { 109 async_fs::create_dir_all(parent).await?; 110 } 111 112 // Write the temporary blob 113 let mut file = async_fs::File::create(&temp_path).await?; 114 file.write_all(&bytes).await?; 115 file.flush().await?; 116 117 debug!("Stored temp blob at: {:?}", temp_path); 118 Ok(key) 119 } 120 121 /// Make a temporary blob permanent by moving it to the blob store 122 pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> { 123 let already_has = self.has_stored(cid).await?; 124 125 if !already_has { 126 // Move the temporary blob to permanent storage 127 self.move_object(MoveObject { 128 from: self.get_tmp_path(&key), 129 to: self.get_stored_path(cid), 130 }) 131 .await?; 132 debug!("Moved temp blob to permanent: {} -> {}", key, cid); 133 } else { 134 // Already saved, so just delete the temp 135 let temp_path = self.get_tmp_path(&key); 136 if temp_path.exists() { 137 async_fs::remove_file(temp_path).await?; 138 debug!("Deleted temp blob as permanent already exists: {}", key); 139 } 140 } 141 142 Ok(()) 143 } 144 145 /// Store a blob directly as permanent 146 pub async fn put_permanent(&self, cid: Cid, bytes: Bytes) -> Result<()> { 147 let target_path = self.get_stored_path(cid); 148 149 // Ensure the directory exists 150 if let Some(parent) = target_path.parent() { 151 async_fs::create_dir_all(parent).await?; 152 } 153 154 // Write the blob 155 let mut file = async_fs::File::create(&target_path).await?; 156 file.write_all(&bytes).await?; 157 file.flush().await?; 158 159 debug!("Stored permanent blob: {}", cid); 160 Ok(()) 161 } 162 163 /// Quarantine a blob by moving it to the quarantine area 164 pub async fn quarantine(&self, cid: Cid) -> Result<()> { 165 self.move_object(MoveObject { 166 from: self.get_stored_path(cid), 167 to: self.get_quarantined_path(cid), 168 }) 169 .await?; 170 171 debug!("Quarantined blob: {}", cid); 172 Ok(()) 173 } 174 175 /// Unquarantine a blob by moving it back to regular storage 176 pub async fn unquarantine(&self, cid: Cid) -> Result<()> { 177 self.move_object(MoveObject { 178 from: self.get_quarantined_path(cid), 179 to: self.get_stored_path(cid), 180 }) 181 .await?; 182 183 debug!("Unquarantined blob: {}", cid); 184 Ok(()) 185 } 186 187 /// Get a blob as a stream 188 async fn get_object(&self, cid: Cid) -> Result<ByteStream> { 189 let blob_path = self.get_stored_path(cid); 190 191 match async_fs::read(&blob_path).await { 192 Ok(bytes) => Ok(ByteStream::new(Bytes::from(bytes))), 193 Err(e) => { 194 error!("Failed to read blob at path {:?}: {}", blob_path, e); 195 Err(anyhow::Error::new(BlobError::BlobNotFoundError)) 196 } 197 } 198 } 199 200 /// Get blob bytes 201 pub async fn get_bytes(&self, cid: Cid) -> Result<Bytes> { 202 let stream = self.get_object(cid).await?; 203 stream.collect().await 204 } 205 206 /// Get a blob as a stream 207 pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> { 208 self.get_object(cid).await 209 } 210 211 /// Delete a blob by CID string 212 pub async fn delete(&self, cid_str: String) -> Result<()> { 213 match Cid::from_str(&cid_str) { 214 Ok(cid) => self.delete_path(self.get_stored_path(cid)).await, 215 Err(e) => { 216 warn!("Invalid CID: {} - {}", cid_str, e); 217 Err(anyhow::anyhow!("Invalid CID: {}", e)) 218 } 219 } 220 } 221 222 /// Delete multiple blobs by CID 223 pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> { 224 let mut futures = Vec::with_capacity(cids.len()); 225 226 for cid in cids { 227 futures.push(self.delete_path(self.get_stored_path(cid))); 228 } 229 230 // Execute all delete operations concurrently 231 let results = futures::future::join_all(futures).await; 232 233 // Count errors but don't fail the operation 234 let error_count = results.iter().filter(|r| r.is_err()).count(); 235 if error_count > 0 { 236 warn!( 237 "{} errors occurred while deleting {} blobs", 238 error_count, 239 results.len() 240 ); 241 } 242 243 Ok(()) 244 } 245 246 /// Check if a blob is stored in the regular storage 247 pub async fn has_stored(&self, cid: Cid) -> Result<bool> { 248 let blob_path = self.get_stored_path(cid); 249 Ok(blob_path.exists()) 250 } 251 252 /// Check if a temporary blob exists 253 pub async fn has_temp(&self, key: String) -> Result<bool> { 254 let temp_path = self.get_tmp_path(&key); 255 Ok(temp_path.exists()) 256 } 257 258 /// Helper function to delete a file at the given path 259 async fn delete_path(&self, path: PathBuf) -> Result<()> { 260 if path.exists() { 261 async_fs::remove_file(&path).await?; 262 debug!("Deleted file at: {:?}", path); 263 Ok(()) 264 } else { 265 Err(anyhow::Error::new(BlobError::BlobNotFoundError)) 266 } 267 } 268 269 /// Move a blob from one path to another 270 async fn move_object(&self, mov: MoveObject) -> Result<()> { 271 // Ensure the source exists 272 if !mov.from.exists() { 273 return Err(anyhow::Error::new(BlobError::BlobNotFoundError)); 274 } 275 276 // Ensure the target directory exists 277 if let Some(parent) = mov.to.parent() { 278 async_fs::create_dir_all(parent).await?; 279 } 280 281 // Move the file 282 async_fs::rename(&mov.from, &mov.to).await?; 283 284 debug!("Moved blob: {:?} -> {:?}", mov.from, mov.to); 285 Ok(()) 286 } 287}