this repo has no description
1use async_trait::async_trait;
2use aws_config::BehaviorVersion;
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::Client;
5use aws_sdk_s3::primitives::ByteStream;
6use bytes::Bytes;
7use thiserror::Error;
8#[derive(Error, Debug)]
9pub enum StorageError {
10 #[error("IO error: {0}")]
11 Io(#[from] std::io::Error),
12 #[error("S3 error: {0}")]
13 S3(String),
14 #[error("Other: {0}")]
15 Other(String),
16}
17#[async_trait]
18pub trait BlobStorage: Send + Sync {
19 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>;
20 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>;
21 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>;
22 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>;
23 async fn delete(&self, key: &str) -> Result<(), StorageError>;
24}
25pub struct S3BlobStorage {
26 client: Client,
27 bucket: String,
28}
29impl S3BlobStorage {
30 pub async fn new() -> Self {
31 // heheheh
32 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
33 let config = aws_config::defaults(BehaviorVersion::latest())
34 .region(region_provider)
35 .load()
36 .await;
37 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
38 let client = if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
39 let s3_config = aws_sdk_s3::config::Builder::from(&config)
40 .endpoint_url(endpoint)
41 .force_path_style(true)
42 .build();
43 Client::from_conf(s3_config)
44 } else {
45 Client::new(&config)
46 };
47 Self { client, bucket }
48 }
49}
50#[async_trait]
51impl BlobStorage for S3BlobStorage {
52 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> {
53 self.put_bytes(key, Bytes::copy_from_slice(data)).await
54 }
55 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
56 let result = self.client
57 .put_object()
58 .bucket(&self.bucket)
59 .key(key)
60 .body(ByteStream::from(data))
61 .send()
62 .await
63 .map_err(|e| StorageError::S3(e.to_string()));
64 match &result {
65 Ok(_) => crate::metrics::record_s3_operation("put", "success"),
66 Err(_) => crate::metrics::record_s3_operation("put", "error"),
67 }
68 result?;
69 Ok(())
70 }
71 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
72 self.get_bytes(key).await.map(|b| b.to_vec())
73 }
74 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> {
75 let resp = self
76 .client
77 .get_object()
78 .bucket(&self.bucket)
79 .key(key)
80 .send()
81 .await
82 .map_err(|e| {
83 crate::metrics::record_s3_operation("get", "error");
84 StorageError::S3(e.to_string())
85 })?;
86 let data = resp
87 .body
88 .collect()
89 .await
90 .map_err(|e| {
91 crate::metrics::record_s3_operation("get", "error");
92 StorageError::S3(e.to_string())
93 })?
94 .into_bytes();
95 crate::metrics::record_s3_operation("get", "success");
96 Ok(data)
97 }
98 async fn delete(&self, key: &str) -> Result<(), StorageError> {
99 let result = self.client
100 .delete_object()
101 .bucket(&self.bucket)
102 .key(key)
103 .send()
104 .await
105 .map_err(|e| StorageError::S3(e.to_string()));
106 match &result {
107 Ok(_) => crate::metrics::record_s3_operation("delete", "success"),
108 Err(_) => crate::metrics::record_s3_operation("delete", "error"),
109 }
110 result?;
111 Ok(())
112 }
113}