Client-side AT Protocol (atproto) account migrator that runs in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1use crate::jobs::upload_blob::{BlobType, UploadBlobJobContext};
2use crate::jobs::{AnyhowErrorWrapper, filter_missing_blob_cids, push_job_json, upload_blob};
3use apalis::prelude::{Data, Error};
4use jacquard_api::com_atproto::sync::list_blobs::ListBlobsRequest;
5use jacquard_common::xrpc::XrpcEndpoint;
6use serde::{Deserialize, Serialize};
7use sqlx::{Pool, Postgres};
8use std::sync::Arc;
9
// Apalis queue namespace under which account-backup jobs are enqueued and consumed.
pub const JOB_NAMESPACE: &str = "apalis::AccountBackup";
11
/// Payload for a single account-backup job: identifies the account (DID)
/// and the PDS host to pull the repo and blobs from.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct AccountBackupJobContext {
    /// DID of the account being backed up.
    pub did: String,
    /// Repo revision to back up. If the job start comes from the PDS backup
    /// this is already set, saving a request to the PDS.
    pub rev: Option<String>,
    /// Hostname of the PDS serving this account (bare host, no scheme —
    /// the job prepends `https://` when building request URLs).
    pub pds_host: String,
}
19
20#[derive(Deserialize)]
21struct ListBlobsResponse {
22 #[allow(dead_code)]
23 cursor: Option<String>,
24 cids: Vec<String>,
25}
26
27pub async fn account_backup_job(
28 job: AccountBackupJobContext,
29 pool: Data<Pool<Postgres>>,
30 atproto_client: Data<Arc<reqwest::Client>>,
31) -> Result<(), Error> {
32 log::info!("Starting backup for did: {}", job.did);
33
34 push_job_json(
35 &pool,
36 upload_blob::JOB_NAMESPACE,
37 &UploadBlobJobContext {
38 account_backup_job_context: job.clone(),
39 blob_type: BlobType::Repo(job.rev.clone()),
40 last_upload_batch: false,
41 },
42 )
43 .await?;
44
45 let mut cursor: Option<String> = None;
46 loop {
47 let mut url = format!(
48 "https://{}{}?did={}&limit={}",
49 job.pds_host,
50 ListBlobsRequest::PATH,
51 job.did,
52 1000
53 );
54
55 if let Some(ref c) = cursor {
56 if !c.is_empty() {
57 url.push_str("&cursor=");
58 url.push_str(c);
59 }
60 }
61
62 let resp = atproto_client
63 .get(url)
64 .send()
65 .await
66 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?
67 .json::<ListBlobsResponse>()
68 .await
69 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;
70
71 let missing_cids = filter_missing_blob_cids(&pool, &resp.cids, &job.did)
72 .await
73 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?;
74
75 // Process missing CIDs in batches of 5
76 let mut processed = 0;
77
78 for chunk in missing_cids.chunks(5) {
79 processed += chunk.len();
80 let last_chunk = processed >= missing_cids.len();
81 push_job_json(
82 &pool,
83 upload_blob::JOB_NAMESPACE,
84 &UploadBlobJobContext {
85 account_backup_job_context: job.clone(),
86 blob_type: BlobType::Blob(chunk.to_vec()),
87 last_upload_batch: last_chunk,
88 },
89 )
90 .await?;
91 }
92
93 cursor = resp.cursor;
94 if cursor.is_none() || cursor.as_ref().unwrap().is_empty() {
95 break;
96 }
97 }
98 Ok(())
99}