this repo has no description
1use cid::Cid; 2use ipld_core::ipld::Ipld; 3use jacquard_repo::commit::Commit; 4use jacquard_repo::storage::BlockStore; 5use sqlx::PgPool; 6use std::str::FromStr; 7use std::sync::Arc; 8use std::time::Duration; 9use tokio::sync::watch; 10use tokio::time::interval; 11use tracing::{debug, error, info, warn}; 12 13use crate::repo::PostgresBlockStore; 14use crate::storage::{BackupStorage, BlobStorage}; 15use crate::sync::car::encode_car_header; 16 17pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) { 18 let broken_genesis_commits = match sqlx::query!( 19 r#" 20 SELECT seq, did, commit_cid 21 FROM repo_seq 22 WHERE event_type = 'commit' 23 AND prev_cid IS NULL 24 AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0) 25 "# 26 ) 27 .fetch_all(db) 28 .await 29 { 30 Ok(rows) => rows, 31 Err(e) => { 32 error!("Failed to query repo_seq for genesis commit backfill: {}", e); 33 return; 34 } 35 }; 36 37 if broken_genesis_commits.is_empty() { 38 debug!("No genesis commits need blocks_cids backfill"); 39 return; 40 } 41 42 info!( 43 count = broken_genesis_commits.len(), 44 "Backfilling blocks_cids for genesis commits" 45 ); 46 47 let mut success = 0; 48 let mut failed = 0; 49 50 for commit_row in broken_genesis_commits { 51 let commit_cid_str = match &commit_row.commit_cid { 52 Some(c) => c.clone(), 53 None => { 54 warn!(seq = commit_row.seq, "Genesis commit missing commit_cid"); 55 failed += 1; 56 continue; 57 } 58 }; 59 60 let commit_cid = match Cid::from_str(&commit_cid_str) { 61 Ok(c) => c, 62 Err(_) => { 63 warn!(seq = commit_row.seq, "Invalid commit CID"); 64 failed += 1; 65 continue; 66 } 67 }; 68 69 let block = match block_store.get(&commit_cid).await { 70 Ok(Some(b)) => b, 71 Ok(None) => { 72 warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store"); 73 failed += 1; 74 continue; 75 } 76 Err(e) => { 77 warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block"); 78 failed += 1; 79 continue; 80 } 81 }; 82 83 let commit = match Commit::from_cbor(&block) { 84 Ok(c) => c, 85 Err(e) => { 86 warn!(seq = commit_row.seq, error = %e, "Failed to parse commit"); 87 failed += 1; 88 continue; 89 } 90 }; 91 92 let mst_root_cid = commit.data; 93 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 94 95 if let Err(e) = sqlx::query!( 96 "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 97 &blocks_cids, 98 commit_row.seq 99 ) 100 .execute(db) 101 .await 102 { 103 warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids"); 104 failed += 1; 105 } else { 106 info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids"); 107 success += 1; 108 } 109 } 110 111 info!( 112 success, 113 failed, "Completed genesis commit blocks_cids backfill" 114 ); 115} 116 117pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { 118 let repos_missing_rev = 119 match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL") 120 .fetch_all(db) 121 .await 122 { 123 Ok(rows) => rows, 124 Err(e) => { 125 error!("Failed to query repos for backfill: {}", e); 126 return; 127 } 128 }; 129 130 if repos_missing_rev.is_empty() { 131 debug!("No repos need repo_rev backfill"); 132 return; 133 } 134 135 info!( 136 count = repos_missing_rev.len(), 137 "Backfilling repo_rev for existing repos" 138 ); 139 140 let mut success = 0; 141 let mut failed = 0; 142 143 for repo in repos_missing_rev { 144 let cid = match Cid::from_str(&repo.repo_root_cid) { 145 Ok(c) => c, 146 Err(_) => { 147 failed += 1; 148 continue; 149 } 150 }; 151 152 let block = match block_store.get(&cid).await { 153 Ok(Some(b)) => b, 154 _ => { 155 failed += 1; 156 continue; 157 } 158 }; 159 160 let commit = match Commit::from_cbor(&block) { 161 Ok(c) => c, 162 Err(_) => { 163 failed += 1; 164 continue; 165 } 166 }; 167 168 let rev = commit.rev().to_string(); 169 170 if let Err(e) = sqlx::query!( 171 "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 172 rev, 173 repo.user_id 174 ) 175 .execute(db) 176 .await 177 { 178 warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); 179 failed += 1; 180 } else { 181 success += 1; 182 } 183 } 184 185 info!(success, failed, "Completed repo_rev backfill"); 186} 187 188pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { 189 let users_without_blocks = match sqlx::query!( 190 r#" 191 SELECT u.id as user_id, r.repo_root_cid 192 FROM users u 193 JOIN repos r ON r.user_id = u.id 194 WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id) 195 "# 196 ) 197 .fetch_all(db) 198 .await 199 { 200 Ok(rows) => rows, 201 Err(e) => { 202 error!("Failed to query users for user_blocks backfill: {}", e); 203 return; 204 } 205 }; 206 207 if users_without_blocks.is_empty() { 208 debug!("No users need user_blocks backfill"); 209 return; 210 } 211 212 info!( 213 count = users_without_blocks.len(), 214 "Backfilling user_blocks for existing repos" 215 ); 216 217 let mut success = 0; 218 let mut failed = 0; 219 220 for user in users_without_blocks { 221 let root_cid = match Cid::from_str(&user.repo_root_cid) { 222 Ok(c) => c, 223 Err(_) => { 224 failed += 1; 225 continue; 226 } 227 }; 228 229 let mut block_cids: Vec<Vec<u8>> = Vec::new(); 230 let mut to_visit = vec![root_cid]; 231 let mut visited = std::collections::HashSet::new(); 232 233 while let Some(cid) = to_visit.pop() { 234 if visited.contains(&cid) { 235 continue; 236 } 237 visited.insert(cid); 238 block_cids.push(cid.to_bytes()); 239 240 let block = match block_store.get(&cid).await { 241 Ok(Some(b)) => b, 242 _ => continue, 243 }; 244 245 if let Ok(commit) = Commit::from_cbor(&block) { 246 to_visit.push(commit.data); 247 if let Some(prev) = commit.prev { 248 to_visit.push(prev); 249 } 250 } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 251 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 252 to_visit.push(*left_cid); 253 } 254 if let Some(Ipld::List(entries)) = obj.get("e") { 255 for entry in entries { 256 if let Ipld::Map(entry_obj) = entry { 257 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 258 to_visit.push(*tree_cid); 259 } 260 if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") { 261 to_visit.push(*val_cid); 262 } 263 } 264 } 265 } 266 } 267 } 268 269 if block_cids.is_empty() { 270 failed += 1; 271 continue; 272 } 273 274 if let Err(e) = sqlx::query!( 275 r#" 276 INSERT INTO user_blocks (user_id, block_cid) 277 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 278 ON CONFLICT (user_id, block_cid) DO NOTHING 279 "#, 280 user.user_id, 281 &block_cids 282 ) 283 .execute(db) 284 .await 285 { 286 warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 287 failed += 1; 288 } else { 289 info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 290 success += 1; 291 } 292 } 293 294 info!(success, failed, "Completed user_blocks backfill"); 295} 296 297pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) { 298 let users_needing_backfill = match sqlx::query!( 299 r#" 300 SELECT DISTINCT u.id as user_id, u.did 301 FROM users u 302 JOIN records r ON r.repo_id = u.id 303 WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id) 304 LIMIT 100 305 "# 306 ) 307 .fetch_all(db) 308 .await 309 { 310 Ok(rows) => rows, 311 Err(e) => { 312 error!("Failed to query users for record_blobs backfill: {}", e); 313 return; 314 } 315 }; 316 317 if users_needing_backfill.is_empty() { 318 debug!("No users need record_blobs backfill"); 319 return; 320 } 321 322 info!( 323 count = users_needing_backfill.len(), 324 "Backfilling record_blobs for existing repos" 325 ); 326 327 let mut success = 0; 328 let mut failed = 0; 329 330 for user in users_needing_backfill { 331 let records = match sqlx::query!( 332 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1", 333 user.user_id 334 ) 335 .fetch_all(db) 336 .await 337 { 338 Ok(r) => r, 339 Err(e) => { 340 warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill"); 341 failed += 1; 342 continue; 343 } 344 }; 345 346 let mut blob_refs_found = 0; 347 for record in records { 348 let record_cid = match Cid::from_str(&record.record_cid) { 349 Ok(c) => c, 350 Err(_) => continue, 351 }; 352 353 let block_bytes = match block_store.get(&record_cid).await { 354 Ok(Some(b)) => b, 355 _ => continue, 356 }; 357 358 let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) { 359 Ok(v) => v, 360 Err(_) => continue, 361 }; 362 363 let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0); 364 for blob_ref in blob_refs { 365 let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey); 366 if let Err(e) = sqlx::query!( 367 r#" 368 INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 369 VALUES ($1, $2, $3) 370 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 371 "#, 372 user.user_id, 373 record_uri, 374 blob_ref.cid 375 ) 376 .execute(db) 377 .await 378 { 379 warn!(error = %e, "Failed to insert record_blob during backfill"); 380 } else { 381 blob_refs_found += 1; 382 } 383 } 384 } 385 386 if blob_refs_found > 0 { 387 info!( 388 user_id = %user.user_id, 389 did = %user.did, 390 blob_refs = blob_refs_found, 391 "Backfilled record_blobs" 392 ); 393 } 394 success += 1; 395 } 396 397 info!(success, failed, "Completed record_blobs backfill"); 398} 399 400pub async fn start_scheduled_tasks( 401 db: PgPool, 402 blob_store: Arc<dyn BlobStorage>, 403 mut shutdown_rx: watch::Receiver<bool>, 404) { 405 let check_interval = Duration::from_secs( 406 std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS") 407 .ok() 408 .and_then(|s| s.parse().ok()) 409 .unwrap_or(3600), 410 ); 411 412 info!( 413 check_interval_secs = check_interval.as_secs(), 414 "Starting scheduled tasks service" 415 ); 416 417 let mut ticker = interval(check_interval); 418 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 419 420 loop { 421 tokio::select! { 422 _ = shutdown_rx.changed() => { 423 if *shutdown_rx.borrow() { 424 info!("Scheduled tasks service shutting down"); 425 break; 426 } 427 } 428 _ = ticker.tick() => { 429 if let Err(e) = process_scheduled_deletions(&db, &blob_store).await { 430 error!("Error processing scheduled deletions: {}", e); 431 } 432 } 433 } 434 } 435} 436 437async fn process_scheduled_deletions( 438 db: &PgPool, 439 blob_store: &Arc<dyn BlobStorage>, 440) -> Result<(), String> { 441 let accounts_to_delete = sqlx::query!( 442 r#" 443 SELECT did, handle 444 FROM users 445 WHERE delete_after IS NOT NULL 446 AND delete_after < NOW() 447 AND deactivated_at IS NOT NULL 448 LIMIT 100 449 "# 450 ) 451 .fetch_all(db) 452 .await 453 .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?; 454 455 if accounts_to_delete.is_empty() { 456 debug!("No accounts scheduled for deletion"); 457 return Ok(()); 458 } 459 460 info!( 461 count = accounts_to_delete.len(), 462 "Processing scheduled account deletions" 463 ); 464 465 for account in accounts_to_delete { 466 if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { 467 warn!( 468 did = %account.did, 469 handle = %account.handle, 470 error = %e, 471 "Failed to delete scheduled account" 472 ); 473 } else { 474 info!( 475 did = %account.did, 476 handle = %account.handle, 477 "Successfully deleted scheduled account" 478 ); 479 } 480 } 481 482 Ok(()) 483} 484 485async fn delete_account_data( 486 db: &PgPool, 487 blob_store: &Arc<dyn BlobStorage>, 488 did: &str, 489 _handle: &str, 490) -> Result<(), String> { 491 let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 492 .fetch_one(db) 493 .await 494 .map_err(|e| format!("DB error fetching user: {}", e))?; 495 496 let blob_storage_keys: Vec<String> = sqlx::query_scalar!( 497 r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#, 498 user_id 499 ) 500 .fetch_all(db) 501 .await 502 .map_err(|e| format!("DB error fetching blob keys: {}", e))?; 503 504 for storage_key in &blob_storage_keys { 505 if let Err(e) = blob_store.delete(storage_key).await { 506 warn!( 507 storage_key = %storage_key, 508 error = %e, 509 "Failed to delete blob from storage (continuing anyway)" 510 ); 511 } 512 } 513 514 let mut tx = db 515 .begin() 516 .await 517 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 518 519 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 520 .execute(&mut *tx) 521 .await 522 .map_err(|e| format!("Failed to delete blobs: {}", e))?; 523 524 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 525 .execute(&mut *tx) 526 .await 527 .map_err(|e| format!("Failed to delete user: {}", e))?; 528 529 let account_seq = sqlx::query_scalar!( 530 r#" 531 INSERT INTO repo_seq (did, event_type, active, status) 532 VALUES ($1, 'account', false, 'deleted') 533 RETURNING seq 534 "#, 535 did 536 ) 537 .fetch_one(&mut *tx) 538 .await 539 .map_err(|e| format!("Failed to sequence account deletion: {}", e))?; 540 541 sqlx::query!( 542 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 543 did, 544 account_seq 545 ) 546 .execute(&mut *tx) 547 .await 548 .map_err(|e| format!("Failed to cleanup sequences: {}", e))?; 549 550 tx.commit() 551 .await 552 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 553 554 sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq)) 555 .execute(db) 556 .await 557 .map_err(|e| format!("Failed to notify: {}", e))?; 558 559 info!( 560 did = %did, 561 blob_count = blob_storage_keys.len(), 562 "Deleted account data including blobs from storage" 563 ); 564 565 Ok(()) 566} 567 568pub async fn start_backup_tasks( 569 db: PgPool, 570 block_store: PostgresBlockStore, 571 backup_storage: Arc<BackupStorage>, 572 mut shutdown_rx: watch::Receiver<bool>, 573) { 574 let backup_interval = Duration::from_secs(BackupStorage::interval_secs()); 575 576 info!( 577 interval_secs = backup_interval.as_secs(), 578 retention_count = BackupStorage::retention_count(), 579 "Starting backup service" 580 ); 581 582 let mut ticker = interval(backup_interval); 583 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 584 585 loop { 586 tokio::select! { 587 _ = shutdown_rx.changed() => { 588 if *shutdown_rx.borrow() { 589 info!("Backup service shutting down"); 590 break; 591 } 592 } 593 _ = ticker.tick() => { 594 if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await { 595 error!("Error processing scheduled backups: {}", e); 596 } 597 } 598 } 599 } 600} 601 602async fn process_scheduled_backups( 603 db: &PgPool, 604 block_store: &PostgresBlockStore, 605 backup_storage: &BackupStorage, 606) -> Result<(), String> { 607 let backup_interval_secs = BackupStorage::interval_secs() as i64; 608 let retention_count = BackupStorage::retention_count(); 609 610 let users_needing_backup = sqlx::query!( 611 r#" 612 SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev 613 FROM users u 614 JOIN repos r ON r.user_id = u.id 615 WHERE u.backup_enabled = true 616 AND u.deactivated_at IS NULL 617 AND ( 618 NOT EXISTS ( 619 SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id 620 ) 621 OR ( 622 SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id 623 ) < NOW() - make_interval(secs => $1) 624 ) 625 LIMIT 50 626 "#, 627 backup_interval_secs as f64 628 ) 629 .fetch_all(db) 630 .await 631 .map_err(|e| format!("DB error fetching users for backup: {}", e))?; 632 633 if users_needing_backup.is_empty() { 634 debug!("No accounts need backup"); 635 return Ok(()); 636 } 637 638 info!( 639 count = users_needing_backup.len(), 640 "Processing scheduled backups" 641 ); 642 643 for user in users_needing_backup { 644 let repo_root_cid = user.repo_root_cid.clone(); 645 646 let repo_rev = match &user.repo_rev { 647 Some(rev) => rev.clone(), 648 None => { 649 warn!(did = %user.did, "User has no repo_rev, skipping backup"); 650 continue; 651 } 652 }; 653 654 let head_cid = match Cid::from_str(&repo_root_cid) { 655 Ok(c) => c, 656 Err(e) => { 657 warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup"); 658 continue; 659 } 660 }; 661 662 let car_result = generate_full_backup(block_store, &head_cid).await; 663 let car_bytes = match car_result { 664 Ok(bytes) => bytes, 665 Err(e) => { 666 warn!(did = %user.did, error = %e, "Failed to generate CAR for backup"); 667 continue; 668 } 669 }; 670 671 let block_count = count_car_blocks(&car_bytes); 672 let size_bytes = car_bytes.len() as i64; 673 674 let storage_key = match backup_storage 675 .put_backup(&user.did, &repo_rev, &car_bytes) 676 .await 677 { 678 Ok(key) => key, 679 Err(e) => { 680 warn!(did = %user.did, error = %e, "Failed to upload backup to storage"); 681 continue; 682 } 683 }; 684 685 if let Err(e) = sqlx::query!( 686 r#" 687 INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes) 688 VALUES ($1, $2, $3, $4, $5, $6) 689 "#, 690 user.user_id, 691 storage_key, 692 repo_root_cid, 693 repo_rev, 694 block_count, 695 size_bytes 696 ) 697 .execute(db) 698 .await 699 { 700 warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload"); 701 if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await { 702 error!( 703 did = %user.did, 704 storage_key = %storage_key, 705 error = %rollback_err, 706 "Failed to rollback orphaned backup from S3" 707 ); 708 } 709 continue; 710 } 711 712 info!( 713 did = %user.did, 714 rev = %repo_rev, 715 size_bytes, 716 block_count, 717 "Created backup" 718 ); 719 720 if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await 721 { 722 warn!(did = %user.did, error = %e, "Failed to cleanup old backups"); 723 } 724 } 725 726 Ok(()) 727} 728 729pub async fn generate_repo_car( 730 block_store: &PostgresBlockStore, 731 head_cid: &Cid, 732) -> Result<Vec<u8>, String> { 733 use jacquard_repo::storage::BlockStore; 734 use std::io::Write; 735 736 let mut car_bytes = 737 encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?; 738 739 let mut stack = vec![*head_cid]; 740 let mut visited = std::collections::HashSet::new(); 741 742 while let Some(cid) = stack.pop() { 743 if visited.contains(&cid) { 744 continue; 745 } 746 visited.insert(cid); 747 748 if let Ok(Some(block)) = block_store.get(&cid).await { 749 let cid_bytes = cid.to_bytes(); 750 let total_len = cid_bytes.len() + block.len(); 751 let mut writer = Vec::new(); 752 crate::sync::car::write_varint(&mut writer, total_len as u64) 753 .expect("Writing to Vec<u8> should never fail"); 754 writer 755 .write_all(&cid_bytes) 756 .expect("Writing to Vec<u8> should never fail"); 757 writer 758 .write_all(&block) 759 .expect("Writing to Vec<u8> should never fail"); 760 car_bytes.extend_from_slice(&writer); 761 762 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 763 extract_links(&value, &mut stack); 764 } 765 } 766 } 767 768 Ok(car_bytes) 769} 770 771pub async fn generate_full_backup( 772 block_store: &PostgresBlockStore, 773 head_cid: &Cid, 774) -> Result<Vec<u8>, String> { 775 generate_repo_car(block_store, head_cid).await 776} 777 778fn extract_links(value: &Ipld, stack: &mut Vec<Cid>) { 779 match value { 780 Ipld::Link(cid) => { 781 stack.push(*cid); 782 } 783 Ipld::Map(map) => { 784 for v in map.values() { 785 extract_links(v, stack); 786 } 787 } 788 Ipld::List(arr) => { 789 for v in arr { 790 extract_links(v, stack); 791 } 792 } 793 _ => {} 794 } 795} 796 797pub fn count_car_blocks(car_bytes: &[u8]) -> i32 { 798 let mut count = 0; 799 let mut pos = 0; 800 801 if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) { 802 pos += header_varint_len + header_len as usize; 803 } else { 804 return 0; 805 } 806 807 while pos < car_bytes.len() { 808 if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) { 809 pos += varint_len + block_len as usize; 810 count += 1; 811 } else { 812 break; 813 } 814 } 815 816 count 817} 818 819fn read_varint(data: &[u8]) -> Option<(u64, usize)> { 820 let mut value: u64 = 0; 821 let mut shift = 0; 822 let mut pos = 0; 823 824 while pos < data.len() && pos < 10 { 825 let byte = data[pos]; 826 value |= ((byte & 0x7f) as u64) << shift; 827 pos += 1; 828 if byte & 0x80 == 0 { 829 return Some((value, pos)); 830 } 831 shift += 7; 832 } 833 834 None 835} 836 837async fn cleanup_old_backups( 838 db: &PgPool, 839 backup_storage: &BackupStorage, 840 user_id: uuid::Uuid, 841 retention_count: u32, 842) -> Result<(), String> { 843 let old_backups = sqlx::query!( 844 r#" 845 SELECT id, storage_key 846 FROM account_backups 847 WHERE user_id = $1 848 ORDER BY created_at DESC 849 OFFSET $2 850 "#, 851 user_id, 852 retention_count as i64 853 ) 854 .fetch_all(db) 855 .await 856 .map_err(|e| format!("DB error fetching old backups: {}", e))?; 857 858 for backup in old_backups { 859 if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await { 860 warn!( 861 storage_key = %backup.storage_key, 862 error = %e, 863 "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan" 864 ); 865 continue; 866 } 867 868 sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id) 869 .execute(db) 870 .await 871 .map_err(|e| format!("Failed to delete old backup record: {}", e))?; 872 } 873 874 Ok(()) 875}