Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at main 99 lines 3.0 kB view raw
1use crate::jobs::upload_blob::{BlobType, UploadBlobJobContext}; 2use crate::jobs::{AnyhowErrorWrapper, filter_missing_blob_cids, push_job_json, upload_blob}; 3use apalis::prelude::{Data, Error}; 4use jacquard_api::com_atproto::sync::list_blobs::ListBlobsRequest; 5use jacquard_common::xrpc::XrpcEndpoint; 6use serde::{Deserialize, Serialize}; 7use sqlx::{Pool, Postgres}; 8use std::sync::Arc; 9 10pub const JOB_NAMESPACE: &str = "apalis::AccountBackup"; 11 12#[derive(Debug, Deserialize, Serialize, Clone)] 13pub struct AccountBackupJobContext { 14 pub did: String, 15 // If the job start comes from the PDS backup this is already set saving a request to the PDS 16 pub rev: Option<String>, 17 pub pds_host: String, 18} 19 20#[derive(Deserialize)] 21struct ListBlobsResponse { 22 #[allow(dead_code)] 23 cursor: Option<String>, 24 cids: Vec<String>, 25} 26 27pub async fn account_backup_job( 28 job: AccountBackupJobContext, 29 pool: Data<Pool<Postgres>>, 30 atproto_client: Data<Arc<reqwest::Client>>, 31) -> Result<(), Error> { 32 log::info!("Starting backup for did: {}", job.did); 33 34 push_job_json( 35 &pool, 36 upload_blob::JOB_NAMESPACE, 37 &UploadBlobJobContext { 38 account_backup_job_context: job.clone(), 39 blob_type: BlobType::Repo(job.rev.clone()), 40 last_upload_batch: false, 41 }, 42 ) 43 .await?; 44 45 let mut cursor: Option<String> = None; 46 loop { 47 let mut url = format!( 48 "https://{}{}?did={}&limit={}", 49 job.pds_host, 50 ListBlobsRequest::PATH, 51 job.did, 52 1000 53 ); 54 55 if let Some(ref c) = cursor { 56 if !c.is_empty() { 57 url.push_str("&cursor="); 58 url.push_str(c); 59 } 60 } 61 62 let resp = atproto_client 63 .get(url) 64 .send() 65 .await 66 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))? 
67 .json::<ListBlobsResponse>() 68 .await 69 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 70 71 let missing_cids = filter_missing_blob_cids(&pool, &resp.cids, &job.did) 72 .await 73 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 74 75 // Process missing CIDs in batches of 5 76 let mut processed = 0; 77 78 for chunk in missing_cids.chunks(5) { 79 processed += chunk.len(); 80 let last_chunk = processed >= missing_cids.len(); 81 push_job_json( 82 &pool, 83 upload_blob::JOB_NAMESPACE, 84 &UploadBlobJobContext { 85 account_backup_job_context: job.clone(), 86 blob_type: BlobType::Blob(chunk.to_vec()), 87 last_upload_batch: last_chunk, 88 }, 89 ) 90 .await?; 91 } 92 93 cursor = resp.cursor; 94 if cursor.is_none() || cursor.as_ref().unwrap().is_empty() { 95 break; 96 } 97 } 98 Ok(()) 99}