this repo has no description
1use async_trait::async_trait; 2use aws_config::BehaviorVersion; 3use aws_config::meta::region::RegionProviderChain; 4use aws_sdk_s3::Client; 5use aws_sdk_s3::primitives::ByteStream; 6use bytes::Bytes; 7use thiserror::Error; 8 9#[derive(Error, Debug)] 10pub enum StorageError { 11 #[error("IO error: {0}")] 12 Io(#[from] std::io::Error), 13 #[error("S3 error: {0}")] 14 S3(String), 15 #[error("Other: {0}")] 16 Other(String), 17} 18 19#[async_trait] 20pub trait BlobStorage: Send + Sync { 21 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>; 22 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 23 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 24 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 25 async fn delete(&self, key: &str) -> Result<(), StorageError>; 26} 27 28pub struct S3BlobStorage { 29 client: Client, 30 bucket: String, 31} 32 33impl S3BlobStorage { 34 pub async fn new() -> Self { 35 // heheheh 36 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); 37 let config = aws_config::defaults(BehaviorVersion::latest()) 38 .region(region_provider) 39 .load() 40 .await; 41 42 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set"); 43 44 let client = if let Ok(endpoint) = std::env::var("S3_ENDPOINT") { 45 let s3_config = aws_sdk_s3::config::Builder::from(&config) 46 .endpoint_url(endpoint) 47 .force_path_style(true) 48 .build(); 49 Client::from_conf(s3_config) 50 } else { 51 Client::new(&config) 52 }; 53 54 Self { client, bucket } 55 } 56} 57 58#[async_trait] 59impl BlobStorage for S3BlobStorage { 60 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> { 61 self.put_bytes(key, Bytes::copy_from_slice(data)).await 62 } 63 64 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> { 65 let result = self.client 66 .put_object() 67 .bucket(&self.bucket) 68 .key(key) 69 .body(ByteStream::from(data)) 70 .send() 71 .await 72 .map_err(|e| StorageError::S3(e.to_string())); 73 74 match &result { 75 Ok(_) => crate::metrics::record_s3_operation("put", "success"), 76 Err(_) => crate::metrics::record_s3_operation("put", "error"), 77 } 78 result?; 79 Ok(()) 80 } 81 82 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> { 83 self.get_bytes(key).await.map(|b| b.to_vec()) 84 } 85 86 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> { 87 let resp = self 88 .client 89 .get_object() 90 .bucket(&self.bucket) 91 .key(key) 92 .send() 93 .await 94 .map_err(|e| { 95 crate::metrics::record_s3_operation("get", "error"); 96 StorageError::S3(e.to_string()) 97 })?; 98 99 let data = resp 100 .body 101 .collect() 102 .await 103 .map_err(|e| { 104 crate::metrics::record_s3_operation("get", "error"); 105 StorageError::S3(e.to_string()) 106 })? 107 .into_bytes(); 108 109 crate::metrics::record_s3_operation("get", "success"); 110 Ok(data) 111 } 112 113 async fn delete(&self, key: &str) -> Result<(), StorageError> { 114 let result = self.client 115 .delete_object() 116 .bucket(&self.bucket) 117 .key(key) 118 .send() 119 .await 120 .map_err(|e| StorageError::S3(e.to_string())); 121 122 match &result { 123 Ok(_) => crate::metrics::record_s3_operation("delete", "success"), 124 Err(_) => crate::metrics::record_s3_operation("delete", "error"), 125 } 126 result?; 127 Ok(()) 128 } 129}