Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at main 178 lines 6.8 kB view raw
1use apalis::layers::retry::RetryPolicy; 2use apalis::prelude::*; 3use apalis_sql::{ 4 Config, 5 postgres::{PgListen, PgPool, PostgresStorage}, 6}; 7use dotenvy::dotenv; 8use log::{debug, info}; 9use s3::creds::Credentials; 10use s3::{Bucket, Region}; 11use shared::jobs::account_backup::{AccountBackupJobContext, account_backup_job}; 12use shared::jobs::pds_backup::{PdsBackupJobContext, pds_backup_job}; 13use shared::jobs::remove_repo::RemoveRepoJobContext; 14use shared::jobs::scheduled_back_up_start::{ 15 ScheduledBackUpStartJobContext, scheduled_back_up_start_job, 16}; 17use shared::jobs::upload_blob::{UploadBlobJobContext, upload_blob_job}; 18use shared::jobs::{account_backup, pds_backup, remove_repo, scheduled_back_up_start, upload_blob}; 19use std::env; 20use std::sync::Arc; 21use tracing_subscriber::prelude::*; 22 23#[tokio::main] 24async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 use tracing_subscriber::EnvFilter; 26 let _ = dotenv(); 27 let fmt_layer = tracing_subscriber::fmt::layer().with_target(false); 28 let filter_layer = 29 EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("debug"))?; 30 tracing_subscriber::registry() 31 .with(filter_layer) 32 .with(fmt_layer) 33 .init(); 34 35 let worker_node_name = 36 std::env::var("WORKER_NODE_NAME").expect("Must specify worker node name"); 37 38 //job backend setup 39 let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db"); 40 41 let pool = PgPool::connect(&database_url).await?; 42 43 let mut start_backup_storage: PostgresStorage<AccountBackupJobContext> = 44 PostgresStorage::new_with_config(pool.clone(), Config::new(account_backup::JOB_NAMESPACE)); 45 46 let mut upload_blob_storage: PostgresStorage<UploadBlobJobContext> = 47 PostgresStorage::new_with_config(pool.clone(), Config::new(upload_blob::JOB_NAMESPACE)); 48 49 let mut start_back_up_listener = PgListen::new(pool.clone()).await?; 50 start_back_up_listener.subscribe_with(&mut start_backup_storage); 51 52 let mut pull_blobs_listener = PgListen::new(pool.clone()).await?; 53 pull_blobs_listener.subscribe_with(&mut upload_blob_storage); 54 55 let mut pds_backup_storage: PostgresStorage<PdsBackupJobContext> = 56 PostgresStorage::new_with_config(pool.clone(), Config::new(pds_backup::JOB_NAMESPACE)); 57 let mut pds_backup_listener = PgListen::new(pool.clone()).await?; 58 pds_backup_listener.subscribe_with(&mut pds_backup_storage); 59 60 let mut scheduled_backup_storage: PostgresStorage<ScheduledBackUpStartJobContext> = 61 PostgresStorage::new_with_config( 62 pool.clone(), 63 Config::new(scheduled_back_up_start::JOB_NAMESPACE), 64 ); 65 let mut scheduled_backup_listener = PgListen::new(pool.clone()).await?; 66 scheduled_backup_listener.subscribe_with(&mut scheduled_backup_storage); 67 68 let mut remove_repo_storage: PostgresStorage<RemoveRepoJobContext> = 69 PostgresStorage::new_with_config(pool.clone(), Config::new(remove_repo::JOB_NAMESPACE)); 70 let mut remove_repo_listener = PgListen::new(pool.clone()).await?; 71 remove_repo_listener.subscribe_with(&mut remove_repo_storage); 72 73 tokio::spawn(async move { 74 //TODO bad? 75 start_back_up_listener.listen().await.unwrap(); 76 pull_blobs_listener.listen().await.unwrap(); 77 pds_backup_listener.listen().await.unwrap(); 78 scheduled_backup_listener.listen().await.unwrap(); 79 remove_repo_listener.listen().await.unwrap(); 80 }); 81 82 //Atrpoto client setup 83 let atproto_client = Arc::new( 84 reqwest::Client::builder() 85 .user_agent("pds-moover-backups/0.0.1") 86 .build()?, 87 ); 88 89 //S3 90 let region_name = env::var("S3_REGION")?; 91 let endpoint = env::var("S3_ENDPOINT")?; 92 let region = Region::Custom { 93 region: region_name, 94 endpoint, 95 }; 96 97 let bucket = Bucket::new( 98 env::var("S3_BUCKET_NAME")?.as_str(), 99 region, 100 // Credentials are collected from environment, config, profile or instance metadata 101 Credentials::new( 102 Some(env::var("S3_ACCESS_KEY")?.as_str()), 103 Some(env::var("S3_SECRET_KEY")?.as_str()), 104 None, 105 None, 106 None, 107 )?, 108 )?; 109 110 let s3_bucket = Arc::new(bucket); 111 112 log::info!("Starting the worker node: {}", worker_node_name); 113 114 Monitor::new() 115 .register({ 116 WorkerBuilder::new(format!("{}-start-backup", worker_node_name)) 117 .data(pool.clone()) 118 .data(atproto_client.clone()) 119 .concurrency(5) 120 .retry(RetryPolicy::retries(5)) 121 .enable_tracing() 122 .backend(start_backup_storage) 123 .build_fn(account_backup_job) 124 }) 125 .register({ 126 WorkerBuilder::new(format!("{}-upload-blob", worker_node_name)) 127 .data(pool.clone()) 128 .data(atproto_client.clone()) 129 .data(s3_bucket.clone()) 130 .concurrency(20) 131 .retry(RetryPolicy::retries(5)) 132 .enable_tracing() 133 .backend(upload_blob_storage) 134 .build_fn(upload_blob_job) 135 // .chain(|s| { 136 // //Should be a performance boost of the job 137 // s.map_future(|f| async { 138 // let fut = tokio::spawn(f); 139 // let fut = fut.await?; 140 // fut 141 // }) 142 // }) 143 }) 144 .register({ 145 WorkerBuilder::new(format!("{}-pds-backup", worker_node_name)) 146 .data(pool.clone()) 147 .data(atproto_client.clone()) 148 .retry(RetryPolicy::retries(1)) 149 .enable_tracing() 150 .backend(pds_backup_storage) 151 .build_fn(pds_backup_job) 152 }) 153 .register({ 154 WorkerBuilder::new(format!("{}-scheduled-backup-start", worker_node_name)) 155 .data(pool.clone()) 156 .retry(RetryPolicy::retries(5)) 157 .enable_tracing() 158 .backend(scheduled_backup_storage) 159 .build_fn(scheduled_back_up_start_job) 160 }) 161 .register({ 162 WorkerBuilder::new(format!("{}-delete-repo", worker_node_name)) 163 .data(pool.clone()) 164 .data(s3_bucket.clone()) 165 .retry(RetryPolicy::retries(5)) 166 .enable_tracing() 167 .backend(remove_repo_storage) 168 .build_fn(remove_repo::run) 169 }) 170 .on_event(|e| debug!("{e}")) 171 .run_with_signal(async { 172 tokio::signal::ctrl_c().await?; 173 info!("Shutting down the system"); 174 Ok(()) 175 }) 176 .await?; 177 Ok(()) 178}