Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow

detect new pds

authored by baileytownsend.dev and committed by tangled.org 610c6943 866a5886

+85 -27
+1 -1
Cargo.toml
··· 14 14 jacquard = "0.9.5" 15 15 jacquard-axum = "0.9.6" 16 16 jacquard-common = "0.9.5" 17 - jacquard-identity = "0.9.5" 17 + jacquard-identity = { version = "0.9.5", features = ["dns", "cache"] } 18 18 jacquard-api = "0.9.5" 19 19 apalis = { version = "0.7.3", features = ["retry", "limit", "layers", ] } 20 20 #Manually brought in migrations cause I was fighting them
+84 -26
shared/src/jobs/scheduled_back_up_start.rs
··· 1 1 use crate::jobs::account_backup::AccountBackupJobContext; 2 2 use crate::jobs::{account_backup, pds_backup, push_job_json}; 3 3 use apalis::prelude::{Data, Error}; 4 - use jacquard::client::{AgentSessionExt, BasicClient}; 5 - use jacquard::url; 6 - use jacquard_api::app_bsky::contact::match_and_contact_index_state::members::r#match; 7 - use jacquard_api::com_atproto::sync::get_repo_status::{ 8 - GetRepoStatus, GetRepoStatusError, GetRepoStatusOutput, 9 - }; 10 - use jacquard_common::CowStr; 11 - use jacquard_common::error::XrpcResult; 12 - use jacquard_common::xrpc::{XrpcClient, XrpcError}; 13 - use lexicon_types_crate::com_pdsmoover::backup::get_repo_status::GetRepoStatusRequest; 4 + use jacquard::{client::BasicClient, identity::PublicResolver, prelude::IdentityResolver, url}; 5 + use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatus; 6 + use jacquard_common::{types::did::Did, types::tid::Tid, xrpc::XrpcClient}; 14 7 use serde::{Deserialize, Serialize}; 15 8 use sqlx::{Pool, Postgres}; 16 9 use std::sync::Arc; ··· 20 13 #[derive(Debug, Deserialize, Serialize, Clone)] 21 14 pub struct ScheduledBackUpStartJobContext; 22 15 16 + /// Finds the new PDS and saves it in the database if the user has changed PDSs since signing up 17 + async fn find_new_pds_host(pool: &Pool<Postgres>, did: String) -> Option<String> { 18 + let resolver = PublicResolver::default(); 19 + let did_string = did.clone(); 20 + let did = match Did::new(did.as_str()) { 21 + Ok(did) => did, 22 + Err(err) => { 23 + log::error!("Failed to parse did {did} : {err}"); 24 + return None; 25 + } 26 + }; 27 + 28 + let did_doc = match resolver.resolve_did_doc_owned(&did).await { 29 + Ok(did_doc) => did_doc, 30 + Err(err) => { 31 + log::error!("Failed to resolve did doc for {did} : {err}"); 32 + return None; 33 + } 34 + }; 35 + 36 + match did_doc.pds_endpoint() { 37 + None => None, 38 + Some(url) => match url.host_str() { 39 + None => { 40 + log::error!("Failed to parse pds_endpoint for {did} : {url}"); 41 + return None; 42 + } 43 + Some(host) => { 44 + //I may of copied and pasted this to get it going <.< who needs dry in all this snow 45 + let result = sqlx::query( 46 + "UPDATE accounts SET pds_host = $2 WHERE did = $1 AND pds_host <> $2", 47 + ) 48 + .bind(did_string) 49 + .bind(host) 50 + .execute(pool) 51 + .await; 52 + if let Err(e) = result { 53 + log::error!("Failed to update pds_host for {did} : {e}"); 54 + } 55 + // Ok(result.rows_affected()) 56 + Some(host.to_string()) 57 + } 58 + }, 59 + } 60 + } 61 + 23 62 /// This scheduled job finds: 24 63 /// - accounts that have not been backed up in the last 6 hours and have pds_sign_up = false, 25 64 /// and enqueues AccountBackup jobs for them; ··· 53 92 //TODO maybe check here? 54 93 let agent = BasicClient::unauthenticated(); 55 94 for (did, pds_host) in accounts { 95 + //This is a holder we can change the PDS if we need to 96 + let mut pds_host_to_call = pds_host.clone(); 97 + 56 98 //Need to do matches here cause I don't want to fail the whole loop 57 99 let pds_url = match url::Url::parse(&format!("https://{}", pds_host)) { 58 100 Ok(url) => url, ··· 75 117 })?, 76 118 }; 77 119 78 - let rev = match agent.send(request).await { 120 + let rev: Option<Tid> = match agent.send(request).await { 79 121 Ok(response) => match response.parse() { 80 122 Ok(output) => { 123 + log::info!("{output:?}"); 81 124 if output.active { 82 - output.rev; 83 - } 84 - match output.status { 85 - None => { 86 - log::warn!( 87 - "{did} has no status and not active. Not sure if this happens" 88 - ); 89 - continue; 90 - } 91 - Some(status) => { 92 - if status == "deactivated" { 93 - //TODO do that call to update the PDS for the user 94 - } else { 95 - log::warn!("{did} has the repo status {status}."); 125 + output.rev 126 + } else { 127 + match output.status { 128 + None => { 129 + log::warn!( 130 + "{did} has no status and not active. Not sure if this happens" 131 + ); 96 132 continue; 97 133 } 134 + Some(status) => { 135 + if status == "deactivated" { 136 + log::info!( 137 + "{did} has been deactivated on this PDS. Let's see if they moved." 138 + ); 139 + match find_new_pds_host(&pool, did.clone()).await { 140 + None => continue, 141 + Some(new_host) => { 142 + log::info!( 143 + "{did} has moved to {new_host}. Database updated." 144 + ); 145 + pds_host_to_call = new_host; 146 + } 147 + } 148 + //As long as it's successful, we can just send along None for the sub job to pick up the new rev on the new PDS 149 + None 150 + } else { 151 + //TODO will most likely set accounts to deactivated as i collect more data here 152 + log::warn!("{did} has the repo status {status}."); 153 + continue; 154 + } 155 + } 98 156 } 99 157 } 100 158 } ··· 113 171 114 172 let ctx = AccountBackupJobContext { 115 173 did, 116 - pds_host, 174 + pds_host: pds_host_to_call, 117 175 rev: rev_string, 118 176 }; 119 177