Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at main 131 lines 4.4 kB view raw
1use crate::jobs::account_backup::AccountBackupJobContext; 2use crate::jobs::{account_backup, push_job_json}; 3use apalis::prelude::{Data, Error}; 4use jacquard_common::url::Url; 5use jacquard_common::xrpc::XrpcEndpoint; 6use serde::{Deserialize, Serialize}; 7use sqlx::{Pool, Postgres}; 8use std::sync::Arc; 9 10pub const JOB_NAMESPACE: &str = "apalis::PdsBackup"; 11 12#[derive(Debug, Deserialize, Serialize, Clone)] 13pub struct PdsBackupJobContext { 14 pub pds_host: String, 15} 16 17#[derive(Debug, Deserialize, Serialize, Clone)] 18struct ListReposResponseRepo { 19 pub did: String, 20 pub active: bool, 21 pub rev: String, 22} 23 24#[derive(Debug, Deserialize, Serialize, Clone)] 25struct ListReposResponse { 26 pub repos: Vec<ListReposResponseRepo>, 27 pub cursor: Option<String>, 28} 29 30/// Call com.atproto.sync.listRepos on the given PDS host, upsert active repos into accounts, 31/// and enqueue account_backup jobs for each active repo. 32pub async fn pds_backup_job( 33 job: PdsBackupJobContext, 34 pool: Data<Pool<Postgres>>, 35 atproto_client: Data<Arc<reqwest::Client>>, 36) -> Result<(), Error> { 37 let pds_host = format!("https://{}", job.pds_host.clone()); 38 log::info!("Starting a backup for the PDS: {}", pds_host); 39 40 // Mark the start time for this PDS backup to prevent duplicate scheduling and to record start time 41 sqlx::query( 42 r#" 43 UPDATE pds_hosts 44 SET last_backup_start = NOW() 45 WHERE pds_host = $1 46 "#, 47 ) 48 .bind(&job.pds_host) 49 .execute(&*pool) 50 .await 51 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 52 53 //Always expecting the host to be just the host name 54 let base_url = 55 Url::parse(&pds_host.clone()).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 56 57 let mut cursor: Option<String> = None; 58 loop { 59 let mut url = format!( 60 "{}{}?limit={}", 61 base_url.as_str().trim_end_matches('/'), 62 jacquard_api::com_atproto::sync::list_repos::ListReposRequest::PATH, 63 1000 64 ); 65 if let Some(ref c) = cursor { 66 
if !c.is_empty() { 67 url.push_str("&cursor="); 68 url.push_str(c); 69 } 70 } 71 72 let resp = atproto_client 73 .get(url) 74 .send() 75 .await 76 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))? 77 .json::<ListReposResponse>() 78 .await 79 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 80 81 // Filter active repos 82 let active_repos: Vec<(String, String)> = resp 83 .repos 84 .into_iter() 85 .filter(|r| r.active) 86 .map(|r| (r.did, r.rev)) 87 .collect(); 88 89 if !active_repos.is_empty() { 90 // Batch upsert accounts using UNNEST; preserve created_at and pds_sign_up on conflict. 91 //TODO may filter ones not signed up on the PDS? Be a new SQL query to get those turned on via the PDS? 92 let dids_with_revs: Vec<(String, String)> = active_repos 93 .iter() 94 .map(|(d, rev)| (d.clone(), rev.clone())) 95 .collect(); 96 let dids: Vec<String> = active_repos.iter().map(|(d, _)| d.clone()).collect(); 97 98 // Insert and set pds_sign_up=true only for new rows. Do nothing on conflict. 99 // We unnest only DIDs; pds_host is a single scalar projected for each row. 100 sqlx::query( 101 r#" 102 INSERT INTO accounts (did, pds_host, pds_sign_up) 103 SELECT did, $2::text AS pds_host, TRUE 104 FROM UNNEST($1::text[]) AS u(did) 105 ON CONFLICT (did) DO NOTHING 106 "#, 107 ) 108 .bind(&dids) 109 .bind(&job.pds_host) 110 .execute(&*pool) 111 .await 112 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 113 114 // Enqueue account_backup jobs for each active repo in this page 115 for (did, rev) in dids_with_revs.into_iter() { 116 let ctx = AccountBackupJobContext { 117 did, 118 pds_host: job.pds_host.clone(), 119 rev: Some(rev), 120 }; 121 push_job_json(&pool, account_backup::JOB_NAMESPACE, &ctx).await?; 122 } 123 } 124 cursor = resp.cursor; 125 if cursor.as_deref().map(|s| s.is_empty()).unwrap_or(true) { 126 break; 127 } 128 } 129 130 Ok(()) 131}