use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::{BackupStorage, BlobStorage};
use crate::sync::car::encode_car_header;

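/// One-shot backfill for `repo_seq` genesis commit rows (commit events with no
/// `prev_cid`) whose `blocks_cids` column is NULL or empty: re-reads the commit block
/// from the block store, parses it, and stores `[mst_root_cid, commit_cid]` as the
/// row's `blocks_cids`. Per-row failures are logged and skipped.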
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}

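/// Backfills `repos.repo_rev` for rows where it is NULL by loading the repo's root
/// commit from the block store and copying its `rev`. Rows with an invalid root CID
/// or a missing/unparseable commit block are counted as failures and skipped.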
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}

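/// Populates `user_blocks` for repos that have no rows there: walks the block graph
/// from the repo root, following commit links (`data`, `prev`) and MST node links
/// (`l` for the left subtree, plus the `t` subtree and `v` value links inside each
/// `e` entry), then bulk-inserts every reachable block CID for the user.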
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                    to_visit.push(*left_cid);
                }
                if let Some(Ipld::List(entries)) = obj.get("e") {
                    for entry in entries {
                        if let Ipld::Map(entry_obj) = entry {
                            if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                to_visit.push(*tree_cid);
                            }
                            if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                to_visit.push(*val_cid);
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}

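/// Backfills `record_blobs` for up to 100 repos that have records but no blob-ref rows:
/// decodes each record block as DAG-CBOR, extracts blob references via
/// `find_blob_refs_ipld`, and inserts one `record_blobs` row per (record URI, blob CID).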
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut blob_refs_found = 0;
        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    VALUES ($1, $2, $3)
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user.user_id,
                    record_uri,
                    blob_ref.cid
                )
                .execute(db)
                .await
                {
                    warn!(error = %e, "Failed to insert record_blob during backfill");
                } else {
                    blob_refs_found += 1;
                }
            }
        }

        if blob_refs_found > 0 {
            info!(
                user_id = %user.user_id,
                did = %user.did,
                blob_refs = blob_refs_found,
                "Backfilled record_blobs"
            );
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}

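/// Long-running task that periodically processes scheduled account deletions. The
/// check interval comes from `SCHEDULED_DELETE_CHECK_INTERVAL_SECS` (default 3600
/// seconds); the loop exits when the shutdown watch channel flips to `true`.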
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

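/// Deletes up to 100 accounts per tick whose `delete_after` deadline has passed and
/// that are already deactivated. Each account is handled independently; failures are
/// logged and do not abort the batch.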
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}

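/// Deletes a single account's data. Blobs are removed from blob storage first (best
/// effort), then one transaction deletes the blob rows and the user row (other
/// per-user tables are assumed to be cleaned up by ON DELETE CASCADE), sequences a
/// final `account` event with status `deleted`, and prunes the account's older
/// `repo_seq` rows. A NOTIFY on `repo_updates` then publishes the new sequence number.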
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}

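/// Long-running task that periodically creates CAR backups for accounts with backups
/// enabled. Interval and retention come from `BackupStorage::interval_secs()` and
/// `BackupStorage::retention_count()`; the loop exits when the shutdown signal fires.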
pub async fn start_backup_tasks(
    db: PgPool,
    block_store: PostgresBlockStore,
    backup_storage: Arc<BackupStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let backup_interval = Duration::from_secs(BackupStorage::interval_secs());

    info!(
        interval_secs = backup_interval.as_secs(),
        retention_count = BackupStorage::retention_count(),
        "Starting backup service"
    );

    let mut ticker = interval(backup_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Backup service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await {
                    error!("Error processing scheduled backups: {}", e);
                }
            }
        }
    }
}

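/// One backup pass: selects up to 50 active, backup-enabled accounts whose latest
/// backup is older than the configured interval (or that have none), generates a full
/// CAR from the repo root, uploads it, records it in `account_backups`, and prunes
/// backups beyond the retention count. If the DB insert fails, the uploaded object is
/// deleted again so no orphan is left in storage.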
async fn process_scheduled_backups(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
) -> Result<(), String> {
    let backup_interval_secs = BackupStorage::interval_secs() as i64;
    let retention_count = BackupStorage::retention_count();

    let users_needing_backup = sqlx::query!(
        r#"
        SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE u.backup_enabled = true
          AND u.deactivated_at IS NULL
          AND (
            NOT EXISTS (
                SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id
            )
            OR (
                SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id
            ) < NOW() - make_interval(secs => $1)
          )
        LIMIT 50
        "#,
        backup_interval_secs as f64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching users for backup: {}", e))?;

    if users_needing_backup.is_empty() {
        debug!("No accounts need backup");
        return Ok(());
    }

    info!(
        count = users_needing_backup.len(),
        "Processing scheduled backups"
    );

    for user in users_needing_backup {
        let repo_root_cid = user.repo_root_cid.clone();

        let repo_rev = match &user.repo_rev {
            Some(rev) => rev.clone(),
            None => {
                warn!(did = %user.did, "User has no repo_rev, skipping backup");
                continue;
            }
        };

        let head_cid = match Cid::from_str(&repo_root_cid) {
            Ok(c) => c,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup");
                continue;
            }
        };

        let car_result = generate_full_backup(block_store, &head_cid).await;
        let car_bytes = match car_result {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to generate CAR for backup");
                continue;
            }
        };

        let block_count = count_car_blocks(&car_bytes);
        let size_bytes = car_bytes.len() as i64;

        let storage_key = match backup_storage
            .put_backup(&user.did, &repo_rev, &car_bytes)
            .await
        {
            Ok(key) => key,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to upload backup to storage");
                continue;
            }
        };

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
            user.user_id,
            storage_key,
            repo_root_cid,
            repo_rev,
            block_count,
            size_bytes
        )
        .execute(db)
        .await
        {
            warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload");
            if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
                error!(
                    did = %user.did,
                    storage_key = %storage_key,
                    error = %rollback_err,
                    "Failed to rollback orphaned backup from S3"
                );
            }
            continue;
        }

        info!(
            did = %user.did,
            rev = %repo_rev,
            size_bytes,
            block_count,
            "Created backup"
        );

        if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await
        {
            warn!(did = %user.did, error = %e, "Failed to cleanup old backups");
        }
    }

    Ok(())
}

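/// Serializes a repo into CARv1 bytes: writes the header for `head_cid`, then walks the
/// block graph depth-first from the head, emitting each block as a varint-length-prefixed
/// `CID || block bytes` frame and following every IPLD link found in the block.
///
/// A minimal usage sketch (hypothetical caller; assumes you already hold a
/// `PostgresBlockStore` and the repo head CID):
///
/// ```ignore
/// let car = generate_repo_car(&block_store, &head_cid).await?;
/// assert_eq!(count_car_blocks(&car), expected_blocks);
/// ```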
pub async fn generate_repo_car(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use std::io::Write;

    let mut car_bytes =
        encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;

    let mut stack = vec![*head_cid];
    let mut visited = std::collections::HashSet::new();

    while let Some(cid) = stack.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);

        if let Ok(Some(block)) = block_store.get(&cid).await {
            let cid_bytes = cid.to_bytes();
            let total_len = cid_bytes.len() + block.len();
            let mut writer = Vec::new();
            crate::sync::car::write_varint(&mut writer, total_len as u64)
                .expect("Writing to Vec<u8> should never fail");
            writer
                .write_all(&cid_bytes)
                .expect("Writing to Vec<u8> should never fail");
            writer
                .write_all(&block)
                .expect("Writing to Vec<u8> should never fail");
            car_bytes.extend_from_slice(&writer);

            if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                extract_links(&value, &mut stack);
            }
        }
    }

    Ok(car_bytes)
}

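/// Currently an alias for [`generate_repo_car`]: a "full backup" is the complete repo
/// CAR rooted at the head commit.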
pub async fn generate_full_backup(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    generate_repo_car(block_store, head_cid).await
}

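/// Recursively pushes every `Ipld::Link` found in `value` onto the traversal stack.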
fn extract_links(value: &Ipld, stack: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            stack.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, stack);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, stack);
            }
        }
        _ => {}
    }
}

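/// Counts the data blocks in a CARv1 byte stream by skipping the varint-prefixed header
/// and then stepping over each varint-prefixed block frame; returns 0 on a malformed header.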
pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
    let mut count = 0;
    let mut pos = 0;

    if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) {
        pos += header_varint_len + header_len as usize;
    } else {
        return 0;
    }

    while pos < car_bytes.len() {
        if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) {
            pos += varint_len + block_len as usize;
            count += 1;
        } else {
            break;
        }
    }

    count
}

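/// Decodes an unsigned LEB128 varint from the start of `data`, returning the value and
/// the number of bytes consumed, or `None` if the input runs out or exceeds 10 bytes.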
fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    while pos < data.len() && pos < 10 {
        let byte = data[pos];
        value |= ((byte & 0x7f) as u64) << shift;
        pos += 1;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
    }

    None
}

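/// Prunes a user's backups beyond `retention_count` (newest are kept): each stale
/// backup's object is deleted from storage first and its DB row only afterwards, so a
/// failed storage delete leaves the row in place to be retried on a later pass.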
async fn cleanup_old_backups(
    db: &PgPool,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    retention_count: u32,
) -> Result<(), String> {
    let old_backups = sqlx::query!(
        r#"
        SELECT id, storage_key
        FROM account_backups
        WHERE user_id = $1
        ORDER BY created_at DESC
        OFFSET $2
        "#,
        user_id,
        retention_count as i64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching old backups: {}", e))?;

    for backup in old_backups {
        if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await {
            warn!(
                storage_key = %backup.storage_key,
                error = %e,
                "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
            );
            continue;
        }

        sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id)
            .execute(db)
            .await
            .map_err(|e| format!("Failed to delete old backup record: {}", e))?;
    }

    Ok(())
}