this repo has no description
1use cid::Cid; 2use ipld_core::ipld::Ipld; 3use jacquard_repo::commit::Commit; 4use jacquard_repo::storage::BlockStore; 5use sqlx::PgPool; 6use std::str::FromStr; 7use std::sync::Arc; 8use std::time::Duration; 9use tokio::sync::watch; 10use tokio::time::interval; 11use tracing::{debug, error, info, warn}; 12 13use crate::repo::PostgresBlockStore; 14use crate::storage::{BackupStorage, BlobStorage}; 15use crate::sync::car::encode_car_header; 16 17pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) { 18 let broken_genesis_commits = match sqlx::query!( 19 r#" 20 SELECT seq, did, commit_cid 21 FROM repo_seq 22 WHERE event_type = 'commit' 23 AND prev_cid IS NULL 24 AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0) 25 "# 26 ) 27 .fetch_all(db) 28 .await 29 { 30 Ok(rows) => rows, 31 Err(e) => { 32 error!("Failed to query repo_seq for genesis commit backfill: {}", e); 33 return; 34 } 35 }; 36 37 if broken_genesis_commits.is_empty() { 38 debug!("No genesis commits need blocks_cids backfill"); 39 return; 40 } 41 42 info!( 43 count = broken_genesis_commits.len(), 44 "Backfilling blocks_cids for genesis commits" 45 ); 46 47 let mut success = 0; 48 let mut failed = 0; 49 50 for commit_row in broken_genesis_commits { 51 let commit_cid_str = match &commit_row.commit_cid { 52 Some(c) => c.clone(), 53 None => { 54 warn!(seq = commit_row.seq, "Genesis commit missing commit_cid"); 55 failed += 1; 56 continue; 57 } 58 }; 59 60 let commit_cid = match Cid::from_str(&commit_cid_str) { 61 Ok(c) => c, 62 Err(_) => { 63 warn!(seq = commit_row.seq, "Invalid commit CID"); 64 failed += 1; 65 continue; 66 } 67 }; 68 69 let block = match block_store.get(&commit_cid).await { 70 Ok(Some(b)) => b, 71 Ok(None) => { 72 warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store"); 73 failed += 1; 74 continue; 75 } 76 Err(e) => { 77 warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block"); 78 failed += 1; 79 continue; 80 } 81 }; 82 83 let commit = match Commit::from_cbor(&block) { 84 Ok(c) => c, 85 Err(e) => { 86 warn!(seq = commit_row.seq, error = %e, "Failed to parse commit"); 87 failed += 1; 88 continue; 89 } 90 }; 91 92 let mst_root_cid = commit.data; 93 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 94 95 if let Err(e) = sqlx::query!( 96 "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 97 &blocks_cids, 98 commit_row.seq 99 ) 100 .execute(db) 101 .await 102 { 103 warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids"); 104 failed += 1; 105 } else { 106 info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids"); 107 success += 1; 108 } 109 } 110 111 info!( 112 success, 113 failed, "Completed genesis commit blocks_cids backfill" 114 ); 115} 116 117pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { 118 let repos_missing_rev = 119 match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL") 120 .fetch_all(db) 121 .await 122 { 123 Ok(rows) => rows, 124 Err(e) => { 125 error!("Failed to query repos for backfill: {}", e); 126 return; 127 } 128 }; 129 130 if repos_missing_rev.is_empty() { 131 debug!("No repos need repo_rev backfill"); 132 return; 133 } 134 135 info!( 136 count = repos_missing_rev.len(), 137 "Backfilling repo_rev for existing repos" 138 ); 139 140 let mut success = 0; 141 let mut failed = 0; 142 143 for repo in repos_missing_rev { 144 let cid = match Cid::from_str(&repo.repo_root_cid) { 145 Ok(c) => c, 146 Err(_) => { 147 failed += 1; 148 continue; 149 } 150 }; 151 152 let block = match block_store.get(&cid).await { 153 Ok(Some(b)) => b, 154 _ => { 155 failed += 1; 156 continue; 157 } 158 }; 159 160 let commit = match Commit::from_cbor(&block) { 161 Ok(c) => c, 162 Err(_) => { 163 failed += 1; 164 continue; 165 } 166 }; 167 168 let rev = commit.rev().to_string(); 169 170 if let Err(e) = sqlx::query!( 171 "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 172 rev, 173 repo.user_id 174 ) 175 .execute(db) 176 .await 177 { 178 warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); 179 failed += 1; 180 } else { 181 success += 1; 182 } 183 } 184 185 info!(success, failed, "Completed repo_rev backfill"); 186} 187 188pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { 189 let users_without_blocks = match sqlx::query!( 190 r#" 191 SELECT u.id as user_id, r.repo_root_cid 192 FROM users u 193 JOIN repos r ON r.user_id = u.id 194 WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id) 195 "# 196 ) 197 .fetch_all(db) 198 .await 199 { 200 Ok(rows) => rows, 201 Err(e) => { 202 error!("Failed to query users for user_blocks backfill: {}", e); 203 return; 204 } 205 }; 206 207 if users_without_blocks.is_empty() { 208 debug!("No users need user_blocks backfill"); 209 return; 210 } 211 212 info!( 213 count = users_without_blocks.len(), 214 "Backfilling user_blocks for existing repos" 215 ); 216 217 let mut success = 0; 218 let mut failed = 0; 219 220 for user in users_without_blocks { 221 let root_cid = match Cid::from_str(&user.repo_root_cid) { 222 Ok(c) => c, 223 Err(_) => { 224 failed += 1; 225 continue; 226 } 227 }; 228 229 let mut block_cids: Vec<Vec<u8>> = Vec::new(); 230 let mut to_visit = vec![root_cid]; 231 let mut visited = std::collections::HashSet::new(); 232 233 while let Some(cid) = to_visit.pop() { 234 if visited.contains(&cid) { 235 continue; 236 } 237 visited.insert(cid); 238 block_cids.push(cid.to_bytes()); 239 240 let block = match block_store.get(&cid).await { 241 Ok(Some(b)) => b, 242 _ => continue, 243 }; 244 245 if let Ok(commit) = Commit::from_cbor(&block) { 246 to_visit.push(commit.data); 247 if let Some(prev) = commit.prev { 248 to_visit.push(prev); 249 } 250 } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 251 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 252 to_visit.push(*left_cid); 253 } 254 if let Some(Ipld::List(entries)) = obj.get("e") { 255 for entry in entries { 256 if let Ipld::Map(entry_obj) = entry { 257 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 258 to_visit.push(*tree_cid); 259 } 260 if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") { 261 to_visit.push(*val_cid); 262 } 263 } 264 } 265 } 266 } 267 } 268 269 if block_cids.is_empty() { 270 failed += 1; 271 continue; 272 } 273 274 if let Err(e) = sqlx::query!( 275 r#" 276 INSERT INTO user_blocks (user_id, block_cid) 277 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 278 ON CONFLICT (user_id, block_cid) DO NOTHING 279 "#, 280 user.user_id, 281 &block_cids 282 ) 283 .execute(db) 284 .await 285 { 286 warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 287 failed += 1; 288 } else { 289 info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 290 success += 1; 291 } 292 } 293 294 info!(success, failed, "Completed user_blocks backfill"); 295} 296 297pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) { 298 let users_needing_backfill = match sqlx::query!( 299 r#" 300 SELECT DISTINCT u.id as user_id, u.did 301 FROM users u 302 JOIN records r ON r.repo_id = u.id 303 WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id) 304 LIMIT 100 305 "# 306 ) 307 .fetch_all(db) 308 .await 309 { 310 Ok(rows) => rows, 311 Err(e) => { 312 error!("Failed to query users for record_blobs backfill: {}", e); 313 return; 314 } 315 }; 316 317 if users_needing_backfill.is_empty() { 318 debug!("No users need record_blobs backfill"); 319 return; 320 } 321 322 info!( 323 count = users_needing_backfill.len(), 324 "Backfilling record_blobs for existing repos" 325 ); 326 327 let mut success = 0; 328 let mut failed = 0; 329 330 for user in users_needing_backfill { 331 let records = match sqlx::query!( 332 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1", 333 user.user_id 334 ) 335 .fetch_all(db) 336 .await 337 { 338 Ok(r) => r, 339 Err(e) => { 340 warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill"); 341 failed += 1; 342 continue; 343 } 344 }; 345 346 let mut batch_record_uris: Vec<String> = Vec::new(); 347 let mut batch_blob_cids: Vec<String> = Vec::new(); 348 349 for record in records { 350 let record_cid = match Cid::from_str(&record.record_cid) { 351 Ok(c) => c, 352 Err(_) => continue, 353 }; 354 355 let block_bytes = match block_store.get(&record_cid).await { 356 Ok(Some(b)) => b, 357 _ => continue, 358 }; 359 360 let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) { 361 Ok(v) => v, 362 Err(_) => continue, 363 }; 364 365 let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0); 366 for blob_ref in blob_refs { 367 let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey); 368 batch_record_uris.push(record_uri); 369 batch_blob_cids.push(blob_ref.cid); 370 } 371 } 372 373 let blob_refs_found = batch_record_uris.len(); 374 if !batch_record_uris.is_empty() { 375 if let Err(e) = sqlx::query!( 376 r#" 377 INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 378 SELECT $1, record_uri, blob_cid 379 FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid) 380 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 381 "#, 382 user.user_id, 383 &batch_record_uris, 384 &batch_blob_cids 385 ) 386 .execute(db) 387 .await 388 { 389 warn!(error = %e, "Failed to batch insert record_blobs during backfill"); 390 } else { 391 info!( 392 user_id = %user.user_id, 393 did = %user.did, 394 blob_refs = blob_refs_found, 395 "Backfilled record_blobs" 396 ); 397 } 398 } 399 success += 1; 400 } 401 402 info!(success, failed, "Completed record_blobs backfill"); 403} 404 405pub async fn start_scheduled_tasks( 406 db: PgPool, 407 blob_store: Arc<dyn BlobStorage>, 408 mut shutdown_rx: watch::Receiver<bool>, 409) { 410 let check_interval = Duration::from_secs( 411 std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS") 412 .ok() 413 .and_then(|s| s.parse().ok()) 414 .unwrap_or(3600), 415 ); 416 417 info!( 418 check_interval_secs = check_interval.as_secs(), 419 "Starting scheduled tasks service" 420 ); 421 422 let mut ticker = interval(check_interval); 423 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 424 425 loop { 426 tokio::select! { 427 _ = shutdown_rx.changed() => { 428 if *shutdown_rx.borrow() { 429 info!("Scheduled tasks service shutting down"); 430 break; 431 } 432 } 433 _ = ticker.tick() => { 434 if let Err(e) = process_scheduled_deletions(&db, blob_store.as_ref()).await { 435 error!("Error processing scheduled deletions: {}", e); 436 } 437 } 438 } 439 } 440} 441 442async fn process_scheduled_deletions( 443 db: &PgPool, 444 blob_store: &dyn BlobStorage, 445) -> Result<(), String> { 446 let accounts_to_delete = sqlx::query!( 447 r#" 448 SELECT did, handle 449 FROM users 450 WHERE delete_after IS NOT NULL 451 AND delete_after < NOW() 452 AND deactivated_at IS NOT NULL 453 LIMIT 100 454 "# 455 ) 456 .fetch_all(db) 457 .await 458 .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?; 459 460 if accounts_to_delete.is_empty() { 461 debug!("No accounts scheduled for deletion"); 462 return Ok(()); 463 } 464 465 info!( 466 count = accounts_to_delete.len(), 467 "Processing scheduled account deletions" 468 ); 469 470 for account in accounts_to_delete { 471 if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { 472 warn!( 473 did = %account.did, 474 handle = %account.handle, 475 error = %e, 476 "Failed to delete scheduled account" 477 ); 478 } else { 479 info!( 480 did = %account.did, 481 handle = %account.handle, 482 "Successfully deleted scheduled account" 483 ); 484 } 485 } 486 487 Ok(()) 488} 489 490async fn delete_account_data( 491 db: &PgPool, 492 blob_store: &dyn BlobStorage, 493 did: &str, 494 _handle: &str, 495) -> Result<(), String> { 496 let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 497 .fetch_one(db) 498 .await 499 .map_err(|e| format!("DB error fetching user: {}", e))?; 500 501 let blob_storage_keys: Vec<String> = sqlx::query_scalar!( 502 r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#, 503 user_id 504 ) 505 .fetch_all(db) 506 .await 507 .map_err(|e| format!("DB error fetching blob keys: {}", e))?; 508 509 for storage_key in &blob_storage_keys { 510 if let Err(e) = blob_store.delete(storage_key).await { 511 warn!( 512 storage_key = %storage_key, 513 error = %e, 514 "Failed to delete blob from storage (continuing anyway)" 515 ); 516 } 517 } 518 519 let mut tx = db 520 .begin() 521 .await 522 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 523 524 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 525 .execute(&mut *tx) 526 .await 527 .map_err(|e| format!("Failed to delete blobs: {}", e))?; 528 529 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 530 .execute(&mut *tx) 531 .await 532 .map_err(|e| format!("Failed to delete user: {}", e))?; 533 534 let account_seq = sqlx::query_scalar!( 535 r#" 536 INSERT INTO repo_seq (did, event_type, active, status) 537 VALUES ($1, 'account', false, 'deleted') 538 RETURNING seq 539 "#, 540 did 541 ) 542 .fetch_one(&mut *tx) 543 .await 544 .map_err(|e| format!("Failed to sequence account deletion: {}", e))?; 545 546 sqlx::query!( 547 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 548 did, 549 account_seq 550 ) 551 .execute(&mut *tx) 552 .await 553 .map_err(|e| format!("Failed to cleanup sequences: {}", e))?; 554 555 tx.commit() 556 .await 557 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 558 559 sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq)) 560 .execute(db) 561 .await 562 .map_err(|e| format!("Failed to notify: {}", e))?; 563 564 info!( 565 did = %did, 566 blob_count = blob_storage_keys.len(), 567 "Deleted account data including blobs from storage" 568 ); 569 570 Ok(()) 571} 572 573pub async fn start_backup_tasks( 574 db: PgPool, 575 block_store: PostgresBlockStore, 576 backup_storage: Arc<BackupStorage>, 577 mut shutdown_rx: watch::Receiver<bool>, 578) { 579 let backup_interval = Duration::from_secs(BackupStorage::interval_secs()); 580 581 info!( 582 interval_secs = backup_interval.as_secs(), 583 retention_count = BackupStorage::retention_count(), 584 "Starting backup service" 585 ); 586 587 let mut ticker = interval(backup_interval); 588 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 589 590 loop { 591 tokio::select! { 592 _ = shutdown_rx.changed() => { 593 if *shutdown_rx.borrow() { 594 info!("Backup service shutting down"); 595 break; 596 } 597 } 598 _ = ticker.tick() => { 599 if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await { 600 error!("Error processing scheduled backups: {}", e); 601 } 602 } 603 } 604 } 605} 606 607async fn process_scheduled_backups( 608 db: &PgPool, 609 block_store: &PostgresBlockStore, 610 backup_storage: &BackupStorage, 611) -> Result<(), String> { 612 let backup_interval_secs = BackupStorage::interval_secs() as i64; 613 let retention_count = BackupStorage::retention_count(); 614 615 let users_needing_backup = sqlx::query!( 616 r#" 617 SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev 618 FROM users u 619 JOIN repos r ON r.user_id = u.id 620 WHERE u.backup_enabled = true 621 AND u.deactivated_at IS NULL 622 AND ( 623 NOT EXISTS ( 624 SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id 625 ) 626 OR ( 627 SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id 628 ) < NOW() - make_interval(secs => $1) 629 ) 630 LIMIT 50 631 "#, 632 backup_interval_secs as f64 633 ) 634 .fetch_all(db) 635 .await 636 .map_err(|e| format!("DB error fetching users for backup: {}", e))?; 637 638 if users_needing_backup.is_empty() { 639 debug!("No accounts need backup"); 640 return Ok(()); 641 } 642 643 info!( 644 count = users_needing_backup.len(), 645 "Processing scheduled backups" 646 ); 647 648 for user in users_needing_backup { 649 let repo_root_cid = user.repo_root_cid.clone(); 650 651 let repo_rev = match &user.repo_rev { 652 Some(rev) => rev.clone(), 653 None => { 654 warn!(did = %user.did, "User has no repo_rev, skipping backup"); 655 continue; 656 } 657 }; 658 659 let head_cid = match Cid::from_str(&repo_root_cid) { 660 Ok(c) => c, 661 Err(e) => { 662 warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup"); 663 continue; 664 } 665 }; 666 667 let car_result = generate_full_backup(block_store, &head_cid).await; 668 let car_bytes = match car_result { 669 Ok(bytes) => bytes, 670 Err(e) => { 671 warn!(did = %user.did, error = %e, "Failed to generate CAR for backup"); 672 continue; 673 } 674 }; 675 676 let block_count = count_car_blocks(&car_bytes); 677 let size_bytes = car_bytes.len() as i64; 678 679 let storage_key = match backup_storage 680 .put_backup(&user.did, &repo_rev, &car_bytes) 681 .await 682 { 683 Ok(key) => key, 684 Err(e) => { 685 warn!(did = %user.did, error = %e, "Failed to upload backup to storage"); 686 continue; 687 } 688 }; 689 690 if let Err(e) = sqlx::query!( 691 r#" 692 INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes) 693 VALUES ($1, $2, $3, $4, $5, $6) 694 "#, 695 user.user_id, 696 storage_key, 697 repo_root_cid, 698 repo_rev, 699 block_count, 700 size_bytes 701 ) 702 .execute(db) 703 .await 704 { 705 warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload"); 706 if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await { 707 error!( 708 did = %user.did, 709 storage_key = %storage_key, 710 error = %rollback_err, 711 "Failed to rollback orphaned backup from S3" 712 ); 713 } 714 continue; 715 } 716 717 info!( 718 did = %user.did, 719 rev = %repo_rev, 720 size_bytes, 721 block_count, 722 "Created backup" 723 ); 724 725 if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await 726 { 727 warn!(did = %user.did, error = %e, "Failed to cleanup old backups"); 728 } 729 } 730 731 Ok(()) 732} 733 734pub async fn generate_repo_car( 735 block_store: &PostgresBlockStore, 736 head_cid: &Cid, 737) -> Result<Vec<u8>, String> { 738 use jacquard_repo::storage::BlockStore; 739 use std::io::Write; 740 741 let mut car_bytes = 742 encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?; 743 744 let mut stack = vec![*head_cid]; 745 let mut visited = std::collections::HashSet::new(); 746 747 while let Some(cid) = stack.pop() { 748 if visited.contains(&cid) { 749 continue; 750 } 751 visited.insert(cid); 752 753 if let Ok(Some(block)) = block_store.get(&cid).await { 754 let cid_bytes = cid.to_bytes(); 755 let total_len = cid_bytes.len() + block.len(); 756 let mut writer = Vec::new(); 757 crate::sync::car::write_varint(&mut writer, total_len as u64) 758 .expect("Writing to Vec<u8> should never fail"); 759 writer 760 .write_all(&cid_bytes) 761 .expect("Writing to Vec<u8> should never fail"); 762 writer 763 .write_all(&block) 764 .expect("Writing to Vec<u8> should never fail"); 765 car_bytes.extend_from_slice(&writer); 766 767 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 768 extract_links(&value, &mut stack); 769 } 770 } 771 } 772 773 Ok(car_bytes) 774} 775 776pub async fn generate_full_backup( 777 block_store: &PostgresBlockStore, 778 head_cid: &Cid, 779) -> Result<Vec<u8>, String> { 780 generate_repo_car(block_store, head_cid).await 781} 782 783fn extract_links(value: &Ipld, stack: &mut Vec<Cid>) { 784 match value { 785 Ipld::Link(cid) => { 786 stack.push(*cid); 787 } 788 Ipld::Map(map) => { 789 for v in map.values() { 790 extract_links(v, stack); 791 } 792 } 793 Ipld::List(arr) => { 794 for v in arr { 795 extract_links(v, stack); 796 } 797 } 798 _ => {} 799 } 800} 801 802pub fn count_car_blocks(car_bytes: &[u8]) -> i32 { 803 let mut count = 0; 804 let mut pos = 0; 805 806 if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) { 807 pos += header_varint_len + header_len as usize; 808 } else { 809 return 0; 810 } 811 812 while pos < car_bytes.len() { 813 if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) { 814 pos += varint_len + block_len as usize; 815 count += 1; 816 } else { 817 break; 818 } 819 } 820 821 count 822} 823 824fn read_varint(data: &[u8]) -> Option<(u64, usize)> { 825 let mut value: u64 = 0; 826 let mut shift = 0; 827 let mut pos = 0; 828 829 while pos < data.len() && pos < 10 { 830 let byte = data[pos]; 831 value |= ((byte & 0x7f) as u64) << shift; 832 pos += 1; 833 if byte & 0x80 == 0 { 834 return Some((value, pos)); 835 } 836 shift += 7; 837 } 838 839 None 840} 841 842async fn cleanup_old_backups( 843 db: &PgPool, 844 backup_storage: &BackupStorage, 845 user_id: uuid::Uuid, 846 retention_count: u32, 847) -> Result<(), String> { 848 let old_backups = sqlx::query!( 849 r#" 850 SELECT id, storage_key 851 FROM account_backups 852 WHERE user_id = $1 853 ORDER BY created_at DESC 854 OFFSET $2 855 "#, 856 user_id, 857 retention_count as i64 858 ) 859 .fetch_all(db) 860 .await 861 .map_err(|e| format!("DB error fetching old backups: {}", e))?; 862 863 for backup in old_backups { 864 if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await { 865 warn!( 866 storage_key = %backup.storage_key, 867 error = %e, 868 "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan" 869 ); 870 continue; 871 } 872 873 sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id) 874 .execute(db) 875 .await 876 .map_err(|e| format!("Failed to delete old backup record: {}", e))?; 877 } 878 879 Ok(()) 880}