this repo has no description
1use async_trait::async_trait; 2use aws_config::BehaviorVersion; 3use aws_config::meta::region::RegionProviderChain; 4use aws_sdk_s3::Client; 5use aws_sdk_s3::primitives::ByteStream; 6use bytes::Bytes; 7use thiserror::Error; 8 9#[derive(Error, Debug)] 10pub enum StorageError { 11 #[error("IO error: {0}")] 12 Io(#[from] std::io::Error), 13 #[error("S3 error: {0}")] 14 S3(String), 15 #[error("Other: {0}")] 16 Other(String), 17} 18 19#[async_trait] 20pub trait BlobStorage: Send + Sync { 21 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>; 22 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 23 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 24 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 25 async fn delete(&self, key: &str) -> Result<(), StorageError>; 26} 27 28pub struct S3BlobStorage { 29 client: Client, 30 bucket: String, 31} 32 33impl S3BlobStorage { 34 pub async fn new() -> Self { 35 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); 36 37 let config = aws_config::defaults(BehaviorVersion::latest()) 38 .region(region_provider) 39 .load() 40 .await; 41 42 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set"); 43 44 let client = if let Ok(endpoint) = std::env::var("S3_ENDPOINT") { 45 let s3_config = aws_sdk_s3::config::Builder::from(&config) 46 .endpoint_url(endpoint) 47 .force_path_style(true) 48 .build(); 49 Client::from_conf(s3_config) 50 } else { 51 Client::new(&config) 52 }; 53 54 Self { client, bucket } 55 } 56} 57 58#[async_trait] 59impl BlobStorage for S3BlobStorage { 60 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> { 61 self.put_bytes(key, Bytes::copy_from_slice(data)).await 62 } 63 64 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> { 65 let result = self 66 .client 67 .put_object() 68 .bucket(&self.bucket) 69 .key(key) 70 .body(ByteStream::from(data)) 71 .send() 72 .await 73 .map_err(|e| StorageError::S3(e.to_string())); 74 75 match &result { 76 Ok(_) => crate::metrics::record_s3_operation("put", "success"), 77 Err(_) => crate::metrics::record_s3_operation("put", "error"), 78 } 79 80 result?; 81 Ok(()) 82 } 83 84 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> { 85 self.get_bytes(key).await.map(|b| b.to_vec()) 86 } 87 88 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> { 89 let resp = self 90 .client 91 .get_object() 92 .bucket(&self.bucket) 93 .key(key) 94 .send() 95 .await 96 .map_err(|e| { 97 crate::metrics::record_s3_operation("get", "error"); 98 StorageError::S3(e.to_string()) 99 })?; 100 101 let data = resp 102 .body 103 .collect() 104 .await 105 .map_err(|e| { 106 crate::metrics::record_s3_operation("get", "error"); 107 StorageError::S3(e.to_string()) 108 })? 109 .into_bytes(); 110 111 crate::metrics::record_s3_operation("get", "success"); 112 Ok(data) 113 } 114 115 async fn delete(&self, key: &str) -> Result<(), StorageError> { 116 let result = self 117 .client 118 .delete_object() 119 .bucket(&self.bucket) 120 .key(key) 121 .send() 122 .await 123 .map_err(|e| StorageError::S3(e.to_string())); 124 125 match &result { 126 Ok(_) => crate::metrics::record_s3_operation("delete", "success"), 127 Err(_) => crate::metrics::record_s3_operation("delete", "error"), 128 } 129 130 result?; 131 Ok(()) 132 } 133}