//! Client-side atproto account migrator that runs in your web browser, along
//! with services for backups and adversarial migrations.
//!
//! Project site: <https://pdsmoover.com>
//!
//! Keywords: pds, atproto, migrations, moo, cow
1pub mod account_backup;
2pub mod pds_backup;
3pub mod remove_repo;
4pub mod scheduled_back_up_start;
5pub mod start_all_backup;
6pub mod upload_blob;
7pub mod verify_backups;
8
9use crate::db::models;
10use crate::db::models::BlobModel;
11use apalis::prelude::*;
12use serde::{Deserialize, Serialize};
13use serde_json::{self};
14use sqlx::{Pool, Postgres, query};
15use std::collections::HashSet;
16use std::fmt;
17use std::sync::Arc;
18
/// Error type returned by background jobs in this module.
#[derive(Debug)]
pub enum JobError {
    /// Generic job failure carrying a static description.
    SomeError(&'static str),
}

impl fmt::Display for JobError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Display intentionally mirrors the derived Debug output.
        fmt::Debug::fmt(self, f)
    }
}
29
30// Create a wrapper struct that implements std::error::Error
31#[derive(Debug)]
32struct AnyhowErrorWrapper(anyhow::Error);
33
34impl fmt::Display for AnyhowErrorWrapper {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 write!(f, "{}", self.0)
37 }
38}
39
40impl std::error::Error for AnyhowErrorWrapper {}
41
42/// Generic helper to manually enqueue any Apalis job by calling the SQL function directly.
43///
44/// - job_namespace: fully-qualified job type name as stored by Apalis (e.g., "apalis::Email").
45/// - payload: any struct that implements Serialize + Deserialize; it will be sent as JSON.
46pub async fn push_job_json<T>(
47 pool: &Pool<Postgres>,
48 job_namespace: &str,
49 payload: &T,
50) -> Result<(), Error>
51where
52 T: Serialize + for<'de> Deserialize<'de>,
53{
54 // Serialize payload to JSON and send to Postgres as json
55 let json_str =
56 serde_json::to_string(payload).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;
57
58 query("select apalis.push_job($1, $2::json)")
59 .bind(job_namespace)
60 .bind(json_str)
61 .execute(pool)
62 .await
63 .map(|_| ())
64 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))
65}
66
67/// Given a list of CIDs, returns those that are NOT already present in the blobs table
68/// with blob type = 'blob' and matches the user's did in the case of duplicate blobs for each user
69pub async fn filter_missing_blob_cids(
70 pool: &Pool<Postgres>,
71 cids: &Vec<String>,
72 users_did: &String,
73) -> anyhow::Result<Vec<String>> {
74 if cids.is_empty() {
75 return Ok(Vec::new());
76 }
77
78 // Fetch the subset of provided CIDs that already exist as type 'blob'
79 let existing: Vec<String> = sqlx::query_scalar(
80 r#"SELECT cid_or_rev FROM blobs WHERE type = $1 AND cid_or_rev = ANY($2) AND account_did = $3"#,
81 )
82 .bind(crate::db::models::BlobType::Blob)
83 .bind(&cids)
84 .bind(users_did)
85 .fetch_all(pool)
86 .await?;
87
88 let existing_set: HashSet<&str> = existing.iter().map(|s| s.as_str()).collect();
89 let missing: Vec<String> = cids
90 .iter()
91 .filter(|cid| !existing_set.contains(cid.as_str()))
92 .cloned()
93 .collect();
94
95 Ok(missing)
96}
97
98pub async fn record_new_blob(
99 pool: &Pool<Postgres>,
100 did: String,
101 cid_or_rev: String,
102 size: i64,
103 blob_type: models::BlobType,
104) -> anyhow::Result<models::BlobModel> {
105 match blob_type {
106 //On repo we need to upsert on did
107 models::BlobType::Repo => {
108 // First try to update an existing 'repo' blob row for this DID.
109 if let Some(updated) = sqlx::query_as::<_, BlobModel>(
110 r#"
111 UPDATE blobs
112 SET size = $2,
113 type = $3,
114 cid_or_rev = $4
115 WHERE account_did = $1 AND type = $3
116 RETURNING id, created_at, account_did, size, type, cid_or_rev
117 "#,
118 )
119 .bind(&did)
120 .bind(size)
121 .bind(&blob_type)
122 .bind(&cid_or_rev)
123 .fetch_optional(pool)
124 .await?
125 {
126 Ok(updated)
127 } else {
128 // If no row was updated, insert a new one for this DID and repo type.
129 Ok(sqlx::query_as::<_, BlobModel>(
130 r#"
131 INSERT INTO blobs (account_did, size, type, cid_or_rev)
132 VALUES ($1, $2, $3, $4)
133 RETURNING id, created_at, account_did, size, type, cid_or_rev
134 "#,
135 )
136 .bind(did)
137 .bind(size)
138 .bind(blob_type)
139 .bind(cid_or_rev)
140 .fetch_one(pool)
141 .await?)
142 }
143 }
144 //on blob we upsert on (account_did, cid_or_rev)
145 models::BlobType::Blob | _ => Ok(sqlx::query_as::<_, BlobModel>(
146 r#"
147 INSERT INTO blobs (account_did, size, type, cid_or_rev)
148 VALUES ($1, $2, $3, $4)
149 ON CONFLICT (account_did, cid_or_rev) DO UPDATE
150 SET size = EXCLUDED.size,
151 type = EXCLUDED.type
152 RETURNING id, created_at, account_did, size, type, cid_or_rev
153 "#,
154 )
155 .bind(did)
156 .bind(size)
157 .bind(blob_type)
158 .bind(cid_or_rev)
159 .fetch_one(pool)
160 .await?),
161 }
162}
163
164/// Look up the user's account by DID and return their repo_rev, if present.
165pub async fn get_repo_rev_by_did(
166 pool: &Pool<Postgres>,
167 did: &str,
168) -> anyhow::Result<Option<String>> {
169 // repo_rev is nullable; also the account row may not exist.
170 // Using fetch_optional yields Option<Option<String>> which we flatten to a single Option<String>.
171 let result: Option<Option<String>> =
172 sqlx::query_scalar(r#"SELECT repo_rev FROM accounts WHERE did = $1"#)
173 .bind(did)
174 .fetch_optional(pool)
175 .await?;
176
177 Ok(result.flatten())
178}
179
180/// Update the repo_rev for a given account identified by DID.
181/// Returns true if a row was updated.
182pub async fn update_repo_rev_by_did(
183 pool: &Pool<Postgres>,
184 did: &str,
185 repo_rev: &str,
186) -> anyhow::Result<bool> {
187 let result =
188 sqlx::query(r#"UPDATE accounts SET repo_rev = $2, last_backup = NOW() WHERE did = $1"#)
189 .bind(did)
190 .bind(repo_rev)
191 .execute(pool)
192 .await?;
193 Ok(result.rows_affected() > 0)
194}
195
196/// Update last_backup to the current timestamp for a given account identified by DID.
197/// Returns true if a row was updated.
198pub async fn update_last_backup_now_by_did(
199 pool: &Pool<Postgres>,
200 did: &str,
201) -> anyhow::Result<bool> {
202 let result = sqlx::query(r#"UPDATE accounts SET last_backup = NOW() WHERE did = $1"#)
203 .bind(did)
204 .execute(pool)
205 .await?;
206 Ok(result.rows_affected() > 0)
207}