//! Blob and backup storage backed by Amazon S3 (or an S3-compatible endpoint
//! via `S3_ENDPOINT`), with streaming multipart uploads and operation metrics.
1use async_trait::async_trait; 2use aws_config::BehaviorVersion; 3use aws_config::meta::region::RegionProviderChain; 4use aws_sdk_s3::Client; 5use aws_sdk_s3::primitives::ByteStream; 6use aws_sdk_s3::types::CompletedMultipartUpload; 7use aws_sdk_s3::types::CompletedPart; 8use bytes::Bytes; 9use futures::Stream; 10use sha2::{Digest, Sha256}; 11use std::pin::Pin; 12use thiserror::Error; 13 14const MIN_PART_SIZE: usize = 5 * 1024 * 1024; 15 16#[derive(Error, Debug)] 17pub enum StorageError { 18 #[error("IO error: {0}")] 19 Io(#[from] std::io::Error), 20 #[error("S3 error: {0}")] 21 S3(String), 22 #[error("Other: {0}")] 23 Other(String), 24} 25 26pub struct StreamUploadResult { 27 pub sha256_hash: [u8; 32], 28 pub size: u64, 29} 30 31#[async_trait] 32pub trait BlobStorage: Send + Sync { 33 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>; 34 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 35 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 36 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 37 async fn delete(&self, key: &str) -> Result<(), StorageError>; 38 async fn put_stream( 39 &self, 40 key: &str, 41 stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>, 42 ) -> Result<StreamUploadResult, StorageError>; 43 async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError>; 44} 45 46pub struct S3BlobStorage { 47 client: Client, 48 bucket: String, 49} 50 51impl S3BlobStorage { 52 pub async fn new() -> Self { 53 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set"); 54 let client = create_s3_client().await; 55 Self { client, bucket } 56 } 57} 58 59async fn create_s3_client() -> Client { 60 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); 61 62 let config = aws_config::defaults(BehaviorVersion::latest()) 63 .region(region_provider) 64 .load() 65 .await; 66 67 if let Ok(endpoint) = 
std::env::var("S3_ENDPOINT") { 68 let s3_config = aws_sdk_s3::config::Builder::from(&config) 69 .endpoint_url(endpoint) 70 .force_path_style(true) 71 .build(); 72 Client::from_conf(s3_config) 73 } else { 74 Client::new(&config) 75 } 76} 77 78pub struct BackupStorage { 79 client: Client, 80 bucket: String, 81} 82 83impl BackupStorage { 84 pub async fn new() -> Option<Self> { 85 let backup_enabled = std::env::var("BACKUP_ENABLED") 86 .map(|v| v != "false" && v != "0") 87 .unwrap_or(true); 88 89 if !backup_enabled { 90 return None; 91 } 92 93 let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?; 94 let client = create_s3_client().await; 95 Some(Self { client, bucket }) 96 } 97 98 pub fn retention_count() -> u32 { 99 std::env::var("BACKUP_RETENTION_COUNT") 100 .ok() 101 .and_then(|v| v.parse().ok()) 102 .unwrap_or(7) 103 } 104 105 pub fn interval_secs() -> u64 { 106 std::env::var("BACKUP_INTERVAL_SECS") 107 .ok() 108 .and_then(|v| v.parse().ok()) 109 .unwrap_or(86400) 110 } 111 112 pub async fn put_backup( 113 &self, 114 did: &str, 115 rev: &str, 116 data: &[u8], 117 ) -> Result<String, StorageError> { 118 let key = format!("{}/{}.car", did, rev); 119 self.client 120 .put_object() 121 .bucket(&self.bucket) 122 .key(&key) 123 .body(ByteStream::from(Bytes::copy_from_slice(data))) 124 .send() 125 .await 126 .map_err(|e| { 127 crate::metrics::record_s3_operation("backup_put", "error"); 128 StorageError::S3(e.to_string()) 129 })?; 130 131 crate::metrics::record_s3_operation("backup_put", "success"); 132 Ok(key) 133 } 134 135 pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> { 136 let resp = self 137 .client 138 .get_object() 139 .bucket(&self.bucket) 140 .key(storage_key) 141 .send() 142 .await 143 .map_err(|e| { 144 crate::metrics::record_s3_operation("backup_get", "error"); 145 StorageError::S3(e.to_string()) 146 })?; 147 148 let data = resp 149 .body 150 .collect() 151 .await 152 .map_err(|e| { 153 
crate::metrics::record_s3_operation("backup_get", "error"); 154 StorageError::S3(e.to_string()) 155 })? 156 .into_bytes(); 157 158 crate::metrics::record_s3_operation("backup_get", "success"); 159 Ok(data) 160 } 161 162 pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> { 163 self.client 164 .delete_object() 165 .bucket(&self.bucket) 166 .key(storage_key) 167 .send() 168 .await 169 .map_err(|e| { 170 crate::metrics::record_s3_operation("backup_delete", "error"); 171 StorageError::S3(e.to_string()) 172 })?; 173 174 crate::metrics::record_s3_operation("backup_delete", "success"); 175 Ok(()) 176 } 177} 178 179#[async_trait] 180impl BlobStorage for S3BlobStorage { 181 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> { 182 self.put_bytes(key, Bytes::copy_from_slice(data)).await 183 } 184 185 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> { 186 let result = self 187 .client 188 .put_object() 189 .bucket(&self.bucket) 190 .key(key) 191 .body(ByteStream::from(data)) 192 .send() 193 .await 194 .map_err(|e| StorageError::S3(e.to_string())); 195 196 match &result { 197 Ok(_) => crate::metrics::record_s3_operation("put", "success"), 198 Err(_) => crate::metrics::record_s3_operation("put", "error"), 199 } 200 201 result?; 202 Ok(()) 203 } 204 205 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> { 206 self.get_bytes(key).await.map(|b| b.to_vec()) 207 } 208 209 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> { 210 let resp = self 211 .client 212 .get_object() 213 .bucket(&self.bucket) 214 .key(key) 215 .send() 216 .await 217 .map_err(|e| { 218 crate::metrics::record_s3_operation("get", "error"); 219 StorageError::S3(e.to_string()) 220 })?; 221 222 let data = resp 223 .body 224 .collect() 225 .await 226 .map_err(|e| { 227 crate::metrics::record_s3_operation("get", "error"); 228 StorageError::S3(e.to_string()) 229 })? 
230 .into_bytes(); 231 232 crate::metrics::record_s3_operation("get", "success"); 233 Ok(data) 234 } 235 236 async fn delete(&self, key: &str) -> Result<(), StorageError> { 237 let result = self 238 .client 239 .delete_object() 240 .bucket(&self.bucket) 241 .key(key) 242 .send() 243 .await 244 .map_err(|e| StorageError::S3(e.to_string())); 245 246 match &result { 247 Ok(_) => crate::metrics::record_s3_operation("delete", "success"), 248 Err(_) => crate::metrics::record_s3_operation("delete", "error"), 249 } 250 251 result?; 252 Ok(()) 253 } 254 255 async fn put_stream( 256 &self, 257 key: &str, 258 mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>, 259 ) -> Result<StreamUploadResult, StorageError> { 260 use futures::StreamExt; 261 262 let create_resp = self 263 .client 264 .create_multipart_upload() 265 .bucket(&self.bucket) 266 .key(key) 267 .send() 268 .await 269 .map_err(|e| StorageError::S3(format!("Failed to create multipart upload: {}", e)))?; 270 271 let upload_id = create_resp 272 .upload_id() 273 .ok_or_else(|| StorageError::S3("No upload ID returned".to_string()))? 
274 .to_string(); 275 276 let mut hasher = Sha256::new(); 277 let mut total_size: u64 = 0; 278 let mut part_number = 1; 279 let mut completed_parts: Vec<CompletedPart> = Vec::new(); 280 let mut buffer = Vec::with_capacity(MIN_PART_SIZE); 281 282 let upload_part = |client: &Client, 283 bucket: &str, 284 key: &str, 285 upload_id: &str, 286 part_num: i32, 287 data: Vec<u8>| 288 -> std::pin::Pin< 289 Box<dyn std::future::Future<Output = Result<CompletedPart, StorageError>> + Send>, 290 > { 291 let client = client.clone(); 292 let bucket = bucket.to_string(); 293 let key = key.to_string(); 294 let upload_id = upload_id.to_string(); 295 Box::pin(async move { 296 let resp = client 297 .upload_part() 298 .bucket(&bucket) 299 .key(&key) 300 .upload_id(&upload_id) 301 .part_number(part_num) 302 .body(ByteStream::from(data)) 303 .send() 304 .await 305 .map_err(|e| StorageError::S3(format!("Failed to upload part: {}", e)))?; 306 307 let etag = resp 308 .e_tag() 309 .ok_or_else(|| StorageError::S3("No ETag returned for part".to_string()))? 
310 .to_string(); 311 312 Ok(CompletedPart::builder() 313 .part_number(part_num) 314 .e_tag(etag) 315 .build()) 316 }) 317 }; 318 319 loop { 320 match stream.next().await { 321 Some(Ok(chunk)) => { 322 hasher.update(&chunk); 323 total_size += chunk.len() as u64; 324 buffer.extend_from_slice(&chunk); 325 326 if buffer.len() >= MIN_PART_SIZE { 327 let part_data = 328 std::mem::replace(&mut buffer, Vec::with_capacity(MIN_PART_SIZE)); 329 let part = upload_part( 330 &self.client, 331 &self.bucket, 332 key, 333 &upload_id, 334 part_number, 335 part_data, 336 ) 337 .await?; 338 completed_parts.push(part); 339 part_number += 1; 340 } 341 } 342 Some(Err(e)) => { 343 let _ = self 344 .client 345 .abort_multipart_upload() 346 .bucket(&self.bucket) 347 .key(key) 348 .upload_id(&upload_id) 349 .send() 350 .await; 351 return Err(StorageError::Io(e)); 352 } 353 None => break, 354 } 355 } 356 357 if !buffer.is_empty() { 358 let part = upload_part( 359 &self.client, 360 &self.bucket, 361 key, 362 &upload_id, 363 part_number, 364 buffer, 365 ) 366 .await?; 367 completed_parts.push(part); 368 } 369 370 if completed_parts.is_empty() { 371 let _ = self 372 .client 373 .abort_multipart_upload() 374 .bucket(&self.bucket) 375 .key(key) 376 .upload_id(&upload_id) 377 .send() 378 .await; 379 return Err(StorageError::Other("Empty upload".to_string())); 380 } 381 382 let completed_upload = CompletedMultipartUpload::builder() 383 .set_parts(Some(completed_parts)) 384 .build(); 385 386 self.client 387 .complete_multipart_upload() 388 .bucket(&self.bucket) 389 .key(key) 390 .upload_id(&upload_id) 391 .multipart_upload(completed_upload) 392 .send() 393 .await 394 .map_err(|e| StorageError::S3(format!("Failed to complete multipart upload: {}", e)))?; 395 396 crate::metrics::record_s3_operation("put_stream", "success"); 397 398 let hash: [u8; 32] = hasher.finalize().into(); 399 Ok(StreamUploadResult { 400 sha256_hash: hash, 401 size: total_size, 402 }) 403 } 404 405 async fn copy(&self, src_key: 
&str, dst_key: &str) -> Result<(), StorageError> { 406 let copy_source = format!("{}/{}", self.bucket, src_key); 407 408 self.client 409 .copy_object() 410 .bucket(&self.bucket) 411 .copy_source(&copy_source) 412 .key(dst_key) 413 .send() 414 .await 415 .map_err(|e| StorageError::S3(format!("Failed to copy object: {}", e)))?; 416 417 crate::metrics::record_s3_operation("copy", "success"); 418 Ok(()) 419 } 420}