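//! Blob storage backed by S3-compatible object stores.
//!
//! [`BlobStorage`] is the storage trait; [`S3BlobStorage`] implements it
//! against the bucket named by `S3_BUCKET` (optionally a custom endpoint via
//! `S3_ENDPOINT`). [`BackupStorage`] is a separate, optional bucket for
//! repository backups, configured through the `BACKUP_*` variables.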
use async_trait::async_trait;
use aws_config::meta::region::RegionProviderChain;
use aws_config::BehaviorVersion;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::Client;
use bytes::Bytes;
use futures::Stream;
use sha2::{Digest, Sha256};
use std::pin::Pin;
use thiserror::Error;

/// S3 requires every part of a multipart upload except the last to be at
/// least 5 MiB.
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

#[derive(Error, Debug)]
pub enum StorageError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("S3 error: {0}")]
    S3(String),
    #[error("Other: {0}")]
    Other(String),
}

/// Returned by [`BlobStorage::put_stream`]: the SHA-256 digest and total
/// byte count of the uploaded stream.
pub struct StreamUploadResult {
    pub sha256_hash: [u8; 32],
    pub size: u64,
}

#[async_trait]
pub trait BlobStorage: Send + Sync {
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>;
    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>;
    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>;
    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>;
    /// Fetch only the first `size` bytes of the object.
    async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError>;
    async fn delete(&self, key: &str) -> Result<(), StorageError>;
    /// Upload a stream of unknown length, hashing and counting bytes as
    /// they pass through.
    async fn put_stream(
        &self,
        key: &str,
        stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError>;
    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError>;
}

pub struct S3BlobStorage {
    client: Client,
    bucket: String,
}

impl S3BlobStorage {
    /// Panics if `S3_BUCKET` is unset.
    pub async fn new() -> Self {
        let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
        let client = create_s3_client().await;
        Self { client, bucket }
    }
}

async fn create_s3_client() -> Client {
    let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");

    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(region_provider)
        .load()
        .await;

    // S3_ENDPOINT switches to a custom, S3-compatible endpoint (e.g. MinIO).
    // Path-style addressing is forced because virtual-hosted-style bucket
    // hostnames generally don't resolve against such endpoints.
    if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
        let s3_config = aws_sdk_s3::config::Builder::from(&config)
            .endpoint_url(endpoint)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    } else {
        Client::new(&config)
    }
}
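/// Optional secondary bucket for repository backups, keyed as
/// `{did}/{rev}.car`. [`BackupStorage::new`] returns `None` when backups
/// are disabled, so callers can skip backup work entirely.
///
/// A minimal usage sketch (`ignore`d: it needs live credentials, and the
/// `did`/`rev` values are placeholders):
///
/// ```ignore
/// if let Some(backups) = BackupStorage::new().await {
///     let key = backups.put_backup("did:plc:example", "rev-1", &car_bytes).await?;
///     let restored = backups.get_backup(&key).await?;
///     assert_eq!(&restored[..], &car_bytes[..]);
/// }
/// ```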
pub struct BackupStorage {
    client: Client,
    bucket: String,
}

impl BackupStorage {
    /// Returns `None` when `BACKUP_ENABLED` is "false"/"0" or when
    /// `BACKUP_S3_BUCKET` is unset.
    pub async fn new() -> Option<Self> {
        let backup_enabled = std::env::var("BACKUP_ENABLED")
            .map(|v| v != "false" && v != "0")
            .unwrap_or(true);

        if !backup_enabled {
            return None;
        }

        let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?;
        let client = create_s3_client().await;
        Some(Self { client, bucket })
    }

    /// Number of backups to retain (default 7).
    pub fn retention_count() -> u32 {
        std::env::var("BACKUP_RETENTION_COUNT")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(7)
    }

    /// Seconds between backup runs (default 86400, i.e. daily).
    pub fn interval_secs() -> u64 {
        std::env::var("BACKUP_INTERVAL_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(86400)
    }

    pub async fn put_backup(
        &self,
        did: &str,
        rev: &str,
        data: &[u8],
    ) -> Result<String, StorageError> {
        let key = format!("{}/{}.car", did, rev);
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&key)
            .body(ByteStream::from(Bytes::copy_from_slice(data)))
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_put", "error");
                StorageError::S3(e.to_string())
            })?;

        crate::metrics::record_s3_operation("backup_put", "success");
        Ok(key)
    }

    pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_get", "error");
                StorageError::S3(e.to_string())
            })?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_get", "error");
                StorageError::S3(e.to_string())
            })?
            .into_bytes();

        crate::metrics::record_s3_operation("backup_get", "success");
        Ok(data)
    }

    pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_delete", "error");
                StorageError::S3(e.to_string())
            })?;

        crate::metrics::record_s3_operation("backup_delete", "success");
        Ok(())
    }
}
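// Illustrative test helper, not part of the service API: adapts an
// in-memory buffer to the boxed stream shape `put_stream` expects. The
// single-item stream is an assumption for sketching; any chunked source
// with the same item type works equally well.
#[cfg(test)]
fn bytes_stream(
    data: Vec<u8>,
) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> {
    // One chunk; `put_stream` buffers it and uploads it as a single part.
    Box::pin(futures::stream::once(async move {
        Ok::<Bytes, std::io::Error>(Bytes::from(data))
    }))
}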
#[async_trait]
impl BlobStorage for S3BlobStorage {
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
        self.put_bytes(key, Bytes::copy_from_slice(data)).await
    }

    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        let result = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(ByteStream::from(data))
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()));

        // Record the metric for both outcomes before propagating the error.
        match &result {
            Ok(_) => crate::metrics::record_s3_operation("put", "success"),
            Err(_) => crate::metrics::record_s3_operation("put", "error"),
        }

        result?;
        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
        self.get_bytes(key).await.map(|b| b.to_vec())
    }

    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get", "error");
                StorageError::S3(e.to_string())
            })?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get", "error");
                StorageError::S3(e.to_string())
            })?
            .into_bytes();

        crate::metrics::record_s3_operation("get", "success");
        Ok(data)
    }

    async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError> {
        // HTTP byte ranges are inclusive, hence the saturating `size - 1` bound.
        let range = format!("bytes=0-{}", size.saturating_sub(1));
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .range(range)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get_head", "error");
                StorageError::S3(e.to_string())
            })?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get_head", "error");
                StorageError::S3(e.to_string())
            })?
            .into_bytes();

        crate::metrics::record_s3_operation("get_head", "success");
        Ok(data)
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        let result = self
            .client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()));

        match &result {
            Ok(_) => crate::metrics::record_s3_operation("delete", "success"),
            Err(_) => crate::metrics::record_s3_operation("delete", "error"),
        }

        result?;
        Ok(())
    }

    async fn put_stream(
        &self,
        key: &str,
        mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError> {
        use futures::StreamExt;

        let create_resp = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to create multipart upload: {}", e)))?;

        let upload_id = create_resp
            .upload_id()
            .ok_or_else(|| StorageError::S3("No upload ID returned".to_string()))?
            .to_string();

        let mut hasher = Sha256::new();
        let mut total_size: u64 = 0;
        let mut part_number = 1;
        let mut completed_parts: Vec<CompletedPart> = Vec::new();
        let mut buffer = Vec::with_capacity(MIN_PART_SIZE);

        // Boxed future so the same closure type can be reused for every part.
        let upload_part = |client: &Client,
                           bucket: &str,
                           key: &str,
                           upload_id: &str,
                           part_num: i32,
                           data: Vec<u8>|
         -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<CompletedPart, StorageError>> + Send>,
        > {
            let client = client.clone();
            let bucket = bucket.to_string();
            let key = key.to_string();
            let upload_id = upload_id.to_string();
            Box::pin(async move {
                let resp = client
                    .upload_part()
                    .bucket(&bucket)
                    .key(&key)
                    .upload_id(&upload_id)
                    .part_number(part_num)
                    .body(ByteStream::from(data))
                    .send()
                    .await
                    .map_err(|e| StorageError::S3(format!("Failed to upload part: {}", e)))?;

                let etag = resp
                    .e_tag()
                    .ok_or_else(|| StorageError::S3("No ETag returned for part".to_string()))?
                    .to_string();

                Ok(CompletedPart::builder()
                    .part_number(part_num)
                    .e_tag(etag)
                    .build())
            })
        };
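        // Drain the stream, buffering chunks until at least MIN_PART_SIZE
        // (5 MiB, the S3 minimum for every part except the last) has
        // accumulated, then flush the buffer as one part. For example, a
        // 12 MiB stream arriving as 64 KiB chunks goes up as three parts:
        // 5 MiB, 5 MiB, and a final 2 MiB part.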
        loop {
            match stream.next().await {
                Some(Ok(chunk)) => {
                    hasher.update(&chunk);
                    total_size += chunk.len() as u64;
                    buffer.extend_from_slice(&chunk);

                    if buffer.len() >= MIN_PART_SIZE {
                        let part_data =
                            std::mem::replace(&mut buffer, Vec::with_capacity(MIN_PART_SIZE));
                        let part = upload_part(
                            &self.client,
                            &self.bucket,
                            key,
                            &upload_id,
                            part_number,
                            part_data,
                        )
                        .await?;
                        completed_parts.push(part);
                        part_number += 1;
                    }
                }
                Some(Err(e)) => {
                    // Abort so S3 doesn't keep storing orphaned parts.
                    let _ = self
                        .client
                        .abort_multipart_upload()
                        .bucket(&self.bucket)
                        .key(key)
                        .upload_id(&upload_id)
                        .send()
                        .await;
                    return Err(StorageError::Io(e));
                }
                None => break,
            }
        }

        // The final part may legally be smaller than MIN_PART_SIZE.
        if !buffer.is_empty() {
            let part = upload_part(
                &self.client,
                &self.bucket,
                key,
                &upload_id,
                part_number,
                buffer,
            )
            .await?;
            completed_parts.push(part);
        }

        // S3 rejects completing a multipart upload with zero parts.
        if completed_parts.is_empty() {
            let _ = self
                .client
                .abort_multipart_upload()
                .bucket(&self.bucket)
                .key(key)
                .upload_id(&upload_id)
                .send()
                .await;
            return Err(StorageError::Other("Empty upload".to_string()));
        }

        let completed_upload = CompletedMultipartUpload::builder()
            .set_parts(Some(completed_parts))
            .build();

        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .upload_id(&upload_id)
            .multipart_upload(completed_upload)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to complete multipart upload: {}", e)))?;

        crate::metrics::record_s3_operation("put_stream", "success");

        let hash: [u8; 32] = hasher.finalize().into();
        Ok(StreamUploadResult {
            sha256_hash: hash,
            size: total_size,
        })
    }

    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
        // Server-side copy within the same bucket. Note that keys containing
        // characters with special meaning in URLs would need URL-encoding in
        // `copy_source`.
        let copy_source = format!("{}/{}", self.bucket, src_key);

        self.client
            .copy_object()
            .bucket(&self.bucket)
            .copy_source(&copy_source)
            .key(dst_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to copy object: {}", e)))?;

        crate::metrics::record_s3_operation("copy", "success");
        Ok(())
    }
}
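#[cfg(test)]
mod tests {
    use super::*;

    // Minimal integration sketches, not exhaustive tests. They assume a
    // reachable bucket (S3_BUCKET, optionally S3_ENDPOINT for e.g. MinIO)
    // and that the crate runs on tokio with the test macros available, so
    // they are #[ignore]d by default; the keys below are placeholders.
    #[tokio::test]
    #[ignore]
    async fn put_get_round_trip() {
        let storage = S3BlobStorage::new().await;
        let payload = b"hello blob storage".to_vec();
        storage.put("test/round-trip", &payload).await.unwrap();
        assert_eq!(storage.get("test/round-trip").await.unwrap(), payload);
        storage.delete("test/round-trip").await.unwrap();
    }

    // Exercises `put_stream` via the `bytes_stream` helper above and checks
    // the reported size and SHA-256 digest against locally computed values.
    #[tokio::test]
    #[ignore]
    async fn put_stream_reports_hash_and_size() {
        let storage = S3BlobStorage::new().await;
        let payload = vec![42u8; 1024];
        let result = storage
            .put_stream("test/stream", bytes_stream(payload.clone()))
            .await
            .unwrap();
        assert_eq!(result.size, payload.len() as u64);
        let expected: [u8; 32] = Sha256::digest(&payload).into();
        assert_eq!(result.sha256_hash, expected);
        storage.delete("test/stream").await.unwrap();
    }
}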