this repo has no description
1use async_trait::async_trait;
2use aws_config::BehaviorVersion;
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::Client;
5use aws_sdk_s3::primitives::ByteStream;
6use bytes::Bytes;
7use thiserror::Error;
8
9#[derive(Error, Debug)]
10pub enum StorageError {
11 #[error("IO error: {0}")]
12 Io(#[from] std::io::Error),
13 #[error("S3 error: {0}")]
14 S3(String),
15 #[error("Other: {0}")]
16 Other(String),
17}
18
19#[async_trait]
20pub trait BlobStorage: Send + Sync {
21 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>;
22 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>;
23 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>;
24 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>;
25 async fn delete(&self, key: &str) -> Result<(), StorageError>;
26}
27
28pub struct S3BlobStorage {
29 client: Client,
30 bucket: String,
31}
32
33impl S3BlobStorage {
34 pub async fn new() -> Self {
35 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
36 let client = create_s3_client().await;
37 Self { client, bucket }
38 }
39}
40
41async fn create_s3_client() -> Client {
42 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
43
44 let config = aws_config::defaults(BehaviorVersion::latest())
45 .region(region_provider)
46 .load()
47 .await;
48
49 if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
50 let s3_config = aws_sdk_s3::config::Builder::from(&config)
51 .endpoint_url(endpoint)
52 .force_path_style(true)
53 .build();
54 Client::from_conf(s3_config)
55 } else {
56 Client::new(&config)
57 }
58}
59
60pub struct BackupStorage {
61 client: Client,
62 bucket: String,
63}
64
65impl BackupStorage {
66 pub async fn new() -> Option<Self> {
67 let backup_enabled = std::env::var("BACKUP_ENABLED")
68 .map(|v| v != "false" && v != "0")
69 .unwrap_or(true);
70
71 if !backup_enabled {
72 return None;
73 }
74
75 let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?;
76 let client = create_s3_client().await;
77 Some(Self { client, bucket })
78 }
79
80 pub fn retention_count() -> u32 {
81 std::env::var("BACKUP_RETENTION_COUNT")
82 .ok()
83 .and_then(|v| v.parse().ok())
84 .unwrap_or(7)
85 }
86
87 pub fn interval_secs() -> u64 {
88 std::env::var("BACKUP_INTERVAL_SECS")
89 .ok()
90 .and_then(|v| v.parse().ok())
91 .unwrap_or(86400)
92 }
93
94 pub async fn put_backup(
95 &self,
96 did: &str,
97 rev: &str,
98 data: &[u8],
99 ) -> Result<String, StorageError> {
100 let key = format!("{}/{}.car", did, rev);
101 self.client
102 .put_object()
103 .bucket(&self.bucket)
104 .key(&key)
105 .body(ByteStream::from(Bytes::copy_from_slice(data)))
106 .send()
107 .await
108 .map_err(|e| {
109 crate::metrics::record_s3_operation("backup_put", "error");
110 StorageError::S3(e.to_string())
111 })?;
112
113 crate::metrics::record_s3_operation("backup_put", "success");
114 Ok(key)
115 }
116
117 pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> {
118 let resp = self
119 .client
120 .get_object()
121 .bucket(&self.bucket)
122 .key(storage_key)
123 .send()
124 .await
125 .map_err(|e| {
126 crate::metrics::record_s3_operation("backup_get", "error");
127 StorageError::S3(e.to_string())
128 })?;
129
130 let data = resp
131 .body
132 .collect()
133 .await
134 .map_err(|e| {
135 crate::metrics::record_s3_operation("backup_get", "error");
136 StorageError::S3(e.to_string())
137 })?
138 .into_bytes();
139
140 crate::metrics::record_s3_operation("backup_get", "success");
141 Ok(data)
142 }
143
144 pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> {
145 self.client
146 .delete_object()
147 .bucket(&self.bucket)
148 .key(storage_key)
149 .send()
150 .await
151 .map_err(|e| {
152 crate::metrics::record_s3_operation("backup_delete", "error");
153 StorageError::S3(e.to_string())
154 })?;
155
156 crate::metrics::record_s3_operation("backup_delete", "success");
157 Ok(())
158 }
159}
160
161#[async_trait]
162impl BlobStorage for S3BlobStorage {
163 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
164 self.put_bytes(key, Bytes::copy_from_slice(data)).await
165 }
166
167 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
168 let result = self
169 .client
170 .put_object()
171 .bucket(&self.bucket)
172 .key(key)
173 .body(ByteStream::from(data))
174 .send()
175 .await
176 .map_err(|e| StorageError::S3(e.to_string()));
177
178 match &result {
179 Ok(_) => crate::metrics::record_s3_operation("put", "success"),
180 Err(_) => crate::metrics::record_s3_operation("put", "error"),
181 }
182
183 result?;
184 Ok(())
185 }
186
187 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
188 self.get_bytes(key).await.map(|b| b.to_vec())
189 }
190
191 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
192 let resp = self
193 .client
194 .get_object()
195 .bucket(&self.bucket)
196 .key(key)
197 .send()
198 .await
199 .map_err(|e| {
200 crate::metrics::record_s3_operation("get", "error");
201 StorageError::S3(e.to_string())
202 })?;
203
204 let data = resp
205 .body
206 .collect()
207 .await
208 .map_err(|e| {
209 crate::metrics::record_s3_operation("get", "error");
210 StorageError::S3(e.to_string())
211 })?
212 .into_bytes();
213
214 crate::metrics::record_s3_operation("get", "success");
215 Ok(data)
216 }
217
218 async fn delete(&self, key: &str) -> Result<(), StorageError> {
219 let result = self
220 .client
221 .delete_object()
222 .bucket(&self.bucket)
223 .key(key)
224 .send()
225 .await
226 .map_err(|e| StorageError::S3(e.to_string()));
227
228 match &result {
229 Ok(_) => crate::metrics::record_s3_operation("delete", "success"),
230 Err(_) => crate::metrics::record_s3_operation("delete", "error"),
231 }
232
233 result?;
234 Ok(())
235 }
236}