Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at main 215 lines 8.3 kB view raw
1use crate::jobs::account_backup::AccountBackupJobContext; 2use crate::jobs::{account_backup, pds_backup, push_job_json}; 3use apalis::prelude::{Data, Error}; 4use jacquard::{client::BasicClient, identity::PublicResolver, prelude::IdentityResolver, url}; 5use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatus; 6use jacquard_common::{types::did::Did, types::tid::Tid, xrpc::XrpcClient}; 7use serde::{Deserialize, Serialize}; 8use sqlx::{Pool, Postgres}; 9use std::sync::Arc; 10 11pub const JOB_NAMESPACE: &str = "apalis::ScheduledBackUpStart"; 12 13#[derive(Debug, Deserialize, Serialize, Clone)] 14pub struct ScheduledBackUpStartJobContext; 15 16/// Finds the new PDS and saves it in the database if the user has changed PDSs since signing up 17async fn find_new_pds_host(pool: &Pool<Postgres>, did: String) -> Option<String> { 18 let resolver = PublicResolver::default(); 19 let did_string = did.clone(); 20 let did = match Did::new(did.as_str()) { 21 Ok(did) => did, 22 Err(err) => { 23 log::error!("Failed to parse did {did} : {err}"); 24 return None; 25 } 26 }; 27 28 let did_doc = match resolver.resolve_did_doc_owned(&did).await { 29 Ok(did_doc) => did_doc, 30 Err(err) => { 31 log::error!("Failed to resolve did doc for {did} : {err}"); 32 return None; 33 } 34 }; 35 36 match did_doc.pds_endpoint() { 37 None => None, 38 Some(url) => match url.host_str() { 39 None => { 40 log::error!("Failed to parse pds_endpoint for {did} : {url}"); 41 return None; 42 } 43 Some(host) => { 44 //I may of copied and pasted this to get it going <.< who needs dry in all this snow 45 let result = sqlx::query( 46 "UPDATE accounts SET pds_host = $2 WHERE did = $1 AND pds_host <> $2", 47 ) 48 .bind(did_string) 49 .bind(host) 50 .execute(pool) 51 .await; 52 if let Err(e) = result { 53 log::error!("Failed to update pds_host for {did} : {e}"); 54 } 55 // Ok(result.rows_affected()) 56 Some(host.to_string()) 57 } 58 }, 59 } 60} 61 62/// This scheduled job finds: 63/// - accounts that have not 
been backed up in the last 2 hours and have pds_sign_up = false, 64/// and enqueues AccountBackup jobs for them; 65/// - pds_hosts that are active and have not started a backup in the last 2 hours (tracked via 66/// pds_hosts.last_backup_start), and enqueues PdsBackup jobs for each. 67pub async fn scheduled_back_up_start_job( 68 _job: ScheduledBackUpStartJobContext, 69 pool: Data<Pool<Postgres>>, 70) -> Result<(), Error> { 71 log::info!("Starting a backup for the whole instance"); 72 // Record the start of a whole-network backup run 73 sqlx::query(r#"INSERT INTO network_backup_runs DEFAULT VALUES"#) 74 .execute(&*pool) 75 .await 76 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 77 // 1) Query accounts needing backup 78 // Condition: pds_sign_up = false AND (last_backup is NULL OR older than 6h) 79 // We include did and pds_host to build AccountBackupJobContext 80 let accounts: Vec<(String, String)> = sqlx::query_as( 81 r#" 82 SELECT did, pds_host 83 FROM accounts 84 WHERE pds_sign_up = FALSE 85 AND (last_backup IS NULL OR last_backup < NOW() - INTERVAL '2 HOURS') 86 "#, 87 ) 88 .fetch_all(&*pool) 89 .await 90 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 91 92 //TODO maybe check here? 93 let agent = BasicClient::unauthenticated(); 94 for (did, pds_host) in accounts { 95 //This is a holder we can change the PDS if we need to 96 let mut pds_host_to_call = pds_host.clone(); 97 98 //Need to do matches here cause I don't want to fail the whole loop 99 let pds_url = match url::Url::parse(&format!("https://{}", pds_host)) { 100 Ok(url) => url, 101 Err(e) => { 102 log::warn!("Failed to parse pds_host: {pds_host} \n {e}"); 103 continue; 104 } 105 }; 106 107 //Sets the agent to the new PDS URL 108 agent.set_base_uri(pds_url).await; 109 110 let request = GetRepoStatus { 111 //May just leave this ? 
112 did: did.clone().parse().map_err(|_| { 113 Error::Failed(Arc::new(Box::new(std::io::Error::new( 114 std::io::ErrorKind::Other, 115 "Failed to parse did", 116 )))) 117 })?, 118 }; 119 120 let rev: Option<Tid> = match agent.send(request).await { 121 Ok(response) => match response.parse() { 122 Ok(output) => { 123 if output.active { 124 output.rev 125 } else { 126 match output.status { 127 None => { 128 log::warn!( 129 "{did} has no status and not active. Not sure if this happens" 130 ); 131 continue; 132 } 133 Some(status) => { 134 if status == "deactivated" { 135 log::info!( 136 "{did} has been deactivated on this PDS. Let's see if they moved." 137 ); 138 match find_new_pds_host(&pool, did.clone()).await { 139 None => continue, 140 Some(new_host) => { 141 log::info!( 142 "{did} has moved to {new_host}. Database updated." 143 ); 144 pds_host_to_call = new_host; 145 } 146 } 147 //As long as it's successful, we can just send along None for the sub job to pick up the new rev on the new PDS 148 None 149 } else { 150 //TODO will most likely set accounts to deactivated as i collect more data here 151 log::warn!("{did} has the repo status {status}."); 152 continue; 153 } 154 } 155 } 156 } 157 } 158 Err(err) => { 159 log::warn!("Failed to parse GetRepoStatusResponse for: {did} \n {err}"); 160 continue; 161 } 162 }, 163 Err(err) => { 164 log::warn!("Failed to send GetRepoStatusRequest for: {did} \n {err}"); 165 continue; 166 } 167 }; 168 169 let rev_string: Option<String> = rev.map(|r| r.to_string()); 170 171 let ctx = AccountBackupJobContext { 172 did, 173 pds_host: pds_host_to_call, 174 rev: rev_string, 175 }; 176 177 push_job_json(&pool, account_backup::JOB_NAMESPACE, &ctx).await?; 178 } 179 180 // 3) Query pds_hosts needing backup start 181 // Condition: active = TRUE AND (last_backup_start is NULL OR older than 24h) 182 let pds_hosts: Vec<String> = sqlx::query_scalar( 183 r#" 184 SELECT pds_host 185 FROM pds_hosts 186 WHERE active = TRUE 187 AND (last_backup_start IS 
NULL OR last_backup_start < NOW() - INTERVAL '2 HOURS') 188 "#, 189 ) 190 .fetch_all(&*pool) 191 .await 192 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 193 194 // To avoid racing / double-enqueue, mark last_backup_start immediately for those we are about to enqueue 195 if !pds_hosts.is_empty() { 196 sqlx::query( 197 r#" 198 UPDATE pds_hosts 199 SET last_backup_start = NOW() 200 WHERE pds_host = ANY($1) 201 "#, 202 ) 203 .bind(&pds_hosts) 204 .execute(&*pool) 205 .await 206 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 207 } 208 209 for host in pds_hosts { 210 let ctx = crate::jobs::pds_backup::PdsBackupJobContext { pds_host: host }; 211 push_job_json(&pool, pds_backup::JOB_NAMESPACE, &ctx).await?; 212 } 213 214 Ok(()) 215}