use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::{BackupStorage, BlobStorage};
use crate::sync::car::encode_car_header;

pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}

pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
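        // Recover the rev by loading the signed commit block at the stored root CID;
        // any repo that fails a step is counted as failed and skipped.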
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}

pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        match collect_current_repo_blocks(&block_store, &root_cid).await {
            Ok(block_cids) => {
                if block_cids.is_empty() {
                    failed += 1;
                    continue;
                }

                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO user_blocks (user_id, block_cid)
                    SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
                    ON CONFLICT (user_id, block_cid) DO NOTHING
                    "#,
                    user.user_id,
                    &block_cids
                )
                .execute(db)
                .await
                {
                    warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
                    failed += 1;
                } else {
                    info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
                    success += 1;
                }
            }
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill");
                failed += 1;
            }
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}

pub async fn collect_current_repo_blocks(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<Vec<u8>>, String> {
    let mut block_cids: Vec<Vec<u8>> = Vec::new();
    let mut to_visit = vec![*head_cid];
    let mut visited = std::collections::HashSet::new();

    while let Some(cid) = to_visit.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);
        block_cids.push(cid.to_bytes());

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            Ok(None) => continue,
            Err(e) => return Err(format!("Failed to get block {}: {:?}", cid, e)),
        };
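
        // Walk the repo DAG: a signed commit points at the MST root via `data`, and
        // each MST node may carry a left-subtree link `l` plus entries `e` whose
        // `t` (subtree) and `v` (record value) fields are further CID links.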
        if let Ok(commit) = Commit::from_cbor(&block) {
            to_visit.push(commit.data);
        } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
            if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                to_visit.push(*left_cid);
            }
            if let Some(Ipld::List(entries)) = obj.get("e") {
                to_visit.extend(
                    entries
                        .iter()
                        .filter_map(|entry| match entry {
                            Ipld::Map(entry_obj) => Some(entry_obj),
                            _ => None,
                        })
                        .flat_map(|entry_obj| {
                            [entry_obj.get("t"), entry_obj.get("v")]
                                .into_iter()
                                .flatten()
                                .filter_map(|v| match v {
                                    Ipld::Link(cid) => Some(*cid),
                                    _ => None,
                                })
                        }),
                );
            }
        }
    }

    Ok(block_cids)
}

pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut batch_record_uris: Vec<String> = Vec::new();
        let mut batch_blob_cids: Vec<String> = Vec::new();

        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
                batch_record_uris.push(record_uri);
                batch_blob_cids.push(blob_ref.cid);
            }
        }

        let blob_refs_found = batch_record_uris.len();
        if !batch_record_uris.is_empty() {
            if let Err(e) = sqlx::query!(
                r#"
                INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                SELECT $1, record_uri, blob_cid
                FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
                ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                "#,
                user.user_id,
                &batch_record_uris,
                &batch_blob_cids
            )
            .execute(db)
            .await
            {
                warn!(error = %e, "Failed to batch insert record_blobs during backfill");
            } else {
                info!(
                    user_id = %user.user_id,
                    did = %user.did,
                    blob_refs = blob_refs_found,
                    "Backfilled record_blobs"
                );
            }
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}

pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
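    // How often to look for accounts past their scheduled deletion time;
    // overridable via SCHEDULED_DELETE_CHECK_INTERVAL_SECS, defaulting to one hour.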
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, blob_store.as_ref()).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}

async fn delete_account_data(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

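    // Gather blob storage keys first so object-store deletes happen outside the
    // DB transaction; individual delete failures are logged and skipped below.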
    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}

pub async fn start_backup_tasks(
    db: PgPool,
    block_store: PostgresBlockStore,
    backup_storage: Arc<BackupStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let backup_interval = Duration::from_secs(BackupStorage::interval_secs());

    info!(
        interval_secs = backup_interval.as_secs(),
        retention_count = BackupStorage::retention_count(),
        "Starting backup service"
    );

    let mut ticker = interval(backup_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

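    // Skipping missed ticks means an overrunning backup pass simply waits for the
    // next interval instead of firing a burst of catch-up ticks.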
    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Backup service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await {
                    error!("Error processing scheduled backups: {}", e);
                }
            }
        }
    }
}

async fn process_scheduled_backups(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
) -> Result<(), String> {
    let backup_interval_secs = BackupStorage::interval_secs() as i64;
    let retention_count = BackupStorage::retention_count();

    let users_needing_backup = sqlx::query!(
        r#"
        SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE u.backup_enabled = true
          AND u.deactivated_at IS NULL
          AND (
              NOT EXISTS (
                  SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id
              )
              OR (
                  SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id
              ) < NOW() - make_interval(secs => $1)
          )
        LIMIT 50
        "#,
        backup_interval_secs as f64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching users for backup: {}", e))?;

    if users_needing_backup.is_empty() {
        debug!("No accounts need backup");
        return Ok(());
    }

    info!(
        count = users_needing_backup.len(),
        "Processing scheduled backups"
    );

    for user in users_needing_backup {
        let repo_root_cid = user.repo_root_cid.clone();

        let repo_rev = match &user.repo_rev {
            Some(rev) => rev.clone(),
            None => {
                warn!(did = %user.did, "User has no repo_rev, skipping backup");
                continue;
            }
        };

        let head_cid = match Cid::from_str(&repo_root_cid) {
            Ok(c) => c,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup");
                continue;
            }
        };

        let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await;
        let car_bytes = match car_result {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to generate CAR for backup");
                continue;
            }
        };

        let block_count = count_car_blocks(&car_bytes);
        let size_bytes = car_bytes.len() as i64;

        let storage_key = match backup_storage
            .put_backup(&user.did, &repo_rev, &car_bytes)
            .await
        {
            Ok(key) => key,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to upload backup to storage");
                continue;
            }
        };

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
            user.user_id,
            storage_key,
            repo_root_cid,
            repo_rev,
            block_count,
            size_bytes
        )
        .execute(db)
        .await
        {
            warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload");
            if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
                error!(
                    did = %user.did,
                    storage_key = %storage_key,
                    error = %rollback_err,
                    "Failed to rollback orphaned backup from S3"
                );
            }
            continue;
        }

        info!(
            did = %user.did,
            rev = %repo_rev,
            size_bytes,
            block_count,
            "Created backup"
        );

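        // Prune backups beyond the retention count after each successful upload;
        // a cleanup failure is logged but does not fail the backup pass.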
        if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await
        {
            warn!(did = %user.did, error = %e, "Failed to cleanup old backups");
        }
    }

    Ok(())
}

pub async fn generate_repo_car(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use jacquard_repo::storage::BlockStore;

    let block_cids_bytes = collect_current_repo_blocks(block_store, head_cid).await?;
    let block_cids: Vec<Cid> = block_cids_bytes
        .iter()
        .filter_map(|b| Cid::try_from(b.as_slice()).ok())
        .collect();

    let car_bytes =
        encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;

    let blocks = block_store
        .get_many(&block_cids)
        .await
        .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?;

    let car_bytes = block_cids
        .iter()
        .zip(blocks.iter())
        .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block)))
        .fold(car_bytes, |mut acc, (cid, block)| {
            acc.extend(encode_car_block(cid, block));
            acc
        });

    Ok(car_bytes)
}

fn encode_car_block(cid: &Cid, block: &[u8]) -> Vec<u8> {
    use std::io::Write;
    let cid_bytes = cid.to_bytes();
    let total_len = cid_bytes.len() + block.len();
    let mut writer = Vec::new();
    crate::sync::car::write_varint(&mut writer, total_len as u64)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(&cid_bytes)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(block)
        .expect("Writing to Vec<u8> should never fail");
    writer
}

pub async fn generate_repo_car_from_user_blocks(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    _head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use std::str::FromStr;

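    // The caller-supplied head is ignored; the canonical root CID is re-read from
    // `repos` so the CAR always reflects the latest committed state.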
    let repo_root_cid_str: String = sqlx::query_scalar!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1",
        user_id
    )
    .fetch_optional(db)
    .await
    .map_err(|e| format!("Failed to fetch repo: {}", e))?
    .ok_or_else(|| "Repository not found".to_string())?;

    let actual_head_cid = Cid::from_str(&repo_root_cid_str)
        .map_err(|e| format!("Invalid repo_root_cid: {}", e))?;

    generate_repo_car(block_store, &actual_head_cid).await
}

pub async fn generate_full_backup(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    generate_repo_car_from_user_blocks(db, block_store, user_id, head_cid).await
}

pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
    let mut count = 0;
    let mut pos = 0;

    if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) {
        pos += header_varint_len + header_len as usize;
    } else {
        return 0;
    }

    while pos < car_bytes.len() {
        if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) {
            pos += varint_len + block_len as usize;
            count += 1;
        } else {
            break;
        }
    }

    count
}

fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    while pos < data.len() && pos < 10 {
        let byte = data[pos];
        value |= ((byte & 0x7f) as u64) << shift;
        pos += 1;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
    }

    None
}

async fn cleanup_old_backups(
    db: &PgPool,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    retention_count: u32,
) -> Result<(), String> {
    let old_backups = sqlx::query!(
        r#"
        SELECT id, storage_key
        FROM account_backups
        WHERE user_id = $1
        ORDER BY created_at DESC
        OFFSET $2
        "#,
        user_id,
        retention_count as i64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching old backups: {}", e))?;

    for backup in old_backups {
        if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await {
            warn!(
                storage_key = %backup.storage_key,
                error = %e,
                "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
            );
            continue;
        }

        sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id)
            .execute(db)
            .await
            .map_err(|e| format!("Failed to delete old backup record: {}", e))?;
    }

    Ok(())
}
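
// A minimal sketch of how the CAR framing helpers above behave, assuming only the
// unsigned varint (LEB128-style) framing implemented by `read_varint` and
// `count_car_blocks` in this file; the byte values are illustrative, not a real
// CAR header or block.
#[cfg(test)]
mod varint_framing_tests {
    use super::{count_car_blocks, read_varint};

    #[test]
    fn read_varint_decodes_single_and_multi_byte_values() {
        // 0x05 is a one-byte varint: value 5, consuming 1 byte.
        assert_eq!(read_varint(&[0x05]), Some((5, 1)));
        // 0x80 0x01 is a two-byte varint: 0 + (1 << 7) = 128, consuming 2 bytes.
        assert_eq!(read_varint(&[0x80, 0x01]), Some((128, 2)));
        // A continuation byte with no terminator never completes.
        assert_eq!(read_varint(&[0x80]), None);
    }

    #[test]
    fn count_car_blocks_counts_length_prefixed_sections() {
        // Fake CAR layout: a 3-byte "header" then one 5-byte "block", each preceded
        // by its length as a varint. Only the framing matters to count_car_blocks.
        let mut car = vec![0x03, 0xAA, 0xBB, 0xCC];
        car.extend_from_slice(&[0x05, 1, 2, 3, 4, 5]);
        assert_eq!(count_car_blocks(&car), 1);
        // An empty input has no header, so no blocks are counted.
        assert_eq!(count_car_blocks(&[]), 0);
    }
}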