this repo has no description
1use async_trait::async_trait; 2use aws_config::BehaviorVersion; 3use aws_config::meta::region::RegionProviderChain; 4use aws_sdk_s3::Client; 5use aws_sdk_s3::primitives::ByteStream; 6use bytes::Bytes; 7use thiserror::Error; 8 9#[derive(Error, Debug)] 10pub enum StorageError { 11 #[error("IO error: {0}")] 12 Io(#[from] std::io::Error), 13 #[error("S3 error: {0}")] 14 S3(String), 15 #[error("Other: {0}")] 16 Other(String), 17} 18 19#[async_trait] 20pub trait BlobStorage: Send + Sync { 21 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError>; 22 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 23 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 24 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 25 async fn delete(&self, key: &str) -> Result<(), StorageError>; 26} 27 28pub struct S3BlobStorage { 29 client: Client, 30 bucket: String, 31} 32 33impl S3BlobStorage { 34 pub async fn new() -> Self { 35 let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set"); 36 let client = create_s3_client().await; 37 Self { client, bucket } 38 } 39} 40 41async fn create_s3_client() -> Client { 42 let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); 43 44 let config = aws_config::defaults(BehaviorVersion::latest()) 45 .region(region_provider) 46 .load() 47 .await; 48 49 if let Ok(endpoint) = std::env::var("S3_ENDPOINT") { 50 let s3_config = aws_sdk_s3::config::Builder::from(&config) 51 .endpoint_url(endpoint) 52 .force_path_style(true) 53 .build(); 54 Client::from_conf(s3_config) 55 } else { 56 Client::new(&config) 57 } 58} 59 60pub struct BackupStorage { 61 client: Client, 62 bucket: String, 63} 64 65impl BackupStorage { 66 pub async fn new() -> Option<Self> { 67 let backup_enabled = std::env::var("BACKUP_ENABLED") 68 .map(|v| v != "false" && v != "0") 69 .unwrap_or(true); 70 71 if !backup_enabled { 72 return None; 73 } 74 75 let bucket = std::env::var("BACKUP_S3_BUCKET").ok()?; 76 let client = create_s3_client().await; 77 Some(Self { client, bucket }) 78 } 79 80 pub fn retention_count() -> u32 { 81 std::env::var("BACKUP_RETENTION_COUNT") 82 .ok() 83 .and_then(|v| v.parse().ok()) 84 .unwrap_or(7) 85 } 86 87 pub fn interval_secs() -> u64 { 88 std::env::var("BACKUP_INTERVAL_SECS") 89 .ok() 90 .and_then(|v| v.parse().ok()) 91 .unwrap_or(86400) 92 } 93 94 pub async fn put_backup( 95 &self, 96 did: &str, 97 rev: &str, 98 data: &[u8], 99 ) -> Result<String, StorageError> { 100 let key = format!("{}/{}.car", did, rev); 101 self.client 102 .put_object() 103 .bucket(&self.bucket) 104 .key(&key) 105 .body(ByteStream::from(Bytes::copy_from_slice(data))) 106 .send() 107 .await 108 .map_err(|e| { 109 crate::metrics::record_s3_operation("backup_put", "error"); 110 StorageError::S3(e.to_string()) 111 })?; 112 113 crate::metrics::record_s3_operation("backup_put", "success"); 114 Ok(key) 115 } 116 117 pub async fn get_backup(&self, storage_key: &str) -> Result<Bytes, StorageError> { 118 let resp = self 119 .client 120 .get_object() 121 .bucket(&self.bucket) 122 .key(storage_key) 123 .send() 124 .await 125 .map_err(|e| { 126 crate::metrics::record_s3_operation("backup_get", "error"); 127 StorageError::S3(e.to_string()) 128 })?; 129 130 let data = resp 131 .body 132 .collect() 133 .await 134 .map_err(|e| { 135 crate::metrics::record_s3_operation("backup_get", "error"); 136 StorageError::S3(e.to_string()) 137 })? 138 .into_bytes(); 139 140 crate::metrics::record_s3_operation("backup_get", "success"); 141 Ok(data) 142 } 143 144 pub async fn delete_backup(&self, storage_key: &str) -> Result<(), StorageError> { 145 self.client 146 .delete_object() 147 .bucket(&self.bucket) 148 .key(storage_key) 149 .send() 150 .await 151 .map_err(|e| { 152 crate::metrics::record_s3_operation("backup_delete", "error"); 153 StorageError::S3(e.to_string()) 154 })?; 155 156 crate::metrics::record_s3_operation("backup_delete", "success"); 157 Ok(()) 158 } 159} 160 161#[async_trait] 162impl BlobStorage for S3BlobStorage { 163 async fn put(&self, key: &str, data: &[u8]) -> Result<(), StorageError> { 164 self.put_bytes(key, Bytes::copy_from_slice(data)).await 165 } 166 167 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError> { 168 let result = self 169 .client 170 .put_object() 171 .bucket(&self.bucket) 172 .key(key) 173 .body(ByteStream::from(data)) 174 .send() 175 .await 176 .map_err(|e| StorageError::S3(e.to_string())); 177 178 match &result { 179 Ok(_) => crate::metrics::record_s3_operation("put", "success"), 180 Err(_) => crate::metrics::record_s3_operation("put", "error"), 181 } 182 183 result?; 184 Ok(()) 185 } 186 187 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> { 188 self.get_bytes(key).await.map(|b| b.to_vec()) 189 } 190 191 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError> { 192 let resp = self 193 .client 194 .get_object() 195 .bucket(&self.bucket) 196 .key(key) 197 .send() 198 .await 199 .map_err(|e| { 200 crate::metrics::record_s3_operation("get", "error"); 201 StorageError::S3(e.to_string()) 202 })?; 203 204 let data = resp 205 .body 206 .collect() 207 .await 208 .map_err(|e| { 209 crate::metrics::record_s3_operation("get", "error"); 210 StorageError::S3(e.to_string()) 211 })? 212 .into_bytes(); 213 214 crate::metrics::record_s3_operation("get", "success"); 215 Ok(data) 216 } 217 218 async fn delete(&self, key: &str) -> Result<(), StorageError> { 219 let result = self 220 .client 221 .delete_object() 222 .bucket(&self.bucket) 223 .key(key) 224 .send() 225 .await 226 .map_err(|e| StorageError::S3(e.to_string())); 227 228 match &result { 229 Ok(_) => crate::metrics::record_s3_operation("delete", "success"), 230 Err(_) => crate::metrics::record_s3_operation("delete", "error"), 231 } 232 233 result?; 234 Ok(()) 235 } 236}