use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::CompletedMultipartUpload;
use aws_sdk_s3::types::CompletedPart;
use bytes::Bytes;
use futures::Stream;
use sha2::{Digest, Sha256};
use std::pin::Pin;
use thiserror::Error;

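// S3 requires every part of a multipart upload except the last to be at
// least 5 MiB, so `put_stream` buffers to this threshold before flushing.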
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

#[derive(Error, Debug)]
pub enum StorageError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("S3 error: {0}")]
    S3(String),
    #[error("Other: {0}")]
    Other(String),
}

pub struct StreamUploadResult {
    pub sha256_hash: [u8; 32],
    pub size: u64,
}

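/// Backend-agnostic blob store interface; `S3BlobStorage` below is the
/// implementation this module provides.
///
/// A hypothetical caller (a sketch, not part of this module) can stay
/// generic over the backend by taking a trait object:
///
/// ```ignore
/// async fn store_avatar(storage: &dyn BlobStorage, did: &str, bytes: &[u8])
///     -> Result<(), StorageError>
/// {
///     storage.put(&format!("avatars/{did}"), bytes).await
/// }
/// ```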
#[async_trait]
pub trait BlobStorage: Send + Sync {
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>;
    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>;
    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>;
    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>;
    async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError>;
    async fn delete(&self, key: &str) -> Result<(), StorageError>;
    async fn put_stream(
        &self,
        key: &str,
        stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError>;
    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError>;
}

pub struct S3BlobStorage {
    client: Client,
    bucket: String,
}

impl S3BlobStorage {
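    /// Reads the target bucket from `S3_BUCKET` (panicking if it is unset)
    /// and builds a client from the ambient AWS configuration; see
    /// `create_s3_client` for the optional `S3_ENDPOINT` override.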
    pub async fn new() -> Self {
        let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
        let client = create_s3_client().await;
        Self { client, bucket }
    }
}

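/// Builds the S3 client shared by `S3BlobStorage` and `BackupStorage`: the
/// default region provider chain with a `us-east-1` fallback, plus an
/// optional `S3_ENDPOINT` override with forced path-style addressing for
/// S3-compatible servers such as MinIO.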
async fn create_s3_client() -> Client {
    let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");

    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(region_provider)
        .load()
        .await;

    if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
        let s3_config = aws_sdk_s3::config::Builder::from(&config)
            .endpoint_url(endpoint)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    } else {
        Client::new(&config)
    }
}

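/// Storage for `.car` backup archives, configured via the environment:
/// `new` returns `None` when `BACKUP_ENABLED` is "false"/"0" or when
/// `BACKUP_S3_BUCKET` is unset, so callers can skip backups cleanly.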
pub struct BackupStorage {
    client: Client,
    bucket: String,
}

impl BackupStorage {
    pub async fn new() -> Option<Self> {
        let backup_enabled = std::env::var("BACKUP_ENABLED")
            .map(|v| v != "false" && v != "0")
            .unwrap_or(true);

        if !backup_enabled {
            return None;
        }

        let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?;
        let client = create_s3_client().await;
        Some(Self { client, bucket })
    }

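    /// How many backups to retain, from `BACKUP_RETENTION_COUNT` (default 7).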
    pub fn retention_count() -> u32 {
        std::env::var("BACKUP_RETENTION_COUNT")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(7)
    }

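    /// Seconds between backup runs, from `BACKUP_INTERVAL_SECS`
    /// (default 86400, i.e. daily).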
    pub fn interval_secs() -> u64 {
        std::env::var("BACKUP_INTERVAL_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(86400)
    }

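    /// Uploads one backup under the key `{did}/{rev}.car` and returns that
    /// key, which `get_backup` and `delete_backup` later take as
    /// `storage_key`.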
    pub async fn put_backup(
        &self,
        did: &str,
        rev: &str,
        data: &[u8],
    ) -> Result<String, StorageError> {
        let key = format!("{}/{}.car", did, rev);
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&key)
            .body(ByteStream::from(Bytes::copy_from_slice(data)))
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_put", "error");
                StorageError::S3(e.to_string())
            })?;

        crate::metrics::record_s3_operation("backup_put", "success");
        Ok(key)
    }

    pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_get", "error");
                StorageError::S3(e.to_string())
            })?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_get", "error");
                StorageError::S3(e.to_string())
            })?
            .into_bytes();

        crate::metrics::record_s3_operation("backup_get", "success");
        Ok(data)
    }

    pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("backup_delete", "error");
                StorageError::S3(e.to_string())
            })?;

        crate::metrics::record_s3_operation("backup_delete", "success");
        Ok(())
    }
}

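// Each method below reports its outcome through
// `crate::metrics::record_s3_operation`; `put_stream` and `copy` currently
// record successes only.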
#[async_trait]
impl BlobStorage for S3BlobStorage {
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
        self.put_bytes(key, Bytes::copy_from_slice(data)).await
    }

    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        let result = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(ByteStream::from(data))
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()));

        match &result {
            Ok(_) => crate::metrics::record_s3_operation("put", "success"),
            Err(_) => crate::metrics::record_s3_operation("put", "error"),
        }

        result?;
        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
        self.get_bytes(key).await.map(|b| b.to_vec())
    }

    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get", "error");
                StorageError::S3(e.to_string())
            })?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get", "error");
                StorageError::S3(e.to_string())
            })?
            .into_bytes();

        crate::metrics::record_s3_operation("get", "success");
        Ok(data)
    }

    async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError> {
        let range = format!("bytes=0-{}", size.saturating_sub(1));
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .range(range)
            .send()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get_head", "error");
                StorageError::S3(e.to_string())
            })?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| {
                crate::metrics::record_s3_operation("get_head", "error");
                StorageError::S3(e.to_string())
            })?
            .into_bytes();

        crate::metrics::record_s3_operation("get_head", "success");
        Ok(data)
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        let result = self
            .client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()));

        match &result {
            Ok(_) => crate::metrics::record_s3_operation("delete", "success"),
            Err(_) => crate::metrics::record_s3_operation("delete", "error"),
        }

        result?;
        Ok(())
    }

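    /// Streams the body into S3 as a multipart upload, buffering chunks up
    /// to `MIN_PART_SIZE` before each part is sent and hashing everything
    /// with SHA-256 along the way, so the caller gets the digest and total
    /// size without a second pass. Failures abort the upload (best effort).
    ///
    /// A caller sketch, assuming the `tokio-util` crate (not a dependency
    /// this module declares) for the byte-stream adapter:
    ///
    /// ```ignore
    /// use tokio_util::io::ReaderStream;
    ///
    /// let file = tokio::fs::File::open("repo.car").await?;
    /// let result = storage
    ///     .put_stream("blobs/repo.car", Box::pin(ReaderStream::new(file)))
    ///     .await?;
    /// println!("stored {} bytes, sha256 {:x?}", result.size, result.sha256_hash);
    /// ```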
    async fn put_stream(
        &self,
        key: &str,
        mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError> {
        use futures::StreamExt;

        let create_resp = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to create multipart upload: {}", e)))?;

        let upload_id = create_resp
            .upload_id()
            .ok_or_else(|| StorageError::S3("No upload ID returned".to_string()))?
            .to_string();

        let mut hasher = Sha256::new();
        let mut total_size: u64 = 0;
        let mut part_number = 1;
        let mut completed_parts: Vec<CompletedPart> = Vec::new();
        let mut buffer = Vec::with_capacity(MIN_PART_SIZE);

        // Uploads one buffered part and returns its `CompletedPart` record.
        // Boxed so the closure can be reused for both the in-loop flushes
        // and the final partial part.
        let upload_part = |client: &Client,
                           bucket: &str,
                           key: &str,
                           upload_id: &str,
                           part_num: i32,
                           data: Vec<u8>|
         -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<CompletedPart, StorageError>> + Send>,
        > {
            let client = client.clone();
            let bucket = bucket.to_string();
            let key = key.to_string();
            let upload_id = upload_id.to_string();
            Box::pin(async move {
                let resp = client
                    .upload_part()
                    .bucket(&bucket)
                    .key(&key)
                    .upload_id(&upload_id)
                    .part_number(part_num)
                    .body(ByteStream::from(data))
                    .send()
                    .await
                    .map_err(|e| StorageError::S3(format!("Failed to upload part: {}", e)))?;

                let etag = resp
                    .e_tag()
                    .ok_or_else(|| StorageError::S3("No ETag returned for part".to_string()))?
                    .to_string();

                Ok(CompletedPart::builder()
                    .part_number(part_num)
                    .e_tag(etag)
                    .build())
            })
        };

        loop {
            match stream.next().await {
                Some(Ok(chunk)) => {
                    hasher.update(&chunk);
                    total_size += chunk.len() as u64;
                    buffer.extend_from_slice(&chunk);

                    if buffer.len() >= MIN_PART_SIZE {
                        let part_data =
                            std::mem::replace(&mut buffer, Vec::with_capacity(MIN_PART_SIZE));
                        let part = match upload_part(
                            &self.client,
                            &self.bucket,
                            key,
                            &upload_id,
                            part_number,
                            part_data,
                        )
                        .await
                        {
                            Ok(part) => part,
                            Err(e) => {
                                // Abort so the failed upload's parts are not
                                // left behind in the bucket.
                                let _ = self
                                    .client
                                    .abort_multipart_upload()
                                    .bucket(&self.bucket)
                                    .key(key)
                                    .upload_id(&upload_id)
                                    .send()
                                    .await;
                                return Err(e);
                            }
                        };
                        completed_parts.push(part);
                        part_number += 1;
                    }
                }
                Some(Err(e)) => {
                    let _ = self
                        .client
                        .abort_multipart_upload()
                        .bucket(&self.bucket)
                        .key(key)
                        .upload_id(&upload_id)
                        .send()
                        .await;
                    return Err(StorageError::Io(e));
                }
                None => break,
            }
        }

        // Flush whatever remains; only the final part may be smaller than
        // MIN_PART_SIZE.
        if !buffer.is_empty() {
            let part = match upload_part(
                &self.client,
                &self.bucket,
                key,
                &upload_id,
                part_number,
                buffer,
            )
            .await
            {
                Ok(part) => part,
                Err(e) => {
                    let _ = self
                        .client
                        .abort_multipart_upload()
                        .bucket(&self.bucket)
                        .key(key)
                        .upload_id(&upload_id)
                        .send()
                        .await;
                    return Err(e);
                }
            };
            completed_parts.push(part);
        }

        if completed_parts.is_empty() {
            let _ = self
                .client
                .abort_multipart_upload()
                .bucket(&self.bucket)
                .key(key)
                .upload_id(&upload_id)
                .send()
                .await;
            return Err(StorageError::Other("Empty upload".to_string()));
        }

        let completed_upload = CompletedMultipartUpload::builder()
            .set_parts(Some(completed_parts))
            .build();

        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .upload_id(&upload_id)
            .multipart_upload(completed_upload)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to complete multipart upload: {}", e)))?;

        crate::metrics::record_s3_operation("put_stream", "success");

        let hash: [u8; 32] = hasher.finalize().into();
        Ok(StreamUploadResult {
            sha256_hash: hash,
            size: total_size,
        })
    }

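    /// Server-side copy within the same bucket. S3 expects `CopySource` as
    /// a URL-encoded `bucket/key` string, so keys are assumed not to need
    /// percent-encoding.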
    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
        let copy_source = format!("{}/{}", self.bucket, src_key);

        self.client
            .copy_object()
            .bucket(&self.bucket)
            .copy_source(&copy_source)
            .key(dst_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to copy object: {}", e)))?;

        crate::metrics::record_s3_operation("copy", "success");
        Ok(())
    }
}