this repo has no description
1use async_trait::async_trait;
2use aws_config::BehaviorVersion;
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::Client;
5use aws_sdk_s3::primitives::ByteStream;
6use bytes::Bytes;
7use thiserror::Error;
8
9#[derive(Error, Debug)]
10pub enum StorageError {
11 #[error("IO error: {0}")]
12 Io(#[from] std::io::Error),
13 #[error("S3 error: {0}")]
14 S3(String),
15 #[error("Other: {0}")]
16 Other(String),
17}
18
19#[async_trait]
20pub trait BlobStorage: Send + Sync {
21 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>;
22 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>;
23 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>;
24 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>;
25 async fn delete(&self, key: &str) -> Result<(), StorageError>;
26}
27
28pub struct S3BlobStorage {
29 client: Client,
30 bucket: String,
31}
32
33impl S3BlobStorage {
34 pub async fn new() -> Self {
35 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
36
37 let config = aws_config::defaults(BehaviorVersion::latest())
38 .region(region_provider)
39 .load()
40 .await;
41
42 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
43
44 let client = if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
45 let s3_config = aws_sdk_s3::config::Builder::from(&config)
46 .endpoint_url(endpoint)
47 .force_path_style(true)
48 .build();
49 Client::from_conf(s3_config)
50 } else {
51 Client::new(&config)
52 };
53
54 Self { client, bucket }
55 }
56}
57
58#[async_trait]
59impl BlobStorage for S3BlobStorage {
60 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
61 self.put_bytes(key, Bytes::copy_from_slice(data)).await
62 }
63
64 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
65 let result = self
66 .client
67 .put_object()
68 .bucket(&self.bucket)
69 .key(key)
70 .body(ByteStream::from(data))
71 .send()
72 .await
73 .map_err(|e| StorageError::S3(e.to_string()));
74
75 match &result {
76 Ok(_) => crate::metrics::record_s3_operation("put", "success"),
77 Err(_) => crate::metrics::record_s3_operation("put", "error"),
78 }
79
80 result?;
81 Ok(())
82 }
83
84 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
85 self.get_bytes(key).await.map(|b| b.to_vec())
86 }
87
88 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
89 let resp = self
90 .client
91 .get_object()
92 .bucket(&self.bucket)
93 .key(key)
94 .send()
95 .await
96 .map_err(|e| {
97 crate::metrics::record_s3_operation("get", "error");
98 StorageError::S3(e.to_string())
99 })?;
100
101 let data = resp
102 .body
103 .collect()
104 .await
105 .map_err(|e| {
106 crate::metrics::record_s3_operation("get", "error");
107 StorageError::S3(e.to_string())
108 })?
109 .into_bytes();
110
111 crate::metrics::record_s3_operation("get", "success");
112 Ok(data)
113 }
114
115 async fn delete(&self, key: &str) -> Result<(), StorageError> {
116 let result = self
117 .client
118 .delete_object()
119 .bucket(&self.bucket)
120 .key(key)
121 .send()
122 .await
123 .map_err(|e| StorageError::S3(e.to_string()));
124
125 match &result {
126 Ok(_) => crate::metrics::record_s3_operation("delete", "success"),
127 Err(_) => crate::metrics::record_s3_operation("delete", "error"),
128 }
129
130 result?;
131 Ok(())
132 }
133}