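//! S3-backed implementations of the `tranquil_infra` storage traits: a
//! primary blob store (`S3BlobStorage`) and an optional, environment-gated
//! backup store (`BackupStorage`).
//!
//! All configuration is read from the environment:
//!
//! - `S3_BUCKET`              — bucket for `S3BlobStorage::new` (required; panics if unset)
//! - `S3_ENDPOINT`            — optional custom endpoint (path-style addressing is forced)
//! - `BACKUP_ENABLED`         — set to `false` or `0` to disable backups (default: enabled)
//! - `BACKUP_S3_BUCKET`       — bucket for backups (required for `BackupStorage::new`)
//! - `BACKUP_RETENTION_COUNT` — backups to keep (default 7)
//! - `BACKUP_INTERVAL_SECS`   — seconds between backups (default 86400)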
pub use tranquil_infra::{BlobStorage, StorageError, StreamUploadResult};

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::CompletedMultipartUpload;
use aws_sdk_s3::types::CompletedPart;
use bytes::Bytes;
use futures::Stream;
use sha2::{Digest, Sha256};
use std::pin::Pin;

/// S3 requires every part of a multipart upload except the last to be at
/// least 5 MiB.
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

/// Blob storage backed by an S3 (or S3-compatible) bucket.
pub struct S3BlobStorage {
    client: Client,
    bucket: String,
}

impl S3BlobStorage {
    /// Build a store for the bucket named in `S3_BUCKET`.
    ///
    /// Panics if `S3_BUCKET` is unset.
    pub async fn new() -> Self {
        let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
        let client = create_s3_client().await;
        Self { client, bucket }
    }

    /// Build a store for an explicitly named bucket.
    pub async fn with_bucket(bucket: String) -> Self {
        let client = create_s3_client().await;
        Self { client, bucket }
    }
}

/// Build an S3 client from the default AWS configuration, falling back to
/// `us-east-1` when no region is configured. If `S3_ENDPOINT` is set (for
/// example when targeting an S3-compatible store such as MinIO), the client
/// uses that endpoint with path-style addressing.
async fn create_s3_client() -> Client {
    let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");

    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(region_provider)
        .load()
        .await;

    if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
        let s3_config = aws_sdk_s3::config::Builder::from(&config)
            .endpoint_url(endpoint)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    } else {
        Client::new(&config)
    }
}
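// A hypothetical local-development invocation (values illustrative, not part
// of this crate) that exercises the `S3_ENDPOINT` branch above against MinIO,
// whose stock credentials are `minioadmin`/`minioadmin`. The AWS_* variables
// are read by the default credential provider chain:
//
//   S3_BUCKET=blobs \
//   S3_ENDPOINT=http://localhost:9000 \
//   AWS_ACCESS_KEY_ID=minioadmin \
//   AWS_SECRET_ACCESS_KEY=minioadmin \
//   cargo run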
/// Secondary bucket for repository backups, configured entirely from the
/// environment and disabled when `BACKUP_ENABLED` is `false`/`0`.
pub struct BackupStorage {
    client: Client,
    bucket: String,
}

impl BackupStorage {
    /// Returns `None` when backups are disabled or `BACKUP_S3_BUCKET` is unset.
    pub async fn new() -> Option<Self> {
        let backup_enabled = std::env::var("BACKUP_ENABLED")
            .map(|v| v != "false" && v != "0")
            .unwrap_or(true);

        if !backup_enabled {
            return None;
        }

        let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?;
        let client = create_s3_client().await;
        Some(Self { client, bucket })
    }

    /// Number of backups to retain (`BACKUP_RETENTION_COUNT`, default 7).
    pub fn retention_count() -> u32 {
        std::env::var("BACKUP_RETENTION_COUNT")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(7)
    }

    /// Seconds between backups (`BACKUP_INTERVAL_SECS`, default 86400, i.e. daily).
    pub fn interval_secs() -> u64 {
        std::env::var("BACKUP_INTERVAL_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(86400)
    }

    /// Store a CAR snapshot under `<did>/<rev>.car` and return the storage key.
    pub async fn put_backup(
        &self,
        did: &str,
        rev: &str,
        data: &[u8],
    ) -> Result<String, StorageError> {
        let key = format!("{}/{}.car", did, rev);
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&key)
            .body(ByteStream::from(Bytes::copy_from_slice(data)))
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(key)
    }

    pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?
            .into_bytes();

        Ok(data)
    }

    pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(())
    }
}
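// A minimal usage sketch for the backup path (the DID, rev, and `car_bytes`
// below are hypothetical placeholders, not values this module produces):
//
//   if let Some(backups) = BackupStorage::new().await {
//       let key = backups.put_backup("did:plc:example", "3jzfcijpj2z2a", &car_bytes).await?;
//       let restored = backups.get_backup(&key).await?;
//       assert_eq!(&restored[..], &car_bytes[..]);
//       backups.delete_backup(&key).await?;
//   }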
/// Upload one part of a multipart upload and return the completed-part
/// record (part number + ETag) needed by `CompleteMultipartUpload`.
async fn upload_part(
    client: &Client,
    bucket: &str,
    key: &str,
    upload_id: &str,
    part_number: i32,
    data: Vec<u8>,
) -> Result<CompletedPart, StorageError> {
    let resp = client
        .upload_part()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .part_number(part_number)
        .body(ByteStream::from(data))
        .send()
        .await
        .map_err(|e| StorageError::S3(format!("Failed to upload part: {}", e)))?;

    let etag = resp
        .e_tag()
        .ok_or_else(|| StorageError::S3("No ETag returned for part".to_string()))?
        .to_string();

    Ok(CompletedPart::builder()
        .part_number(part_number)
        .e_tag(etag)
        .build())
}

/// Best-effort abort of an in-progress multipart upload, so failed uploads
/// do not linger as incomplete (and billable) parts.
async fn abort_upload(client: &Client, bucket: &str, key: &str, upload_id: &str) {
    let _ = client
        .abort_multipart_upload()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .send()
        .await;
}

#[async_trait]
impl BlobStorage for S3BlobStorage {
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
        self.put_bytes(key, Bytes::copy_from_slice(data)).await
    }

    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(ByteStream::from(data))
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
        self.get_bytes(key).await.map(|b| b.to_vec())
    }

    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?
            .into_bytes();

        Ok(data)
    }

    /// Fetch the first `size` bytes of an object via a ranged GET.
    async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError> {
        let range = format!("bytes=0-{}", size.saturating_sub(1));
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .range(range)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?
            .into_bytes();

        Ok(data)
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(())
    }

    /// Stream an object of unknown length into S3 as a multipart upload,
    /// hashing it on the fly. Parts are flushed at the 5 MiB minimum; the
    /// upload is aborted on any failure so no incomplete upload is left behind.
    async fn put_stream(
        &self,
        key: &str,
        mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError> {
        use futures::StreamExt;

        let create_resp = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to create multipart upload: {}", e)))?;

        let upload_id = create_resp
            .upload_id()
            .ok_or_else(|| StorageError::S3("No upload ID returned".to_string()))?
            .to_string();

        let mut hasher = Sha256::new();
        let mut total_size: u64 = 0;
        let mut part_number = 1;
        let mut completed_parts: Vec<CompletedPart> = Vec::new();
        let mut buffer = Vec::with_capacity(MIN_PART_SIZE);

        loop {
            match stream.next().await {
                Some(Ok(chunk)) => {
                    hasher.update(&chunk);
                    total_size += chunk.len() as u64;
                    buffer.extend_from_slice(&chunk);

                    // Flush a full part once the buffer reaches the minimum size.
                    if buffer.len() >= MIN_PART_SIZE {
                        let part_data =
                            std::mem::replace(&mut buffer, Vec::with_capacity(MIN_PART_SIZE));
                        match upload_part(
                            &self.client,
                            &self.bucket,
                            key,
                            &upload_id,
                            part_number,
                            part_data,
                        )
                        .await
                        {
                            Ok(part) => {
                                completed_parts.push(part);
                                part_number += 1;
                            }
                            Err(e) => {
                                abort_upload(&self.client, &self.bucket, key, &upload_id).await;
                                return Err(e);
                            }
                        }
                    }
                }
                Some(Err(e)) => {
                    abort_upload(&self.client, &self.bucket, key, &upload_id).await;
                    return Err(StorageError::Io(e));
                }
                None => break,
            }
        }

        // The final part may be smaller than the 5 MiB minimum.
        if !buffer.is_empty() {
            match upload_part(&self.client, &self.bucket, key, &upload_id, part_number, buffer)
                .await
            {
                Ok(part) => completed_parts.push(part),
                Err(e) => {
                    abort_upload(&self.client, &self.bucket, key, &upload_id).await;
                    return Err(e);
                }
            }
        }

        if completed_parts.is_empty() {
            abort_upload(&self.client, &self.bucket, key, &upload_id).await;
            return Err(StorageError::Other("Empty upload".to_string()));
        }

        let completed_upload = CompletedMultipartUpload::builder()
            .set_parts(Some(completed_parts))
            .build();

        if let Err(e) = self
            .client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .upload_id(&upload_id)
            .multipart_upload(completed_upload)
            .send()
            .await
        {
            abort_upload(&self.client, &self.bucket, key, &upload_id).await;
            return Err(StorageError::S3(format!(
                "Failed to complete multipart upload: {}",
                e
            )));
        }

        let hash: [u8; 32] = hasher.finalize().into();
        Ok(StreamUploadResult {
            sha256_hash: hash,
            size: total_size,
        })
    }

    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
        // CopyObject takes its source as "<bucket>/<key>".
        let copy_source = format!("{}/{}", self.bucket, src_key);

        self.client
            .copy_object()
            .bucket(&self.bucket)
            .copy_source(&copy_source)
            .key(dst_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to copy object: {}", e)))?;

        Ok(())
    }
}
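
// A minimal integration sketch, assuming `tokio` is available as a dev
// dependency and an S3-compatible endpoint (e.g. MinIO via `S3_ENDPOINT`)
// is reachable with `S3_BUCKET` set. Ignored by default so `cargo test`
// stays hermetic; the key and payload are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore = "requires a live S3-compatible endpoint and S3_BUCKET"]
    async fn blob_roundtrip() {
        let store = S3BlobStorage::new().await;
        let key = "smoke/roundtrip";

        store.put(key, b"hello world").await.expect("put");
        assert_eq!(store.get(key).await.expect("get"), b"hello world");

        // `get_head` should return exactly the first `size` bytes.
        let head = store.get_head(key, 5).await.expect("get_head");
        assert_eq!(&head[..], &b"hello"[..]);

        store.delete(key).await.expect("delete");
    }
}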