use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::{BackupStorage, BlobStorage};
use crate::sync::car::encode_car_header;

async fn update_genesis_blocks_cids(db: &PgPool, blocks_cids: &[String], seq: i64) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
        blocks_cids,
        seq
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn update_repo_rev(db: &PgPool, rev: &str, user_id: uuid::Uuid) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
        rev,
        user_id
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn insert_user_blocks(db: &PgPool, user_id: uuid::Uuid, block_cids: &[Vec<u8>]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO user_blocks (user_id, block_cid)
        SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
        ON CONFLICT (user_id, block_cid) DO NOTHING
        "#,
        user_id,
        block_cids
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn fetch_user_records(db: &PgPool, user_id: uuid::Uuid) -> Result<Vec<(String, String, String)>, sqlx::Error> {
    let rows = sqlx::query!(
        "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
        user_id
    )
    .fetch_all(db)
    .await?;
    Ok(rows.into_iter().map(|r| (r.collection, r.rkey, r.record_cid)).collect())
}

async fn insert_record_blobs(db: &PgPool, user_id: uuid::Uuid, record_uris: &[String], blob_cids: &[String]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
        SELECT $1, record_uri, blob_cid
        FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
        ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
        "#,
        user_id,
        record_uris,
        blob_cids
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn delete_backup_record(db: &PgPool, id: uuid::Uuid) -> Result<(), sqlx::Error> {
    sqlx::query!("DELETE FROM account_backups WHERE id = $1", id)
        .execute(db)
        .await?;
    Ok(())
}

async fn fetch_old_backups(
    db: &PgPool,
    user_id: uuid::Uuid,
    retention_count: i64,
) -> Result<Vec<(uuid::Uuid, String)>, sqlx::Error> {
    let rows = sqlx::query!(
        r#"
        SELECT id, storage_key
        FROM account_backups
        WHERE user_id = $1
        ORDER BY created_at DESC
        OFFSET $2
        "#,
        user_id,
        retention_count
    )
    .fetch_all(db)
    .await?;
    Ok(rows.into_iter().map(|r| (r.id, r.storage_key)).collect())
}

async fn insert_backup_record(
    db: &PgPool,
    user_id: uuid::Uuid,
    storage_key: &str,
    repo_root_cid: &str,
    repo_rev: &str,
    block_count: i32,
    size_bytes: i64,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
        VALUES ($1, $2, $3, $4, $5, $6)
        "#,
        user_id,
        storage_key,
        repo_root_cid,
        repo_rev,
        block_count,
        size_bytes
    )
    .execute(db)
    .await?;
    Ok(())
}

struct GenesisCommitRow {
    seq: i64,
    did: String,
    commit_cid: Option<String>,
}

async fn process_genesis_commit(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    row: GenesisCommitRow,
) -> Result<(String, i64), (i64, &'static str)> {
    let commit_cid_str = row.commit_cid.ok_or((row.seq, "missing commit_cid"))?;
    let commit_cid = Cid::from_str(&commit_cid_str).map_err(|_| (row.seq, "invalid CID"))?;
    let block = block_store
        .get(&commit_cid)
        .await
        .map_err(|_| (row.seq, "failed to fetch block"))?
        .ok_or((row.seq, "block not found"))?;
    let commit = Commit::from_cbor(&block).map_err(|_| (row.seq, "failed to parse commit"))?;
    let blocks_cids = vec![commit.data.to_string(), commit_cid.to_string()];
    update_genesis_blocks_cids(db, &blocks_cids, row.seq)
        .await
        .map_err(|_| (row.seq, "failed to update"))?;
    Ok((row.did, row.seq))
}
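/// Backfills `blocks_cids` for genesis (no-prev) commit events in `repo_seq`
/// that were sequenced without a block list, by re-reading each commit block
/// from storage and recording the commit CID plus its data root CID.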
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
        AND prev_cid IS NULL
        AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let results = futures::future::join_all(broken_genesis_commits.into_iter().map(|row| {
        process_genesis_commit(
            db,
            &block_store,
            GenesisCommitRow {
                seq: row.seq,
                did: row.did,
                commit_cid: row.commit_cid,
            },
        )
    }))
    .await;

    let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
        Ok((did, seq)) => {
            info!(seq = seq, did = %did, "Fixed genesis commit blocks_cids");
            (s + 1, f)
        }
        Err((seq, reason)) => {
            warn!(seq = seq, reason = reason, "Failed to process genesis commit");
            (s, f + 1)
        }
    });

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}

async fn process_repo_rev(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    repo_root_cid: String,
) -> Result<uuid::Uuid, uuid::Uuid> {
    let cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?;
    let block = block_store
        .get(&cid)
        .await
        .ok()
        .flatten()
        .ok_or(user_id)?;
    let commit = Commit::from_cbor(&block).map_err(|_| user_id)?;
    let rev = commit.rev().to_string();
    update_repo_rev(db, &rev, user_id)
        .await
        .map_err(|_| user_id)?;
    Ok(user_id)
}
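/// Backfills `repos.repo_rev` for repos that have no rev stored, by parsing
/// the rev out of each repo's root commit block.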
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let results = futures::future::join_all(repos_missing_rev.into_iter().map(|repo| {
        process_repo_rev(db, &block_store, repo.user_id, repo.repo_root_cid)
    }))
    .await;

    let (success, failed) = results
        .iter()
        .fold((0, 0), |(s, f), r| match r {
            Ok(_) => (s + 1, f),
            Err(user_id) => {
                warn!(user_id = %user_id, "Failed to update repo_rev");
                (s, f + 1)
            }
        });

    info!(success, failed, "Completed repo_rev backfill");
}

async fn process_user_blocks(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    repo_root_cid: String,
) -> Result<(uuid::Uuid, usize), uuid::Uuid> {
    let root_cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?;
    let block_cids = collect_current_repo_blocks(block_store, &root_cid)
        .await
        .map_err(|_| user_id)?;
    if block_cids.is_empty() {
        return Err(user_id);
    }
    let count = block_cids.len();
    insert_user_blocks(db, user_id, &block_cids)
        .await
        .map_err(|_| user_id)?;
    Ok((user_id, count))
}
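/// Backfills the `user_blocks` ownership table for users that have no rows in
/// it yet, walking each repo's current block graph from its root commit.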
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let results = futures::future::join_all(users_without_blocks.into_iter().map(|user| {
        process_user_blocks(db, &block_store, user.user_id, user.repo_root_cid)
    }))
    .await;

    let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
        Ok((user_id, count)) => {
            info!(user_id = %user_id, block_count = count, "Backfilled user_blocks");
            (s + 1, f)
        }
        Err(user_id) => {
            warn!(user_id = %user_id, "Failed to backfill user_blocks");
            (s, f + 1)
        }
    });

    info!(success, failed, "Completed user_blocks backfill");
}
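/// Walks the repo block graph from `head_cid`, following the commit's data
/// root and, for tree nodes, the `l` link plus the `t`/`v` links of each `e`
/// entry. Returns the byte form of every CID visited; CIDs whose blocks are
/// missing from storage are still recorded but not descended into.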
pub async fn collect_current_repo_blocks(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<Vec<u8>>, String> {
    let mut block_cids: Vec<Vec<u8>> = Vec::new();
    let mut to_visit = vec![*head_cid];
    let mut visited = std::collections::HashSet::new();

    while let Some(cid) = to_visit.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);
        block_cids.push(cid.to_bytes());

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            Ok(None) => continue,
            Err(e) => return Err(format!("Failed to get block {}: {:?}", cid, e)),
        };

        if let Ok(commit) = Commit::from_cbor(&block) {
            to_visit.push(commit.data);
        } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
            if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                to_visit.push(*left_cid);
            }
            if let Some(Ipld::List(entries)) = obj.get("e") {
                to_visit.extend(
                    entries
                        .iter()
                        .filter_map(|entry| match entry {
                            Ipld::Map(entry_obj) => Some(entry_obj),
                            _ => None,
                        })
                        .flat_map(|entry_obj| {
                            [entry_obj.get("t"), entry_obj.get("v")]
                                .into_iter()
                                .flatten()
                                .filter_map(|v| match v {
                                    Ipld::Link(cid) => Some(*cid),
                                    _ => None,
                                })
                        }),
                );
            }
        }
    }

    Ok(block_cids)
}

async fn process_record_blobs(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    did: String,
) -> Result<(uuid::Uuid, String, usize), (uuid::Uuid, &'static str)> {
    let records = fetch_user_records(db, user_id)
        .await
        .map_err(|_| (user_id, "failed to fetch records"))?;

    let mut batch_record_uris: Vec<String> = Vec::new();
    let mut batch_blob_cids: Vec<String> = Vec::new();

    futures::future::join_all(records.into_iter().map(|(collection, rkey, record_cid)| {
        let did = did.clone();
        async move {
            let cid = Cid::from_str(&record_cid).ok()?;
            let block_bytes = block_store.get(&cid).await.ok()??;
            let record_ipld: Ipld = serde_ipld_dagcbor::from_slice(&block_bytes).ok()?;
            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            Some(
                blob_refs
                    .into_iter()
                    .map(|blob_ref| {
                        let record_uri = format!("at://{}/{}/{}", did, collection, rkey);
                        (record_uri, blob_ref.cid)
                    })
                    .collect::<Vec<_>>(),
            )
        }
    }))
    .await
    .into_iter()
    .flatten()
    .flatten()
    .for_each(|(uri, cid)| {
        batch_record_uris.push(uri);
        batch_blob_cids.push(cid);
    });

    let blob_refs_found = batch_record_uris.len();
    if !batch_record_uris.is_empty() {
        insert_record_blobs(db, user_id, &batch_record_uris, &batch_blob_cids)
            .await
            .map_err(|_| (user_id, "failed to insert"))?;
    }
    Ok((user_id, did, blob_refs_found))
}
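/// Backfills the `record_blobs` join table by scanning stored record blocks
/// for blob references, for up to 100 users per invocation.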
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let results = futures::future::join_all(users_needing_backfill.into_iter().map(|user| {
        process_record_blobs(db, &block_store, user.user_id, user.did)
    }))
    .await;

    let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
        Ok((user_id, did, blob_refs)) => {
            if *blob_refs > 0 {
                info!(user_id = %user_id, did = %did, blob_refs = blob_refs, "Backfilled record_blobs");
            }
            (s + 1, f)
        }
        Err((user_id, reason)) => {
            warn!(user_id = %user_id, reason = reason, "Failed to backfill record_blobs");
            (s, f + 1)
        }
    });

    info!(success, failed, "Completed record_blobs backfill");
}
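/// Long-running task that periodically deletes accounts whose scheduled
/// deletion time has passed. The check interval defaults to 3600 seconds and
/// can be overridden via `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`; the loop
/// exits when the shutdown signal flips to `true`.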
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, blob_store.as_ref()).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
        AND delete_after < NOW()
        AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    futures::future::join_all(accounts_to_delete.into_iter().map(|account| async move {
        let result = delete_account_data(db, blob_store, &account.did, &account.handle).await;
        (account.did, account.handle, result)
    }))
    .await
    .into_iter()
    .for_each(|(did, handle, result)| match result {
        Ok(()) => info!(did = %did, handle = %handle, "Successfully deleted scheduled account"),
        Err(e) => warn!(did = %did, handle = %handle, error = %e, "Failed to delete scheduled account"),
    });

    Ok(())
}
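/// Deletes all data for one account: blobs in object storage (best effort),
/// then, in a single transaction, the blob rows and the user row, followed by
/// sequencing an `account` event with status `deleted` and pruning the DID's
/// older sequence entries. Listeners are notified via `NOTIFY repo_updates`.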
async fn delete_account_data(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    futures::future::join_all(blob_storage_keys.iter().map(|storage_key| async move {
        (storage_key, blob_store.delete(storage_key).await)
    }))
    .await
    .into_iter()
    .filter_map(|(key, result)| result.err().map(|e| (key, e)))
    .for_each(|(key, e)| {
        warn!(storage_key = %key, error = %e, "Failed to delete blob from storage (continuing anyway)");
    });

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}
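/// Long-running task that periodically snapshots repos to backup storage.
/// The interval and retention count come from the `BackupStorage`
/// configuration; the loop exits when the shutdown signal flips to `true`.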
pub async fn start_backup_tasks(
    db: PgPool,
    block_store: PostgresBlockStore,
    backup_storage: Arc<BackupStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let backup_interval = Duration::from_secs(BackupStorage::interval_secs());

    info!(
        interval_secs = backup_interval.as_secs(),
        retention_count = BackupStorage::retention_count(),
        "Starting backup service"
    );

    let mut ticker = interval(backup_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Backup service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await {
                    error!("Error processing scheduled backups: {}", e);
                }
            }
        }
    }
}

struct BackupResult {
    did: String,
    repo_rev: String,
    size_bytes: i64,
    block_count: i32,
    user_id: uuid::Uuid,
}

enum BackupOutcome {
    Success(BackupResult),
    Skipped(String, &'static str),
    Failed(String, String),
}
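/// Creates one backup for a single repo: generates a CAR snapshot, uploads it
/// to backup storage, and records it in `account_backups`. If the DB insert
/// fails, the uploaded object is deleted again so no orphan is left behind.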
async fn process_single_backup(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    did: String,
    repo_root_cid: String,
    repo_rev: Option<String>,
) -> BackupOutcome {
    let repo_rev = match repo_rev {
        Some(rev) => rev,
        None => return BackupOutcome::Skipped(did, "no repo_rev"),
    };

    let head_cid = match Cid::from_str(&repo_root_cid) {
        Ok(c) => c,
        Err(_) => return BackupOutcome::Skipped(did, "invalid repo_root_cid"),
    };

    let car_bytes = match generate_full_backup(db, block_store, user_id, &head_cid).await {
        Ok(bytes) => bytes,
        Err(e) => return BackupOutcome::Failed(did, format!("CAR generation: {}", e)),
    };

    let block_count = count_car_blocks(&car_bytes);
    let size_bytes = car_bytes.len() as i64;

    let storage_key = match backup_storage.put_backup(&did, &repo_rev, &car_bytes).await {
        Ok(key) => key,
        Err(e) => return BackupOutcome::Failed(did, format!("S3 upload: {}", e)),
    };

    if let Err(e) = insert_backup_record(
        db,
        user_id,
        &storage_key,
        &repo_root_cid,
        &repo_rev,
        block_count,
        size_bytes,
    )
    .await
    {
        if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
            error!(
                did = %did,
                storage_key = %storage_key,
                error = %rollback_err,
                "Failed to rollback orphaned backup from S3"
            );
        }
        return BackupOutcome::Failed(did, format!("DB insert: {}", e));
    }

    BackupOutcome::Success(BackupResult {
        did,
        repo_rev,
        size_bytes,
        block_count,
        user_id,
    })
}

async fn process_scheduled_backups(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
) -> Result<(), String> {
    let backup_interval_secs = BackupStorage::interval_secs() as i64;
    let retention_count = BackupStorage::retention_count();

    let users_needing_backup = sqlx::query!(
        r#"
        SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE u.backup_enabled = true
        AND u.deactivated_at IS NULL
        AND (
            NOT EXISTS (
                SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id
            )
            OR (
                SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id
            ) < NOW() - make_interval(secs => $1)
        )
        LIMIT 50
        "#,
        backup_interval_secs as f64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching users for backup: {}", e))?;

    if users_needing_backup.is_empty() {
        debug!("No accounts need backup");
        return Ok(());
    }

    info!(
        count = users_needing_backup.len(),
        "Processing scheduled backups"
    );

    let results = futures::future::join_all(users_needing_backup.into_iter().map(|user| {
        process_single_backup(
            db,
            block_store,
            backup_storage,
            user.user_id,
            user.did,
            user.repo_root_cid,
            user.repo_rev,
        )
    }))
    .await;

    futures::future::join_all(results.into_iter().map(|outcome| async move {
        match outcome {
            BackupOutcome::Success(result) => {
                info!(
                    did = %result.did,
                    rev = %result.repo_rev,
                    size_bytes = result.size_bytes,
                    block_count = result.block_count,
                    "Created backup"
                );
                if let Err(e) =
                    cleanup_old_backups(db, backup_storage, result.user_id, retention_count).await
                {
                    warn!(did = %result.did, error = %e, "Failed to cleanup old backups");
                }
            }
            BackupOutcome::Skipped(did, reason) => {
                warn!(did = %did, reason = reason, "Skipped backup");
            }
            BackupOutcome::Failed(did, error) => {
                warn!(did = %did, error = %error, "Failed backup");
            }
        }
    }))
    .await;

    Ok(())
}
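/// Serializes the repo rooted at `head_cid` into CAR bytes: a header naming
/// the root, followed by one varint-length-prefixed CID+block section per
/// reachable block. Blocks missing from storage are silently omitted.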
pub async fn generate_repo_car(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use jacquard_repo::storage::BlockStore;

    let block_cids_bytes = collect_current_repo_blocks(block_store, head_cid).await?;
    let block_cids: Vec<Cid> = block_cids_bytes
        .iter()
        .filter_map(|b| Cid::try_from(b.as_slice()).ok())
        .collect();

    let car_bytes =
        encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;

    let blocks = block_store
        .get_many(&block_cids)
        .await
        .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?;

    let car_bytes = block_cids
        .iter()
        .zip(blocks.iter())
        .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block)))
        .fold(car_bytes, |mut acc, (cid, block)| {
            acc.extend(encode_car_block(cid, block));
            acc
        });

    Ok(car_bytes)
}

fn encode_car_block(cid: &Cid, block: &[u8]) -> Vec<u8> {
    use std::io::Write;
    let cid_bytes = cid.to_bytes();
    let total_len = cid_bytes.len() + block.len();
    let mut writer = Vec::new();
    crate::sync::car::write_varint(&mut writer, total_len as u64)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(&cid_bytes)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(block)
        .expect("Writing to Vec<u8> should never fail");
    writer
}

pub async fn generate_repo_car_from_user_blocks(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    _head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use std::str::FromStr;

    let repo_root_cid_str: String = sqlx::query_scalar!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1",
        user_id
    )
    .fetch_optional(db)
    .await
    .map_err(|e| format!("Failed to fetch repo: {}", e))?
    .ok_or_else(|| "Repository not found".to_string())?;

    let actual_head_cid =
        Cid::from_str(&repo_root_cid_str).map_err(|e| format!("Invalid repo_root_cid: {}", e))?;

    generate_repo_car(block_store, &actual_head_cid).await
}

pub async fn generate_full_backup(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    generate_repo_car_from_user_blocks(db, block_store, user_id, head_cid).await
}
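/// Counts the block sections in a CAR byte stream by skipping the header and
/// then reading each section's varint length prefix.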
pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
    let mut count = 0;
    let mut pos = 0;

    if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) {
        pos += header_varint_len + header_len as usize;
    } else {
        return 0;
    }

    while pos < car_bytes.len() {
        if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) {
            pos += varint_len + block_len as usize;
            count += 1;
        } else {
            break;
        }
    }

    count
}

fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    while pos < data.len() && pos < 10 {
        let byte = data[pos];
        value |= ((byte & 0x7f) as u64) << shift;
        pos += 1;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
    }

    None
}
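/// Deletes backups beyond the retention count for a user, removing the object
/// from storage first and the DB row only after the storage delete succeeds,
/// so a failed storage delete never leaves an untracked object behind.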
async fn cleanup_old_backups(
    db: &PgPool,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    retention_count: u32,
) -> Result<(), String> {
    let old_backups = fetch_old_backups(db, user_id, retention_count as i64)
        .await
        .map_err(|e| format!("DB error fetching old backups: {}", e))?;

    let results = futures::future::join_all(old_backups.into_iter().map(|(id, storage_key)| async move {
        match backup_storage.delete_backup(&storage_key).await {
            Ok(()) => match delete_backup_record(db, id).await {
                Ok(()) => Ok(()),
                Err(e) => Err(format!("DB delete failed for {}: {}", storage_key, e)),
            },
            Err(e) => {
                warn!(
                    storage_key = %storage_key,
                    error = %e,
                    "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
                );
                Ok(())
            }
        }
    }))
    .await;

    results
        .into_iter()
        .find_map(|r| r.err())
        .map_or(Ok(()), Err)
}