this repo has no description
1use async_trait::async_trait; 2use aws_config::BehaviorVersion; 3use aws_config::meta::region::RegionProviderChain; 4use aws_sdk_s3::Client; 5use aws_sdk_s3::primitives::ByteStream; 6use bytes::Bytes; 7use thiserror::Error; 8#[derive(Error, Debug)] 9pub enum StorageError { 10 #[error("IO error: {0}")] 11 Io(#[from] std::io::Error), 12 #[error("S3 error: {0}")] 13 S3(String), 14 #[error("Other: {0}")] 15 Other(String), 16} 17#[async_trait] 18pub trait BlobStorage: Send + Sync { 19 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>; 20 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 21 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 22 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 23 async fn delete(&self, key: &str) -> Result<(), StorageError>; 24} 25pub struct S3BlobStorage { 26 client: Client, 27 bucket: String, 28} 29impl S3BlobStorage { 30 pub async fn new() -> Self { 31 // heheheh 32 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); 33 let config = aws_config::defaults(BehaviorVersion::latest()) 34 .region(region_provider) 35 .load() 36 .await; 37 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set"); 38 let client = if let Ok(endpoint) = std::env::var("S3_ENDPOINT") { 39 let s3_config = aws_sdk_s3::config::Builder::from(&config) 40 .endpoint_url(endpoint) 41 .force_path_style(true) 42 .build(); 43 Client::from_conf(s3_config) 44 } else { 45 Client::new(&config) 46 }; 47 Self { client, bucket } 48 } 49} 50#[async_trait] 51impl BlobStorage for S3BlobStorage { 52 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> { 53 self.put_bytes(key, Bytes::copy_from_slice(data)).await 54 } 55 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> { 56 let result = self.client 57 .put_object() 58 .bucket(&self.bucket) 59 .key(key) 60 .body(ByteStream::from(data)) 61 .send() 62 .await 63 .map_err(|e| StorageError::S3(e.to_string())); 64 match &result { 65 Ok(_) => crate::metrics::record_s3_operation("put", "success"), 66 Err(_) => crate::metrics::record_s3_operation("put", "error"), 67 } 68 result?; 69 Ok(()) 70 } 71 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> { 72 self.get_bytes(key).await.map(|b| b.to_vec()) 73 } 74 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> { 75 let resp = self 76 .client 77 .get_object() 78 .bucket(&self.bucket) 79 .key(key) 80 .send() 81 .await 82 .map_err(|e| { 83 crate::metrics::record_s3_operation("get", "error"); 84 StorageError::S3(e.to_string()) 85 })?; 86 let data = resp 87 .body 88 .collect() 89 .await 90 .map_err(|e| { 91 crate::metrics::record_s3_operation("get", "error"); 92 StorageError::S3(e.to_string()) 93 })? 94 .into_bytes(); 95 crate::metrics::record_s3_operation("get", "success"); 96 Ok(data) 97 } 98 async fn delete(&self, key: &str) -> Result<(), StorageError> { 99 let result = self.client 100 .delete_object() 101 .bucket(&self.bucket) 102 .key(key) 103 .send() 104 .await 105 .map_err(|e| StorageError::S3(e.to_string())); 106 match &result { 107 Ok(_) => crate::metrics::record_s3_operation("delete", "success"), 108 Err(_) => crate::metrics::record_s3_operation("delete", "error"), 109 } 110 result?; 111 Ok(()) 112 } 113}