Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1use crate::jobs::account_backup::AccountBackupJobContext;
2use crate::jobs::{account_backup, pds_backup, push_job_json};
3use apalis::prelude::{Data, Error};
4use jacquard::{client::BasicClient, identity::PublicResolver, prelude::IdentityResolver, url};
5use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatus;
6use jacquard_common::{types::did::Did, types::tid::Tid, xrpc::XrpcClient};
7use serde::{Deserialize, Serialize};
8use sqlx::{Pool, Postgres};
9use std::sync::Arc;
10
/// Namespace key under which this scheduled job is registered with apalis.
pub const JOB_NAMESPACE: &str = "apalis::ScheduledBackUpStart";

/// Context payload for the scheduled backup-start job. It carries no data:
/// the job works entirely from what the database queries return at run time.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ScheduledBackUpStartJobContext;
15
16/// Finds the new PDS and saves it in the database if the user has changed PDSs since signing up
17async fn find_new_pds_host(pool: &Pool<Postgres>, did: String) -> Option<String> {
18 let resolver = PublicResolver::default();
19 let did_string = did.clone();
20 let did = match Did::new(did.as_str()) {
21 Ok(did) => did,
22 Err(err) => {
23 log::error!("Failed to parse did {did} : {err}");
24 return None;
25 }
26 };
27
28 let did_doc = match resolver.resolve_did_doc_owned(&did).await {
29 Ok(did_doc) => did_doc,
30 Err(err) => {
31 log::error!("Failed to resolve did doc for {did} : {err}");
32 return None;
33 }
34 };
35
36 match did_doc.pds_endpoint() {
37 None => None,
38 Some(url) => match url.host_str() {
39 None => {
40 log::error!("Failed to parse pds_endpoint for {did} : {url}");
41 return None;
42 }
43 Some(host) => {
44 //I may of copied and pasted this to get it going <.< who needs dry in all this snow
45 let result = sqlx::query(
46 "UPDATE accounts SET pds_host = $2 WHERE did = $1 AND pds_host <> $2",
47 )
48 .bind(did_string)
49 .bind(host)
50 .execute(pool)
51 .await;
52 if let Err(e) = result {
53 log::error!("Failed to update pds_host for {did} : {e}");
54 }
55 // Ok(result.rows_affected())
56 Some(host.to_string())
57 }
58 },
59 }
60}
61
/// Scheduled entry point that kicks off backup work for the whole instance.
///
/// This scheduled job finds:
/// - accounts with `pds_sign_up = FALSE` whose `last_backup` is NULL or older
///   than 2 hours, and enqueues an AccountBackup job for each;
/// - pds_hosts that are active and whose `last_backup_start` is NULL or older
///   than 2 hours, and enqueues a PdsBackup job for each (stamping
///   `last_backup_start` first to avoid double-enqueueing).
///
/// Per-account failures (bad host, unreachable PDS, unexpected repo status)
/// are logged and skipped so one bad account cannot fail the whole run;
/// database and enqueue errors abort the job with `Error::Failed`.
pub async fn scheduled_back_up_start_job(
    _job: ScheduledBackUpStartJobContext,
    pool: Data<Pool<Postgres>>,
) -> Result<(), Error> {
    log::info!("Starting a backup for the whole instance");
    // Record the start of a whole-network backup run
    sqlx::query(r#"INSERT INTO network_backup_runs DEFAULT VALUES"#)
        .execute(&*pool)
        .await
        .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;
    // 1) Query accounts needing backup.
    // Condition: pds_sign_up = FALSE AND (last_backup IS NULL OR older than 2 hours).
    // did and pds_host are selected to build each AccountBackupJobContext.
    let accounts: Vec<(String, String)> = sqlx::query_as(
        r#"
        SELECT did, pds_host
        FROM accounts
        WHERE pds_sign_up = FALSE
          AND (last_backup IS NULL OR last_backup < NOW() - INTERVAL '2 HOURS')
        "#,
    )
    .fetch_all(&*pool)
    .await
    .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

    // 2) For each account, ask its PDS for the repo status to learn the
    // current rev (and whether the account has moved) before enqueueing.
    // TODO maybe check here?
    let agent = BasicClient::unauthenticated();
    for (did, pds_host) in accounts {
        // Holder for the PDS we will actually call; replaced below if the
        // account turns out to have migrated to a different host.
        let mut pds_host_to_call = pds_host.clone();

        // Match (rather than `?`) on each fallible step so a single bad
        // account is logged and skipped instead of failing the whole loop.
        let pds_url = match url::Url::parse(&format!("https://{}", pds_host)) {
            Ok(url) => url,
            Err(e) => {
                log::warn!("Failed to parse pds_host: {pds_host} \n {e}");
                continue;
            }
        };

        // Point the shared unauthenticated client at this account's PDS.
        agent.set_base_uri(pds_url).await;

        let request = GetRepoStatus {
            // May just leave this ?
            did: did.clone().parse().map_err(|_| {
                Error::Failed(Arc::new(Box::new(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Failed to parse did",
                ))))
            })?,
        };

        // Resolve the rev to hand to the AccountBackup job. `None` means
        // "let the sub job discover the rev itself" (used after a migration).
        let rev: Option<Tid> = match agent.send(request).await {
            Ok(response) => match response.parse() {
                Ok(output) => {
                    if output.active {
                        output.rev
                    } else {
                        match output.status {
                            None => {
                                log::warn!(
                                    "{did} has no status and not active. Not sure if this happens"
                                );
                                continue;
                            }
                            Some(status) => {
                                if status == "deactivated" {
                                    log::info!(
                                        "{did} has been deactivated on this PDS. Let's see if they moved."
                                    );
                                    match find_new_pds_host(&pool, did.clone()).await {
                                        None => continue,
                                        Some(new_host) => {
                                            log::info!(
                                                "{did} has moved to {new_host}. Database updated."
                                            );
                                            pds_host_to_call = new_host;
                                        }
                                    }
                                    // As long as it's successful, we can just send along None for the sub job to pick up the new rev on the new PDS
                                    None
                                } else {
                                    // TODO will most likely set accounts to deactivated as i collect more data here
                                    log::warn!("{did} has the repo status {status}.");
                                    continue;
                                }
                            }
                        }
                    }
                }
                Err(err) => {
                    log::warn!("Failed to parse GetRepoStatusResponse for: {did} \n {err}");
                    continue;
                }
            },
            Err(err) => {
                log::warn!("Failed to send GetRepoStatusRequest for: {did} \n {err}");
                continue;
            }
        };

        let rev_string: Option<String> = rev.map(|r| r.to_string());

        let ctx = AccountBackupJobContext {
            did,
            pds_host: pds_host_to_call,
            rev: rev_string,
        };

        push_job_json(&pool, account_backup::JOB_NAMESPACE, &ctx).await?;
    }

    // 3) Query pds_hosts needing backup start.
    // Condition: active = TRUE AND (last_backup_start IS NULL OR older than 2 hours).
    let pds_hosts: Vec<String> = sqlx::query_scalar(
        r#"
        SELECT pds_host
        FROM pds_hosts
        WHERE active = TRUE
          AND (last_backup_start IS NULL OR last_backup_start < NOW() - INTERVAL '2 HOURS')
        "#,
    )
    .fetch_all(&*pool)
    .await
    .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

    // To avoid racing / double-enqueue, mark last_backup_start immediately for
    // the hosts we are about to enqueue.
    if !pds_hosts.is_empty() {
        sqlx::query(
            r#"
            UPDATE pds_hosts
            SET last_backup_start = NOW()
            WHERE pds_host = ANY($1)
            "#,
        )
        .bind(&pds_hosts)
        .execute(&*pool)
        .await
        .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;
    }

    // 4) Enqueue one PdsBackup job per host.
    for host in pds_hosts {
        let ctx = crate::jobs::pds_backup::PdsBackupJobContext { pds_host: host };
        push_job_json(&pool, pds_backup::JOB_NAMESPACE, &ctx).await?;
    }

    Ok(())
}