Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at main 322 lines 12 kB view raw
1use crate::db::models; 2use crate::jobs::account_backup::AccountBackupJobContext; 3use crate::jobs::{ 4 AnyhowErrorWrapper, get_repo_rev_by_did, record_new_blob, update_last_backup_now_by_did, 5 update_repo_rev_by_did, 6}; 7use crate::storage::{blob_backup_path, repo_backup_path}; 8use apalis::prelude::{Data, Error}; 9use async_compression::futures::bufread::ZstdEncoder; 10use futures::TryStreamExt; 11use reqwest::Client; 12use reqwest::header::{ACCEPT, CONTENT_TYPE}; 13use s3::Bucket; 14use serde::{Deserialize, Serialize}; 15use sqlx::{Pool, Postgres}; 16use std::sync::Arc; 17use tokio_util::compat::FuturesAsyncReadCompatExt; 18 19pub const JOB_NAMESPACE: &str = "apalis::UploadBlob"; 20 21#[derive(Serialize, Deserialize, Debug, Clone)] 22pub struct XrpcError { 23 error: String, 24 message: Option<String>, 25} 26 27impl std::fmt::Display for XrpcError { 28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 29 match &self.message { 30 Some(msg) => write!(f, "{}: {}", self.error, msg), 31 None => write!(f, "{}", self.error), 32 } 33 } 34} 35 36#[derive(Debug, Deserialize, Serialize, Clone)] 37pub enum BlobType { 38 ///The string here is the rev. If it's from the pds backup start it already has the rev saving a web call 39 Repo(Option<String>), 40 //The vec is the cids of the blobs to pull 41 Blob(Vec<String>), 42} 43 44#[derive(Debug, Deserialize, Serialize, Clone)] 45struct AtProtoRepoStatusResponse { 46 pub rev: String, 47} 48 49#[derive(Debug, Deserialize, Serialize, Clone)] 50pub struct UploadBlobJobContext { 51 pub account_backup_job_context: AccountBackupJobContext, 52 pub blob_type: BlobType, 53 /// If set to true, this is the last batch of blobs to upload for this account and can mark the job as complete. 
54 pub last_upload_batch: bool, 55} 56 57pub async fn upload_blob_job( 58 job: UploadBlobJobContext, 59 pool: Data<Pool<Postgres>>, 60 atproto_client: Data<Arc<reqwest::Client>>, 61 s3_client: Data<Arc<Box<Bucket>>>, 62) -> Result<(), Error> { 63 let did = job.account_backup_job_context.did.clone(); 64 let pds_host = job.account_backup_job_context.pds_host.clone(); 65 66 match job.blob_type { 67 BlobType::Repo(rev) => { 68 let current_rev = match rev { 69 None => { 70 //todo Prob just always pass in a rev? induivial uplaods do this anyhow 71 let url = format!( 72 "https://{}/xrpc/com.atproto.sync.getRepoStatus?did={}", 73 pds_host, did, 74 ); 75 76 let resp = atproto_client 77 .get(url) 78 .send() 79 .await 80 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))? 81 .json::<AtProtoRepoStatusResponse>() 82 .await 83 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 84 resp.rev.clone() 85 } 86 Some(rev) => rev, 87 }; 88 89 //TODO this is fialing on reruns since it's not being updated 90 let current_backed_up_rev = 91 get_repo_rev_by_did(&pool, &job.account_backup_job_context.did) 92 .await 93 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))? 
94 //I'm cool with this unwrap since it should fail the if check 95 .unwrap_or("".to_string()); 96 if current_backed_up_rev == current_rev { 97 log::info!("Repo is already backed up at the rev: {}", current_rev); 98 update_last_backup_now_by_did(&pool, did.as_str()) 99 .await 100 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 101 return Ok(()); 102 } 103 104 let repo_url = format!( 105 "https://{}/xrpc/com.atproto.sync.getRepo?did={}", 106 pds_host, did 107 ); 108 109 match download_compress_and_upload_blob( 110 repo_backup_path(did.clone()), 111 &atproto_client, 112 &s3_client, 113 repo_url, 114 models::BlobType::Repo, 115 ) 116 .await 117 { 118 Ok(size) => { 119 record_new_blob( 120 &pool, 121 did.clone(), 122 current_rev.clone(), 123 //TODO bad 124 size.try_into().unwrap(), 125 models::BlobType::Repo, 126 ) 127 .await 128 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 129 130 update_repo_rev_by_did(&pool, &did, &current_rev) 131 .await 132 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 133 } 134 Err(e) => { 135 return match e { 136 DownloadCompressAndUploadError::AnyError(e) => Err(e), 137 DownloadCompressAndUploadError::BlobDownloadError => { 138 Err(Error::Failed(Arc::new( 139 anyhow::anyhow!("Error downloading the repo for: {did}").into(), 140 ))) 141 } 142 DownloadCompressAndUploadError::BlobNotFound => { 143 log::warn!("Repo not found, skipping backup"); 144 Err(Error::Failed(Arc::new(Box::new(std::io::Error::new( 145 std::io::ErrorKind::NotFound, 146 "Repo not found", 147 ))))) 148 } 149 }; 150 } 151 }; 152 } 153 154 BlobType::Blob(cids) => { 155 // log::info!("Uploading {:?} blobs for {}", cids.len(), did); 156 for cid in cids { 157 let blob_url = format!( 158 "https://{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", 159 pds_host, did, cid 160 ); 161 162 match download_compress_and_upload_blob( 163 blob_backup_path(did.clone(), cid.clone()), 164 &atproto_client, 165 &s3_client, 
166 blob_url, 167 models::BlobType::Blob, 168 ) 169 .await 170 { 171 Ok(blob_size) => { 172 record_new_blob( 173 &pool, 174 did.clone(), 175 cid, 176 blob_size.try_into().unwrap(), 177 models::BlobType::Blob, 178 ) 179 .await 180 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 181 } 182 Err(err) => match err { 183 DownloadCompressAndUploadError::AnyError(err) => { 184 return Err(err); 185 } 186 DownloadCompressAndUploadError::BlobNotFound => { 187 // Record missing blob so it can be retried or inspected later. On conflict, do nothing. 188 if let Err(e) = sqlx::query( 189 "INSERT INTO missing_blobs (did, cid, created_date) VALUES ($1, $2, now()) ON CONFLICT DO NOTHING", 190 ) 191 .bind(&did) 192 .bind(&cid) 193 .execute(&*pool) 194 .await 195 { 196 log::warn!("Failed to record missing blob {cid} for {did}: {e}"); 197 } 198 log::warn!("Blob: {cid} not found for: {did}"); 199 } 200 DownloadCompressAndUploadError::BlobDownloadError => { 201 //Silently ignoring atm not to mess with the chunk 202 } 203 }, 204 } 205 } 206 207 if job.last_upload_batch { 208 log::info!("backup completed for {}", did); 209 update_last_backup_now_by_did(&pool, did.as_str()) 210 .await 211 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 212 } 213 } 214 } 215 216 Ok(()) 217} 218 219pub enum DownloadCompressAndUploadError { 220 AnyError(Error), 221 BlobNotFound, 222 BlobDownloadError, 223} 224 225async fn download_compress_and_upload_blob( 226 object_key: String, 227 atproto_client: &Data<Arc<Client>>, 228 s3_client: &Data<Arc<Box<Bucket>>>, 229 repo_url: String, 230 blob_type: models::BlobType, 231) -> Result<usize, DownloadCompressAndUploadError> { 232 let accept_type = match blob_type { 233 models::BlobType::Repo => "application/vnd.ipld.car", 234 _ => "*/*", 235 }; 236 237 let response = atproto_client 238 .get(repo_url.clone()) 239 .header(ACCEPT, accept_type) 240 .send() 241 .await 242 .map_err(|e| { 243 
DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e)))) 244 })?; 245 let response_status = response.status(); 246 247 if !response_status.is_success() { 248 let error_body = response.json::<XrpcError>().await.map_err(|e| { 249 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e)))) 250 })?; 251 if error_body.error == "InvalidRequest" { 252 if let Some(message) = error_body.message.as_deref() { 253 if message == "Blob not found" { 254 return Err(DownloadCompressAndUploadError::BlobNotFound); 255 } 256 } 257 } 258 // response.error_for_status().map_err(|e| { 259 // DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e)))) 260 // })?; 261 return Err(DownloadCompressAndUploadError::AnyError(Error::Failed( 262 Arc::new( 263 anyhow::anyhow!("Error downloading the blob: {response_status} {repo_url}").into(), 264 ), 265 ))); 266 } 267 268 // Derive content type from the blob response headers; default to octet-stream 269 // Should also get the car type 270 let content_type = response 271 .headers() 272 .get(CONTENT_TYPE) 273 .and_then(|v| v.to_str().ok()) 274 .unwrap_or("application/octet-stream") 275 .to_string(); 276 277 let blob_reader = response 278 .bytes_stream() 279 .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e)) 280 .into_async_read(); 281 282 let zstd_encoder = ZstdEncoder::new(blob_reader); 283 let mut zstd_tokio_reader = zstd_encoder.compat(); 284 285 match s3_client 286 .put_object_stream_builder(object_key.as_str()) 287 .with_content_type(content_type) 288 .with_content_encoding("zstd") 289 .map_err(|e| { 290 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e)))) 291 })? 
292 .execute_stream(&mut zstd_tokio_reader) 293 .await 294 { 295 Ok(result) => { 296 log::debug!("Uploaded: {}", object_key); 297 Ok(result.uploaded_bytes().try_into().unwrap()) 298 } 299 Err(err) => { 300 log::warn!("Failed to upload: {}: {}, trying again.", object_key, err); 301 //Try again but with a more basic content type. Usually happens when a user has an odd blob that is not supported by the s3. Like an html page 302 let put_result = s3_client 303 .put_object_stream_builder(&object_key) 304 .with_content_type("application/octet-stream") 305 .with_content_encoding("zstd") 306 .map_err(|e| { 307 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e)))) 308 })? 309 .execute_stream(&mut zstd_tokio_reader) 310 .await; 311 match put_result { 312 Ok(result) => { 313 log::debug!("Uploaded: {}", object_key); 314 Ok(result.uploaded_bytes().try_into().unwrap()) 315 } 316 Err(err) => Err(DownloadCompressAndUploadError::AnyError(Error::Failed( 317 Arc::new(Box::new(err)), 318 ))), 319 } 320 } 321 } 322}