Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow

detect new pds

+85 -27
+1 -1
Cargo.toml
··· 14 14 jacquard = "0.9.5" 15 15 jacquard-axum = "0.9.6" 16 16 jacquard-common = "0.9.5" 17 - jacquard-identity = "0.9.5" 17 + jacquard-identity = { version = "0.9.5", features = ["dns", "cache"] } 18 18 jacquard-api = "0.9.5" 19 19 apalis = { version = "0.7.3", features = ["retry", "limit", "layers", ] } 20 20 #Manually brought in migrations cause I was fighting them
+84 -26
shared/src/jobs/scheduled_back_up_start.rs
··· 1 1 use crate::jobs::account_backup::AccountBackupJobContext; 2 2 use crate::jobs::{account_backup, pds_backup, push_job_json}; 3 3 use apalis::prelude::{Data, Error}; 4 - use jacquard::client::{AgentSessionExt, BasicClient}; 5 - use jacquard::url; 6 - use jacquard_api::app_bsky::contact::match_and_contact_index_state::members::r#match; 7 - use jacquard_api::com_atproto::sync::get_repo_status::{ 8 - GetRepoStatus, GetRepoStatusError, GetRepoStatusOutput, 9 - }; 10 - use jacquard_common::CowStr; 11 - use jacquard_common::error::XrpcResult; 12 - use jacquard_common::xrpc::{XrpcClient, XrpcError}; 13 - use lexicon_types_crate::com_pdsmoover::backup::get_repo_status::GetRepoStatusRequest; 4 + use jacquard::{client::BasicClient, identity::PublicResolver, prelude::IdentityResolver, url}; 5 + use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatus; 6 + use jacquard_common::{types::did::Did, types::tid::Tid, xrpc::XrpcClient}; 14 7 use serde::{Deserialize, Serialize}; 15 8 use sqlx::{Pool, Postgres}; 16 9 use std::sync::Arc; ··· 20 13 #[derive(Debug, Deserialize, Serialize, Clone)] 21 14 pub struct ScheduledBackUpStartJobContext; 22 15 16 + /// Finds the new PDS and saves it in the database if the user has changed PDSs since signing up 17 + async fn find_new_pds_host(pool: &Pool<Postgres>, did: String) -> Option<String> { 18 + let resolver = PublicResolver::default(); 19 + let did_string = did.clone(); 20 + let did = match Did::new(did.as_str()) { 21 + Ok(did) => did, 22 + Err(err) => { 23 + log::error!("Failed to parse did {did} : {err}"); 24 + return None; 25 + } 26 + }; 27 + 28 + let did_doc = match resolver.resolve_did_doc_owned(&did).await { 29 + Ok(did_doc) => did_doc, 30 + Err(err) => { 31 + log::error!("Failed to resolve did doc for {did} : {err}"); 32 + return None; 33 + } 34 + }; 35 + 36 + match did_doc.pds_endpoint() { 37 + None => None, 38 + Some(url) => match url.host_str() { 39 + None => { 40 + log::error!("Failed to parse pds_endpoint for {did} : {url}"); 41 + return None; 42 + } 43 + Some(host) => { 44 + //I may of copied and pasted this to get it going <.< who needs dry in all this snow 45 + let result = sqlx::query( 46 + "UPDATE accounts SET pds_host = $2 WHERE did = $1 AND pds_host <> $2", 47 + ) 48 + .bind(did_string) 49 + .bind(host) 50 + .execute(pool) 51 + .await; 52 + if let Err(e) = result { 53 + log::error!("Failed to update pds_host for {did} : {e}"); 54 + } 55 + // Ok(result.rows_affected()) 56 + Some(host.to_string()) 57 + } 58 + }, 59 + } 60 + } 61 + 23 62 /// This scheduled job finds: 24 63 /// - accounts that have not been backed up in the last 6 hours and have pds_sign_up = false, 25 64 /// and enqueues AccountBackup jobs for them; ··· 53 92 //TODO maybe check here? 54 93 let agent = BasicClient::unauthenticated(); 55 94 for (did, pds_host) in accounts { 95 + //This is a holder we can change the PDS if we need to 96 + let mut pds_host_to_call = pds_host.clone(); 97 + 56 98 //Need to do matches here cause I don't want to fail the whole loop 57 99 let pds_url = match url::Url::parse(&format!("https://{}", pds_host)) { 58 100 Ok(url) => url, ··· 75 117 })?, 76 118 }; 77 119 78 - let rev = match agent.send(request).await { 120 + let rev: Option<Tid> = match agent.send(request).await { 79 121 Ok(response) => match response.parse() { 80 122 Ok(output) => { 123 + log::info!("{output:?}"); 81 124 if output.active { 82 - output.rev; 83 - } 84 - match output.status { 85 - None => { 86 - log::warn!( 87 - "{did} has no status and not active. Not sure if this happens" 88 - ); 89 - continue; 90 - } 91 - Some(status) => { 92 - if status == "deactivated" { 93 - //TODO do that call to update the PDS for the user 94 - } else { 95 - log::warn!("{did} has the repo status {status}."); 125 + output.rev 126 + } else { 127 + match output.status { 128 + None => { 129 + log::warn!( 130 + "{did} has no status and not active. Not sure if this happens" 131 + ); 96 132 continue; 97 133 } 134 + Some(status) => { 135 + if status == "deactivated" { 136 + log::info!( 137 + "{did} has been deactivated on this PDS. Let's see if they moved." 138 + ); 139 + match find_new_pds_host(&pool, did.clone()).await { 140 + None => continue, 141 + Some(new_host) => { 142 + log::info!( 143 + "{did} has moved to {new_host}. Database updated." 144 + ); 145 + pds_host_to_call = new_host; 146 + } 147 + } 148 + //As long as it's successful, we can just send along None for the sub job to pick up the new rev on the new PDS 149 + None 150 + } else { 151 + //TODO will most likely set accounts to deactivated as i collect more data here 152 + log::warn!("{did} has the repo status {status}."); 153 + continue; 154 + } 155 + } 98 156 } 99 157 } 100 158 } ··· 113 171 114 172 let ctx = AccountBackupJobContext { 115 173 did, 116 - pds_host, 174 + pds_host: pds_host_to_call, 117 175 rev: rev_string, 118 176 }; 119 177