Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1use crate::db::models;
2use crate::jobs::account_backup::AccountBackupJobContext;
3use crate::jobs::{
4 AnyhowErrorWrapper, get_repo_rev_by_did, record_new_blob, update_last_backup_now_by_did,
5 update_repo_rev_by_did,
6};
7use crate::storage::{blob_backup_path, repo_backup_path};
8use apalis::prelude::{Data, Error};
9use async_compression::futures::bufread::ZstdEncoder;
10use futures::TryStreamExt;
11use reqwest::Client;
12use reqwest::header::{ACCEPT, CONTENT_TYPE};
13use s3::Bucket;
14use serde::{Deserialize, Serialize};
15use sqlx::{Pool, Postgres};
16use std::sync::Arc;
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18
/// Apalis namespace under which UploadBlob jobs are registered and routed.
pub const JOB_NAMESPACE: &str = "apalis::UploadBlob";
20
/// Error payload returned by XRPC endpoints on non-success responses.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct XrpcError {
    /// Machine-readable error code, e.g. "InvalidRequest".
    error: String,
    /// Optional human-readable detail, e.g. "Blob not found".
    message: Option<String>,
}
26
27impl std::fmt::Display for XrpcError {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match &self.message {
30 Some(msg) => write!(f, "{}: {}", self.error, msg),
31 None => write!(f, "{}", self.error),
32 }
33 }
34}
35
/// What an UploadBlob job should fetch from the PDS.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub enum BlobType {
    /// The string here is the rev. If it's from the PDS backup start it already has the rev, saving a web call.
    Repo(Option<String>),
    /// The vec is the cids of the blobs to pull.
    Blob(Vec<String>),
}
43
/// Minimal deserialization target for `com.atproto.sync.getRepoStatus` —
/// only the repo rev is needed here; other response fields are ignored.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct AtProtoRepoStatusResponse {
    pub rev: String,
}
48
/// Everything an UploadBlob job needs: which account, which PDS, and what to pull.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct UploadBlobJobContext {
    /// Account/PDS details shared with the parent backup job.
    pub account_backup_job_context: AccountBackupJobContext,
    /// What to fetch: the full repo CAR, or a batch of blob cids.
    pub blob_type: BlobType,
    /// If set to true, this is the last batch of blobs to upload for this account and can mark the job as complete.
    pub last_upload_batch: bool,
}
56
57pub async fn upload_blob_job(
58 job: UploadBlobJobContext,
59 pool: Data<Pool<Postgres>>,
60 atproto_client: Data<Arc<reqwest::Client>>,
61 s3_client: Data<Arc<Box<Bucket>>>,
62) -> Result<(), Error> {
63 let did = job.account_backup_job_context.did.clone();
64 let pds_host = job.account_backup_job_context.pds_host.clone();
65
66 match job.blob_type {
67 BlobType::Repo(rev) => {
68 let current_rev = match rev {
69 None => {
70 //todo Prob just always pass in a rev? induivial uplaods do this anyhow
71 let url = format!(
72 "https://{}/xrpc/com.atproto.sync.getRepoStatus?did={}",
73 pds_host, did,
74 );
75
76 let resp = atproto_client
77 .get(url)
78 .send()
79 .await
80 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?
81 .json::<AtProtoRepoStatusResponse>()
82 .await
83 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;
84 resp.rev.clone()
85 }
86 Some(rev) => rev,
87 };
88
89 //TODO this is fialing on reruns since it's not being updated
90 let current_backed_up_rev =
91 get_repo_rev_by_did(&pool, &job.account_backup_job_context.did)
92 .await
93 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?
94 //I'm cool with this unwrap since it should fail the if check
95 .unwrap_or("".to_string());
96 if current_backed_up_rev == current_rev {
97 log::info!("Repo is already backed up at the rev: {}", current_rev);
98 update_last_backup_now_by_did(&pool, did.as_str())
99 .await
100 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?;
101 return Ok(());
102 }
103
104 let repo_url = format!(
105 "https://{}/xrpc/com.atproto.sync.getRepo?did={}",
106 pds_host, did
107 );
108
109 match download_compress_and_upload_blob(
110 repo_backup_path(did.clone()),
111 &atproto_client,
112 &s3_client,
113 repo_url,
114 models::BlobType::Repo,
115 )
116 .await
117 {
118 Ok(size) => {
119 record_new_blob(
120 &pool,
121 did.clone(),
122 current_rev.clone(),
123 //TODO bad
124 size.try_into().unwrap(),
125 models::BlobType::Repo,
126 )
127 .await
128 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?;
129
130 update_repo_rev_by_did(&pool, &did, ¤t_rev)
131 .await
132 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?;
133 }
134 Err(e) => {
135 return match e {
136 DownloadCompressAndUploadError::AnyError(e) => Err(e),
137 DownloadCompressAndUploadError::BlobDownloadError => {
138 Err(Error::Failed(Arc::new(
139 anyhow::anyhow!("Error downloading the repo for: {did}").into(),
140 )))
141 }
142 DownloadCompressAndUploadError::BlobNotFound => {
143 log::warn!("Repo not found, skipping backup");
144 Err(Error::Failed(Arc::new(Box::new(std::io::Error::new(
145 std::io::ErrorKind::NotFound,
146 "Repo not found",
147 )))))
148 }
149 };
150 }
151 };
152 }
153
154 BlobType::Blob(cids) => {
155 // log::info!("Uploading {:?} blobs for {}", cids.len(), did);
156 for cid in cids {
157 let blob_url = format!(
158 "https://{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}",
159 pds_host, did, cid
160 );
161
162 match download_compress_and_upload_blob(
163 blob_backup_path(did.clone(), cid.clone()),
164 &atproto_client,
165 &s3_client,
166 blob_url,
167 models::BlobType::Blob,
168 )
169 .await
170 {
171 Ok(blob_size) => {
172 record_new_blob(
173 &pool,
174 did.clone(),
175 cid,
176 blob_size.try_into().unwrap(),
177 models::BlobType::Blob,
178 )
179 .await
180 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?;
181 }
182 Err(err) => match err {
183 DownloadCompressAndUploadError::AnyError(err) => {
184 return Err(err);
185 }
186 DownloadCompressAndUploadError::BlobNotFound => {
187 // Record missing blob so it can be retried or inspected later. On conflict, do nothing.
188 if let Err(e) = sqlx::query(
189 "INSERT INTO missing_blobs (did, cid, created_date) VALUES ($1, $2, now()) ON CONFLICT DO NOTHING",
190 )
191 .bind(&did)
192 .bind(&cid)
193 .execute(&*pool)
194 .await
195 {
196 log::warn!("Failed to record missing blob {cid} for {did}: {e}");
197 }
198 log::warn!("Blob: {cid} not found for: {did}");
199 }
200 DownloadCompressAndUploadError::BlobDownloadError => {
201 //Silently ignoring atm not to mess with the chunk
202 }
203 },
204 }
205 }
206
207 if job.last_upload_batch {
208 log::info!("backup completed for {}", did);
209 update_last_backup_now_by_did(&pool, did.as_str())
210 .await
211 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?;
212 }
213 }
214 }
215
216 Ok(())
217}
218
/// Failure modes of `download_compress_and_upload_blob`.
pub enum DownloadCompressAndUploadError {
    /// Any unclassified failure, already wrapped as an apalis `Error`.
    AnyError(Error),
    /// The PDS answered `InvalidRequest` with "Blob not found" for the request.
    BlobNotFound,
    /// Download failed in a way callers may deliberately ignore per-blob.
    // NOTE(review): this variant is matched but never constructed in this file —
    // confirm it is produced elsewhere or is dead.
    BlobDownloadError,
}
224
225async fn download_compress_and_upload_blob(
226 object_key: String,
227 atproto_client: &Data<Arc<Client>>,
228 s3_client: &Data<Arc<Box<Bucket>>>,
229 repo_url: String,
230 blob_type: models::BlobType,
231) -> Result<usize, DownloadCompressAndUploadError> {
232 let accept_type = match blob_type {
233 models::BlobType::Repo => "application/vnd.ipld.car",
234 _ => "*/*",
235 };
236
237 let response = atproto_client
238 .get(repo_url.clone())
239 .header(ACCEPT, accept_type)
240 .send()
241 .await
242 .map_err(|e| {
243 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e))))
244 })?;
245 let response_status = response.status();
246
247 if !response_status.is_success() {
248 let error_body = response.json::<XrpcError>().await.map_err(|e| {
249 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e))))
250 })?;
251 if error_body.error == "InvalidRequest" {
252 if let Some(message) = error_body.message.as_deref() {
253 if message == "Blob not found" {
254 return Err(DownloadCompressAndUploadError::BlobNotFound);
255 }
256 }
257 }
258 // response.error_for_status().map_err(|e| {
259 // DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e))))
260 // })?;
261 return Err(DownloadCompressAndUploadError::AnyError(Error::Failed(
262 Arc::new(
263 anyhow::anyhow!("Error downloading the blob: {response_status} {repo_url}").into(),
264 ),
265 )));
266 }
267
268 // Derive content type from the blob response headers; default to octet-stream
269 // Should also get the car type
270 let content_type = response
271 .headers()
272 .get(CONTENT_TYPE)
273 .and_then(|v| v.to_str().ok())
274 .unwrap_or("application/octet-stream")
275 .to_string();
276
277 let blob_reader = response
278 .bytes_stream()
279 .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
280 .into_async_read();
281
282 let zstd_encoder = ZstdEncoder::new(blob_reader);
283 let mut zstd_tokio_reader = zstd_encoder.compat();
284
285 match s3_client
286 .put_object_stream_builder(object_key.as_str())
287 .with_content_type(content_type)
288 .with_content_encoding("zstd")
289 .map_err(|e| {
290 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e))))
291 })?
292 .execute_stream(&mut zstd_tokio_reader)
293 .await
294 {
295 Ok(result) => {
296 log::debug!("Uploaded: {}", object_key);
297 Ok(result.uploaded_bytes().try_into().unwrap())
298 }
299 Err(err) => {
300 log::warn!("Failed to upload: {}: {}, trying again.", object_key, err);
301 //Try again but with a more basic content type. Usually happens when a user has an odd blob that is not supported by the s3. Like an html page
302 let put_result = s3_client
303 .put_object_stream_builder(&object_key)
304 .with_content_type("application/octet-stream")
305 .with_content_encoding("zstd")
306 .map_err(|e| {
307 DownloadCompressAndUploadError::AnyError(Error::Failed(Arc::new(Box::new(e))))
308 })?
309 .execute_stream(&mut zstd_tokio_reader)
310 .await;
311 match put_result {
312 Ok(result) => {
313 log::debug!("Uploaded: {}", object_key);
314 Ok(result.uploaded_bytes().try_into().unwrap())
315 }
316 Err(err) => Err(DownloadCompressAndUploadError::AnyError(Error::Failed(
317 Arc::new(Box::new(err)),
318 ))),
319 }
320 }
321 }
322}