use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::{BackupStorage, BlobStorage};
use crate::sync::car::encode_car_header;
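/// Backfills `blocks_cids` for genesis commit rows in `repo_seq` (commit events
/// with no `prev_cid`) that were sequenced with an empty block list: the commit
/// block is loaded from the block store and the row is updated to reference the
/// MST root CID and the commit CID.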
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}
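/// Backfills `repo_rev` for rows in `repos` where it is NULL by loading each
/// repo's root commit from the block store and copying its `rev`.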
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}
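/// Backfills the `user_blocks` table for users that have a repo but no recorded
/// blocks, by walking commits, MST nodes, and record values reachable from the
/// repo root and storing every visited block CID.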
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        // Depth-first walk over the blocks reachable from the repo root.
        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                // MST node: follow the left subtree link ("l") and each entry's
                // subtree ("t") and value ("v") links.
                if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                    to_visit.push(*left_cid);
                }
                if let Some(Ipld::List(entries)) = obj.get("e") {
                    for entry in entries {
                        if let Ipld::Map(entry_obj) = entry {
                            if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                to_visit.push(*tree_cid);
                            }
                            if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                to_visit.push(*val_cid);
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}
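/// Backfills the `record_blobs` table for repos that have records but no blob
/// references indexed (up to 100 users per call), by decoding each record from
/// the block store and collecting the blob refs it contains.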
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut batch_record_uris: Vec<String> = Vec::new();
        let mut batch_blob_cids: Vec<String> = Vec::new();

        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
                batch_record_uris.push(record_uri);
                batch_blob_cids.push(blob_ref.cid);
            }
        }

        let blob_refs_found = batch_record_uris.len();
        if !batch_record_uris.is_empty() {
            if let Err(e) = sqlx::query!(
                r#"
                INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                SELECT $1, record_uri, blob_cid
                FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
                ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                "#,
                user.user_id,
                &batch_record_uris,
                &batch_blob_cids
            )
            .execute(db)
            .await
            {
                warn!(error = %e, "Failed to batch insert record_blobs during backfill");
            } else {
                info!(
                    user_id = %user.user_id,
                    did = %user.did,
                    blob_refs = blob_refs_found,
                    "Backfilled record_blobs"
                );
            }
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}
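/// Runs the scheduled-deletion loop: every `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`
/// seconds (default 3600) it processes accounts that are due for deletion, until
/// the shutdown signal fires.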
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, blob_store.as_ref()).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}
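/// Deletes up to 100 deactivated accounts whose `delete_after` timestamp has
/// passed, logging the outcome for each account.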
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}
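/// Deletes all stored data for one account: its blobs are removed from blob
/// storage and the database, the user row is deleted, an `account` event with
/// status `deleted` is sequenced, older sequence rows for the DID are pruned,
/// and `repo_updates` listeners are notified.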
async fn delete_account_data(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}
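/// Runs the periodic backup loop: on the interval reported by
/// `BackupStorage::interval_secs()`, backs up accounts with backups enabled,
/// until the shutdown signal fires.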
pub async fn start_backup_tasks(
    db: PgPool,
    block_store: PostgresBlockStore,
    backup_storage: Arc<BackupStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let backup_interval = Duration::from_secs(BackupStorage::interval_secs());

    info!(
        interval_secs = backup_interval.as_secs(),
        retention_count = BackupStorage::retention_count(),
        "Starting backup service"
    );

    let mut ticker = interval(backup_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Backup service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await {
                    error!("Error processing scheduled backups: {}", e);
                }
            }
        }
    }
}
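/// Backs up repos whose most recent backup is older than the configured interval
/// (or that have never been backed up), up to 50 per pass: a full CAR is built
/// from the repo root, uploaded to backup storage, recorded in `account_backups`,
/// and backups beyond the retention count are pruned.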
async fn process_scheduled_backups(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
) -> Result<(), String> {
    let backup_interval_secs = BackupStorage::interval_secs() as i64;
    let retention_count = BackupStorage::retention_count();

    let users_needing_backup = sqlx::query!(
        r#"
        SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE u.backup_enabled = true
          AND u.deactivated_at IS NULL
          AND (
            NOT EXISTS (
                SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id
            )
            OR (
                SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id
            ) < NOW() - make_interval(secs => $1)
          )
        LIMIT 50
        "#,
        backup_interval_secs as f64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching users for backup: {}", e))?;

    if users_needing_backup.is_empty() {
        debug!("No accounts need backup");
        return Ok(());
    }

    info!(
        count = users_needing_backup.len(),
        "Processing scheduled backups"
    );

    for user in users_needing_backup {
        let repo_root_cid = user.repo_root_cid.clone();

        let repo_rev = match &user.repo_rev {
            Some(rev) => rev.clone(),
            None => {
                warn!(did = %user.did, "User has no repo_rev, skipping backup");
                continue;
            }
        };

        let head_cid = match Cid::from_str(&repo_root_cid) {
            Ok(c) => c,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup");
                continue;
            }
        };

        let car_result = generate_full_backup(block_store, &head_cid).await;
        let car_bytes = match car_result {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to generate CAR for backup");
                continue;
            }
        };

        let block_count = count_car_blocks(&car_bytes);
        let size_bytes = car_bytes.len() as i64;

        let storage_key = match backup_storage
            .put_backup(&user.did, &repo_rev, &car_bytes)
            .await
        {
            Ok(key) => key,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to upload backup to storage");
                continue;
            }
        };

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
            user.user_id,
            storage_key,
            repo_root_cid,
            repo_rev,
            block_count,
            size_bytes
        )
        .execute(db)
        .await
        {
            warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload");
            if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
                error!(
                    did = %user.did,
                    storage_key = %storage_key,
                    error = %rollback_err,
                    "Failed to rollback orphaned backup from S3"
                );
            }
            continue;
        }

        info!(
            did = %user.did,
            rev = %repo_rev,
            size_bytes,
            block_count,
            "Created backup"
        );

        if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await
        {
            warn!(did = %user.did, error = %e, "Failed to cleanup old backups");
        }
    }

    Ok(())
}
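/// Serializes a repo into a CAR file by walking every block reachable from
/// `head_cid`: each block is appended as a varint length prefix followed by the
/// CID bytes and block data, with links discovered by decoding blocks as
/// DAG-CBOR.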
pub async fn generate_repo_car(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use std::io::Write;

    let mut car_bytes =
        encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;

    let mut stack = vec![*head_cid];
    let mut visited = std::collections::HashSet::new();

    while let Some(cid) = stack.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);

        if let Ok(Some(block)) = block_store.get(&cid).await {
            // CAR section framing: varint(len(cid) + len(data)) || cid || data.
            let cid_bytes = cid.to_bytes();
            let total_len = cid_bytes.len() + block.len();
            let mut writer = Vec::new();
            crate::sync::car::write_varint(&mut writer, total_len as u64)
                .expect("Writing to Vec<u8> should never fail");
            writer
                .write_all(&cid_bytes)
                .expect("Writing to Vec<u8> should never fail");
            writer
                .write_all(&block)
                .expect("Writing to Vec<u8> should never fail");
            car_bytes.extend_from_slice(&writer);

            if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                extract_links(&value, &mut stack);
            }
        }
    }

    Ok(car_bytes)
}
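/// Generates a full-repo CAR for backups; currently an alias for
/// `generate_repo_car`.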
pub async fn generate_full_backup(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    generate_repo_car(block_store, head_cid).await
}
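/// Recursively pushes every CID link found in an IPLD value onto `stack`.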
fn extract_links(value: &Ipld, stack: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            stack.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, stack);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, stack);
            }
        }
        _ => {}
    }
}
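/// Counts the data blocks in a CAR file by skipping the varint-prefixed header
/// and stepping over each varint-prefixed section. Returns 0 if the header
/// cannot be read.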
pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
    let mut count = 0;
    let mut pos = 0;

    if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) {
        pos += header_varint_len + header_len as usize;
    } else {
        return 0;
    }

    while pos < car_bytes.len() {
        if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) {
            pos += varint_len + block_len as usize;
            count += 1;
        } else {
            break;
        }
    }

    count
}
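/// Decodes an unsigned LEB128 varint from the start of `data`, returning the
/// value and the number of bytes consumed, or `None` if no terminating byte is
/// found within the first 10 bytes.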
fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    while pos < data.len() && pos < 10 {
        let byte = data[pos];
        value |= ((byte & 0x7f) as u64) << shift;
        pos += 1;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
    }

    None
}
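/// Deletes a user's backups beyond the newest `retention_count`, removing each
/// object from backup storage before deleting its `account_backups` row so that
/// no orphaned storage object is left behind.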
async fn cleanup_old_backups(
    db: &PgPool,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    retention_count: u32,
) -> Result<(), String> {
    let old_backups = sqlx::query!(
        r#"
        SELECT id, storage_key
        FROM account_backups
        WHERE user_id = $1
        ORDER BY created_at DESC
        OFFSET $2
        "#,
        user_id,
        retention_count as i64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching old backups: {}", e))?;

    for backup in old_backups {
        if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await {
            warn!(
                storage_key = %backup.storage_key,
                error = %e,
                "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
            );
            continue;
        }

        sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id)
            .execute(db)
            .await
            .map_err(|e| format!("Failed to delete old backup record: {}", e))?;
    }

    Ok(())
}