Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at main 207 lines 6.6 kB view raw
1pub mod account_backup; 2pub mod pds_backup; 3pub mod remove_repo; 4pub mod scheduled_back_up_start; 5pub mod start_all_backup; 6pub mod upload_blob; 7pub mod verify_backups; 8 9use crate::db::models; 10use crate::db::models::BlobModel; 11use apalis::prelude::*; 12use serde::{Deserialize, Serialize}; 13use serde_json::{self}; 14use sqlx::{Pool, Postgres, query}; 15use std::collections::HashSet; 16use std::fmt; 17use std::sync::Arc; 18 19#[derive(Debug)] 20pub enum JobError { 21 SomeError(&'static str), 22} 23 24impl std::fmt::Display for JobError { 25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 26 write!(f, "{self:?}") 27 } 28} 29 30// Create a wrapper struct that implements std::error::Error 31#[derive(Debug)] 32struct AnyhowErrorWrapper(anyhow::Error); 33 34impl fmt::Display for AnyhowErrorWrapper { 35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 36 write!(f, "{}", self.0) 37 } 38} 39 40impl std::error::Error for AnyhowErrorWrapper {} 41 42/// Generic helper to manually enqueue any Apalis job by calling the SQL function directly. 43/// 44/// - job_namespace: fully-qualified job type name as stored by Apalis (e.g., "apalis::Email"). 45/// - payload: any struct that implements Serialize + Deserialize; it will be sent as JSON. 
46pub async fn push_job_json<T>( 47 pool: &Pool<Postgres>, 48 job_namespace: &str, 49 payload: &T, 50) -> Result<(), Error> 51where 52 T: Serialize + for<'de> Deserialize<'de>, 53{ 54 // Serialize payload to JSON and send to Postgres as json 55 let json_str = 56 serde_json::to_string(payload).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 57 58 query("select apalis.push_job($1, $2::json)") 59 .bind(job_namespace) 60 .bind(json_str) 61 .execute(pool) 62 .await 63 .map(|_| ()) 64 .map_err(|e| Error::Failed(Arc::new(Box::new(e)))) 65} 66 67/// Given a list of CIDs, returns those that are NOT already present in the blobs table 68/// with blob type = 'blob' and matches the user's did in the case of duplicate blobs for each user 69pub async fn filter_missing_blob_cids( 70 pool: &Pool<Postgres>, 71 cids: &Vec<String>, 72 users_did: &String, 73) -> anyhow::Result<Vec<String>> { 74 if cids.is_empty() { 75 return Ok(Vec::new()); 76 } 77 78 // Fetch the subset of provided CIDs that already exist as type 'blob' 79 let existing: Vec<String> = sqlx::query_scalar( 80 r#"SELECT cid_or_rev FROM blobs WHERE type = $1 AND cid_or_rev = ANY($2) AND account_did = $3"#, 81 ) 82 .bind(crate::db::models::BlobType::Blob) 83 .bind(&cids) 84 .bind(users_did) 85 .fetch_all(pool) 86 .await?; 87 88 let existing_set: HashSet<&str> = existing.iter().map(|s| s.as_str()).collect(); 89 let missing: Vec<String> = cids 90 .iter() 91 .filter(|cid| !existing_set.contains(cid.as_str())) 92 .cloned() 93 .collect(); 94 95 Ok(missing) 96} 97 98pub async fn record_new_blob( 99 pool: &Pool<Postgres>, 100 did: String, 101 cid_or_rev: String, 102 size: i64, 103 blob_type: models::BlobType, 104) -> anyhow::Result<models::BlobModel> { 105 match blob_type { 106 //On repo we need to upsert on did 107 models::BlobType::Repo => { 108 // First try to update an existing 'repo' blob row for this DID. 
109 if let Some(updated) = sqlx::query_as::<_, BlobModel>( 110 r#" 111 UPDATE blobs 112 SET size = $2, 113 type = $3, 114 cid_or_rev = $4 115 WHERE account_did = $1 AND type = $3 116 RETURNING id, created_at, account_did, size, type, cid_or_rev 117 "#, 118 ) 119 .bind(&did) 120 .bind(size) 121 .bind(&blob_type) 122 .bind(&cid_or_rev) 123 .fetch_optional(pool) 124 .await? 125 { 126 Ok(updated) 127 } else { 128 // If no row was updated, insert a new one for this DID and repo type. 129 Ok(sqlx::query_as::<_, BlobModel>( 130 r#" 131 INSERT INTO blobs (account_did, size, type, cid_or_rev) 132 VALUES ($1, $2, $3, $4) 133 RETURNING id, created_at, account_did, size, type, cid_or_rev 134 "#, 135 ) 136 .bind(did) 137 .bind(size) 138 .bind(blob_type) 139 .bind(cid_or_rev) 140 .fetch_one(pool) 141 .await?) 142 } 143 } 144 //on blob we upsert on (account_did, cid_or_rev) 145 models::BlobType::Blob | _ => Ok(sqlx::query_as::<_, BlobModel>( 146 r#" 147 INSERT INTO blobs (account_did, size, type, cid_or_rev) 148 VALUES ($1, $2, $3, $4) 149 ON CONFLICT (account_did, cid_or_rev) DO UPDATE 150 SET size = EXCLUDED.size, 151 type = EXCLUDED.type 152 RETURNING id, created_at, account_did, size, type, cid_or_rev 153 "#, 154 ) 155 .bind(did) 156 .bind(size) 157 .bind(blob_type) 158 .bind(cid_or_rev) 159 .fetch_one(pool) 160 .await?), 161 } 162} 163 164/// Look up the user's account by DID and return their repo_rev, if present. 165pub async fn get_repo_rev_by_did( 166 pool: &Pool<Postgres>, 167 did: &str, 168) -> anyhow::Result<Option<String>> { 169 // repo_rev is nullable; also the account row may not exist. 170 // Using fetch_optional yields Option<Option<String>> which we flatten to a single Option<String>. 171 let result: Option<Option<String>> = 172 sqlx::query_scalar(r#"SELECT repo_rev FROM accounts WHERE did = $1"#) 173 .bind(did) 174 .fetch_optional(pool) 175 .await?; 176 177 Ok(result.flatten()) 178} 179 180/// Update the repo_rev for a given account identified by DID. 
181/// Returns true if a row was updated. 182pub async fn update_repo_rev_by_did( 183 pool: &Pool<Postgres>, 184 did: &str, 185 repo_rev: &str, 186) -> anyhow::Result<bool> { 187 let result = 188 sqlx::query(r#"UPDATE accounts SET repo_rev = $2, last_backup = NOW() WHERE did = $1"#) 189 .bind(did) 190 .bind(repo_rev) 191 .execute(pool) 192 .await?; 193 Ok(result.rows_affected() > 0) 194} 195 196/// Update last_backup to the current timestamp for a given account identified by DID. 197/// Returns true if a row was updated. 198pub async fn update_last_backup_now_by_did( 199 pool: &Pool<Postgres>, 200 did: &str, 201) -> anyhow::Result<bool> { 202 let result = sqlx::query(r#"UPDATE accounts SET last_backup = NOW() WHERE did = $1"#) 203 .bind(did) 204 .execute(pool) 205 .await?; 206 Ok(result.rows_affected() > 0) 207}