pub use tranquil_infra::{BlobStorage, StorageError, StreamUploadResult};

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use bytes::Bytes;
use futures::Stream;
use sha2::{Digest, Sha256};
use std::pin::Pin;

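// S3 requires every part of a multipart upload except the last to be at
// least 5 MiB; `put_stream` buffers incoming chunks until this threshold
// before flushing a part.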
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

pub struct S3BlobStorage {
    client: Client,
    bucket: String,
}

impl S3BlobStorage {
    pub async fn new() -> Self {
        let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
        let client = create_s3_client().await;
        Self { client, bucket }
    }

    pub async fn with_bucket(bucket: String) -> Self {
        let client = create_s3_client().await;
        Self { client, bucket }
    }
}
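
// Construction sketch (hypothetical bucket name): `new` reads the bucket from
// S3_BUCKET and panics if it is unset, while `with_bucket` takes it explicitly.
//
//     let storage = S3BlobStorage::with_bucket("my-blobs".to_string()).await;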

async fn create_s3_client() -> Client {
    let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");

    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(region_provider)
        .load()
        .await;

    if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
        let s3_config = aws_sdk_s3::config::Builder::from(&config)
            .endpoint_url(endpoint)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    } else {
        Client::new(&config)
    }
}
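
// When S3_ENDPOINT is set (for example `http://localhost:9000` for a local
// MinIO; an illustrative value, not one this crate mandates), the client uses
// that endpoint with path-style addressing, which most S3-compatible servers
// require. Otherwise the standard AWS endpoint resolution applies.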

pub struct BackupStorage {
    client: Client,
    bucket: String,
}

impl BackupStorage {
    pub async fn new() -> Option<Self> {
        let backup_enabled = std::env::var("BACKUP_ENABLED")
            .map(|v| v != "false" && v != "0")
            .unwrap_or(true);

        if !backup_enabled {
            return None;
        }

        let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?;
        let client = create_s3_client().await;
        Some(Self { client, bucket })
    }

    pub fn retention_count() -> u32 {
        std::env::var("BACKUP_RETENTION_COUNT")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(7)
    }

    pub fn interval_secs() -> u64 {
        std::env::var("BACKUP_INTERVAL_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(86400)
    }
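
    // A minimal scheduling sketch, assuming a `tokio` runtime; the loop is
    // illustrative and not part of this crate's API:
    //
    //     if let Some(backup) = BackupStorage::new().await {
    //         let interval = std::time::Duration::from_secs(BackupStorage::interval_secs());
    //         loop {
    //             // build a CAR snapshot, then:
    //             // backup.put_backup(&did, &rev, &car_bytes).await?;
    //             tokio::time::sleep(interval).await;
    //         }
    //     }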

    /// Stores one snapshot under `{did}/{rev}.car` and returns the storage
    /// key, which `get_backup` and `delete_backup` accept as-is.
    pub async fn put_backup(
        &self,
        did: &str,
        rev: &str,
        data: &[u8],
    ) -> Result<String, StorageError> {
        let key = format!("{}/{}.car", did, rev);
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&key)
            .body(ByteStream::from(Bytes::copy_from_slice(data)))
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(key)
    }

    pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?
            .into_bytes();

        Ok(data)
    }

    pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(storage_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(())
    }
}

#[async_trait]
impl BlobStorage for S3BlobStorage {
    async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
        self.put_bytes(key, Bytes::copy_from_slice(data)).await
    }

    async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(ByteStream::from(data))
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
        self.get_bytes(key).await.map(|b| b.to_vec())
    }

    async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?
            .into_bytes();

        Ok(data)
    }

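    // Fetches only the first `size` bytes via an HTTP Range request
    // (`bytes=0-(size-1)`); S3 returns the whole object if it is smaller.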
    async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError> {
        let range = format!("bytes=0-{}", size.saturating_sub(1));
        let resp = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .range(range)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        let data = resp
            .body
            .collect()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?
            .into_bytes();

        Ok(data)
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(e.to_string()))?;

        Ok(())
    }

    async fn put_stream(
        &self,
        key: &str,
        mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<StreamUploadResult, StorageError> {
        use futures::StreamExt;

        let create_resp = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to create multipart upload: {}", e)))?;

        let upload_id = create_resp
            .upload_id()
            .ok_or_else(|| StorageError::S3("No upload ID returned".to_string()))?
            .to_string();

        let mut hasher = Sha256::new();
        let mut total_size: u64 = 0;
        let mut part_number = 1;
        let mut completed_parts: Vec<CompletedPart> = Vec::new();
        let mut buffer = Vec::with_capacity(MIN_PART_SIZE);

        let upload_part = |client: &Client,
                           bucket: &str,
                           key: &str,
                           upload_id: &str,
                           part_num: i32,
                           data: Vec<u8>|
         -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<CompletedPart, StorageError>> + Send>,
        > {
            let client = client.clone();
            let bucket = bucket.to_string();
            let key = key.to_string();
            let upload_id = upload_id.to_string();
            Box::pin(async move {
                let resp = client
                    .upload_part()
                    .bucket(&bucket)
                    .key(&key)
                    .upload_id(&upload_id)
                    .part_number(part_num)
                    .body(ByteStream::from(data))
                    .send()
                    .await
                    .map_err(|e| StorageError::S3(format!("Failed to upload part: {}", e)))?;

                let etag = resp
                    .e_tag()
                    .ok_or_else(|| StorageError::S3("No ETag returned for part".to_string()))?
                    .to_string();

                Ok(CompletedPart::builder()
                    .part_number(part_num)
                    .e_tag(etag)
                    .build())
            })
        };
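
        // Drain the stream, hashing every chunk and buffering bytes until a
        // part reaches MIN_PART_SIZE; on a stream error, abort the multipart
        // upload so S3 does not retain orphaned parts.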
        loop {
            match stream.next().await {
                Some(Ok(chunk)) => {
                    hasher.update(&chunk);
                    total_size += chunk.len() as u64;
                    buffer.extend_from_slice(&chunk);

                    if buffer.len() >= MIN_PART_SIZE {
                        let part_data =
                            std::mem::replace(&mut buffer, Vec::with_capacity(MIN_PART_SIZE));
                        let part = upload_part(
                            &self.client,
                            &self.bucket,
                            key,
                            &upload_id,
                            part_number,
                            part_data,
                        )
                        .await?;
                        completed_parts.push(part);
                        part_number += 1;
                    }
                }
                Some(Err(e)) => {
                    let _ = self
                        .client
                        .abort_multipart_upload()
                        .bucket(&self.bucket)
                        .key(key)
                        .upload_id(&upload_id)
                        .send()
                        .await;
                    return Err(StorageError::Io(e));
                }
                None => break,
            }
        }

        if !buffer.is_empty() {
            let part = upload_part(
                &self.client,
                &self.bucket,
                key,
                &upload_id,
                part_number,
                buffer,
            )
            .await?;
            completed_parts.push(part);
        }

        if completed_parts.is_empty() {
            let _ = self
                .client
                .abort_multipart_upload()
                .bucket(&self.bucket)
                .key(key)
                .upload_id(&upload_id)
                .send()
                .await;
            return Err(StorageError::Other("Empty upload".to_string()));
        }

        let completed_upload = CompletedMultipartUpload::builder()
            .set_parts(Some(completed_parts))
            .build();

        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(key)
            .upload_id(&upload_id)
            .multipart_upload(completed_upload)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to complete multipart upload: {}", e)))?;

        let hash: [u8; 32] = hasher.finalize().into();
        Ok(StreamUploadResult {
            sha256_hash: hash,
            size: total_size,
        })
    }
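
    // Call sketch (hypothetical key and stream): `put_stream` accepts any
    // pinned `Stream` of byte chunks; `futures::stream::iter` here is only
    // for illustration.
    //
    //     let chunks = futures::stream::iter(
    //         vec![Ok::<Bytes, std::io::Error>(Bytes::from_static(b"abc"))],
    //     );
    //     let result = storage.put_stream("blobs/example", Box::pin(chunks)).await?;
    //     assert_eq!(result.size, 3);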

    async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
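        // S3's CopySource parameter expects `source-bucket/source-key`.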
        let copy_source = format!("{}/{}", self.bucket, src_key);

        self.client
            .copy_object()
            .bucket(&self.bucket)
            .copy_source(copy_source)
            .key(dst_key)
            .send()
            .await
            .map_err(|e| StorageError::S3(format!("Failed to copy object: {}", e)))?;

        Ok(())
    }
}
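
// A minimal integration-test sketch, assuming `tokio` (with the `macros`
// feature) is available as a dev-dependency and that S3_BUCKET (and, for
// local use, S3_ENDPOINT) point at a reachable bucket such as a local MinIO.
// Ignored by default so `cargo test` does not require live storage.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore = "requires a live S3-compatible bucket"]
    async fn put_get_roundtrip() {
        let storage = S3BlobStorage::new().await;
        let payload = b"hello blob".to_vec();

        storage.put("test/roundtrip", &payload).await.unwrap();
        assert_eq!(storage.get("test/roundtrip").await.unwrap(), payload);

        storage.delete("test/roundtrip").await.unwrap();
    }
}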