//! S3-backed blob storage and backup storage built on the AWS SDK for Rust.
1use async_trait::async_trait;
2use aws_config::BehaviorVersion;
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::Client;
5use aws_sdk_s3::primitives::ByteStream;
6use aws_sdk_s3::types::CompletedMultipartUpload;
7use aws_sdk_s3::types::CompletedPart;
8use bytes::Bytes;
9use futures::Stream;
10use sha2::{Digest, Sha256};
11use std::pin::Pin;
12use thiserror::Error;
13
/// S3 requires every part of a multipart upload except the last to be at
/// least 5 MiB; `put_stream` buffers incoming chunks up to this size before
/// uploading a part.
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
15
/// Errors returned by blob and backup storage operations.
#[derive(Error, Debug)]
pub enum StorageError {
    /// Local I/O failure (e.g. while reading the caller-supplied upload stream).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// An S3 request failed; the SDK error is flattened to a string.
    #[error("S3 error: {0}")]
    S3(String),
    /// Any other failure (e.g. an empty upload stream).
    #[error("Other: {0}")]
    Other(String),
}
25
/// Summary of a successful streaming upload.
pub struct StreamUploadResult {
    /// SHA-256 digest of the full uploaded byte stream.
    pub sha256_hash: [u8; 32],
    /// Total number of bytes consumed from the stream and uploaded.
    pub size: u64,
}
30
/// Abstraction over a key/value blob store (implemented here on S3).
#[async_trait]
pub trait BlobStorage: Send + Sync {
    /// Stores `data` under `key`, copying the slice into an owned buffer.
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>;
    /// Stores `data` under `key` without an extra copy.
    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>;
    /// Fetches the full object at `key` as an owned `Vec<u8>`.
    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>;
    /// Fetches the full object at `key` as `Bytes`.
    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>;
    /// Deletes the object at `key`.
    async fn delete(&self, key: &str) -> Result<(), StorageError>;
    /// Streams `stream` into the object at `key`, returning the SHA-256
    /// hash and total size of the uploaded data.
    async fn put_stream(
        &self,
        key: &str,
        stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError>;
    /// Server-side copies the object at `src_key` to `dst_key` within the store.
    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError>;
}
45
/// `BlobStorage` implementation backed by a single S3 bucket.
pub struct S3BlobStorage {
    // Cheap to clone: the AWS SDK client is internally reference-counted.
    client: Client,
    // Bucket name, read from the `S3_BUCKET` environment variable.
    bucket: String,
}
50
51impl S3BlobStorage {
52 pub async fn new() -> Self {
53 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
54 let client = create_s3_client().await;
55 Self { client, bucket }
56 }
57}
58
59async fn create_s3_client() -> Client {
60 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
61
62 let config = aws_config::defaults(BehaviorVersion::latest())
63 .region(region_provider)
64 .load()
65 .await;
66
67 if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
68 let s3_config = aws_sdk_s3::config::Builder::from(&config)
69 .endpoint_url(endpoint)
70 .force_path_style(true)
71 .build();
72 Client::from_conf(s3_config)
73 } else {
74 Client::new(&config)
75 }
76}
77
/// Storage handle for repository backups, kept in a bucket separate from
/// the main blob bucket.
pub struct BackupStorage {
    client: Client,
    // Bucket name, read from the `BACKUP_S3_BUCKET` environment variable.
    bucket: String,
}
82
83impl BackupStorage {
84 pub async fn new() -> Option<Self> {
85 let backup_enabled = std::env::var("BACKUP_ENABLED")
86 .map(|v| v != "false" && v != "0")
87 .unwrap_or(true);
88
89 if !backup_enabled {
90 return None;
91 }
92
93 let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?;
94 let client = create_s3_client().await;
95 Some(Self { client, bucket })
96 }
97
98 pub fn retention_count() -> u32 {
99 std::env::var("BACKUP_RETENTION_COUNT")
100 .ok()
101 .and_then(|v| v.parse().ok())
102 .unwrap_or(7)
103 }
104
105 pub fn interval_secs() -> u64 {
106 std::env::var("BACKUP_INTERVAL_SECS")
107 .ok()
108 .and_then(|v| v.parse().ok())
109 .unwrap_or(86400)
110 }
111
112 pub async fn put_backup(
113 &self,
114 did: &str,
115 rev: &str,
116 data: &[u8],
117 ) -> Result<String, StorageError> {
118 let key = format!("{}/{}.car", did, rev);
119 self.client
120 .put_object()
121 .bucket(&self.bucket)
122 .key(&key)
123 .body(ByteStream::from(Bytes::copy_from_slice(data)))
124 .send()
125 .await
126 .map_err(|e| {
127 crate::metrics::record_s3_operation("backup_put", "error");
128 StorageError::S3(e.to_string())
129 })?;
130
131 crate::metrics::record_s3_operation("backup_put", "success");
132 Ok(key)
133 }
134
135 pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> {
136 let resp = self
137 .client
138 .get_object()
139 .bucket(&self.bucket)
140 .key(storage_key)
141 .send()
142 .await
143 .map_err(|e| {
144 crate::metrics::record_s3_operation("backup_get", "error");
145 StorageError::S3(e.to_string())
146 })?;
147
148 let data = resp
149 .body
150 .collect()
151 .await
152 .map_err(|e| {
153 crate::metrics::record_s3_operation("backup_get", "error");
154 StorageError::S3(e.to_string())
155 })?
156 .into_bytes();
157
158 crate::metrics::record_s3_operation("backup_get", "success");
159 Ok(data)
160 }
161
162 pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> {
163 self.client
164 .delete_object()
165 .bucket(&self.bucket)
166 .key(storage_key)
167 .send()
168 .await
169 .map_err(|e| {
170 crate::metrics::record_s3_operation("backup_delete", "error");
171 StorageError::S3(e.to_string())
172 })?;
173
174 crate::metrics::record_s3_operation("backup_delete", "success");
175 Ok(())
176 }
177}
178
179#[async_trait]
180impl BlobStorage for S3BlobStorage {
181 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
182 self.put_bytes(key, Bytes::copy_from_slice(data)).await
183 }
184
185 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
186 let result = self
187 .client
188 .put_object()
189 .bucket(&self.bucket)
190 .key(key)
191 .body(ByteStream::from(data))
192 .send()
193 .await
194 .map_err(|e| StorageError::S3(e.to_string()));
195
196 match &result {
197 Ok(_) => crate::metrics::record_s3_operation("put", "success"),
198 Err(_) => crate::metrics::record_s3_operation("put", "error"),
199 }
200
201 result?;
202 Ok(())
203 }
204
205 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
206 self.get_bytes(key).await.map(|b| b.to_vec())
207 }
208
209 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
210 let resp = self
211 .client
212 .get_object()
213 .bucket(&self.bucket)
214 .key(key)
215 .send()
216 .await
217 .map_err(|e| {
218 crate::metrics::record_s3_operation("get", "error");
219 StorageError::S3(e.to_string())
220 })?;
221
222 let data = resp
223 .body
224 .collect()
225 .await
226 .map_err(|e| {
227 crate::metrics::record_s3_operation("get", "error");
228 StorageError::S3(e.to_string())
229 })?
230 .into_bytes();
231
232 crate::metrics::record_s3_operation("get", "success");
233 Ok(data)
234 }
235
236 async fn delete(&self, key: &str) -> Result<(), StorageError> {
237 let result = self
238 .client
239 .delete_object()
240 .bucket(&self.bucket)
241 .key(key)
242 .send()
243 .await
244 .map_err(|e| StorageError::S3(e.to_string()));
245
246 match &result {
247 Ok(_) => crate::metrics::record_s3_operation("delete", "success"),
248 Err(_) => crate::metrics::record_s3_operation("delete", "error"),
249 }
250
251 result?;
252 Ok(())
253 }
254
255 async fn put_stream(
256 &self,
257 key: &str,
258 mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
259 ) -> Result<StreamUploadResult, StorageError> {
260 use futures::StreamExt;
261
262 let create_resp = self
263 .client
264 .create_multipart_upload()
265 .bucket(&self.bucket)
266 .key(key)
267 .send()
268 .await
269 .map_err(|e| StorageError::S3(format!("Failed to create multipart upload: {}", e)))?;
270
271 let upload_id = create_resp
272 .upload_id()
273 .ok_or_else(|| StorageError::S3("No upload ID returned".to_string()))?
274 .to_string();
275
276 let mut hasher = Sha256::new();
277 let mut total_size: u64 = 0;
278 let mut part_number = 1;
279 let mut completed_parts: Vec<CompletedPart> = Vec::new();
280 let mut buffer = Vec::with_capacity(MIN_PART_SIZE);
281
282 let upload_part = |client: &Client,
283 bucket: &str,
284 key: &str,
285 upload_id: &str,
286 part_num: i32,
287 data: Vec<u8>|
288 -> std::pin::Pin<
289 Box<dyn std::future::Future<Output = Result<CompletedPart, StorageError>> + Send>,
290 > {
291 let client = client.clone();
292 let bucket = bucket.to_string();
293 let key = key.to_string();
294 let upload_id = upload_id.to_string();
295 Box::pin(async move {
296 let resp = client
297 .upload_part()
298 .bucket(&bucket)
299 .key(&key)
300 .upload_id(&upload_id)
301 .part_number(part_num)
302 .body(ByteStream::from(data))
303 .send()
304 .await
305 .map_err(|e| StorageError::S3(format!("Failed to upload part: {}", e)))?;
306
307 let etag = resp
308 .e_tag()
309 .ok_or_else(|| StorageError::S3("No ETag returned for part".to_string()))?
310 .to_string();
311
312 Ok(CompletedPart::builder()
313 .part_number(part_num)
314 .e_tag(etag)
315 .build())
316 })
317 };
318
319 loop {
320 match stream.next().await {
321 Some(Ok(chunk)) => {
322 hasher.update(&chunk);
323 total_size += chunk.len() as u64;
324 buffer.extend_from_slice(&chunk);
325
326 if buffer.len() >= MIN_PART_SIZE {
327 let part_data =
328 std::mem::replace(&mut buffer, Vec::with_capacity(MIN_PART_SIZE));
329 let part = upload_part(
330 &self.client,
331 &self.bucket,
332 key,
333 &upload_id,
334 part_number,
335 part_data,
336 )
337 .await?;
338 completed_parts.push(part);
339 part_number += 1;
340 }
341 }
342 Some(Err(e)) => {
343 let _ = self
344 .client
345 .abort_multipart_upload()
346 .bucket(&self.bucket)
347 .key(key)
348 .upload_id(&upload_id)
349 .send()
350 .await;
351 return Err(StorageError::Io(e));
352 }
353 None => break,
354 }
355 }
356
357 if !buffer.is_empty() {
358 let part = upload_part(
359 &self.client,
360 &self.bucket,
361 key,
362 &upload_id,
363 part_number,
364 buffer,
365 )
366 .await?;
367 completed_parts.push(part);
368 }
369
370 if completed_parts.is_empty() {
371 let _ = self
372 .client
373 .abort_multipart_upload()
374 .bucket(&self.bucket)
375 .key(key)
376 .upload_id(&upload_id)
377 .send()
378 .await;
379 return Err(StorageError::Other("Empty upload".to_string()));
380 }
381
382 let completed_upload = CompletedMultipartUpload::builder()
383 .set_parts(Some(completed_parts))
384 .build();
385
386 self.client
387 .complete_multipart_upload()
388 .bucket(&self.bucket)
389 .key(key)
390 .upload_id(&upload_id)
391 .multipart_upload(completed_upload)
392 .send()
393 .await
394 .map_err(|e| StorageError::S3(format!("Failed to complete multipart upload: {}", e)))?;
395
396 crate::metrics::record_s3_operation("put_stream", "success");
397
398 let hash: [u8; 32] = hasher.finalize().into();
399 Ok(StreamUploadResult {
400 sha256_hash: hash,
401 size: total_size,
402 })
403 }
404
405 async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
406 let copy_source = format!("{}/{}", self.bucket, src_key);
407
408 self.client
409 .copy_object()
410 .bucket(&self.bucket)
411 .copy_source(©_source)
412 .key(dst_key)
413 .send()
414 .await
415 .map_err(|e| StorageError::S3(format!("Failed to copy object: {}", e)))?;
416
417 crate::metrics::record_s3_operation("copy", "success");
418 Ok(())
419 }
420}