use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::{BackupStorage, BlobStorage};
use crate::sync::car::encode_car_header;

async fn update_genesis_blocks_cids(db: &PgPool, blocks_cids: &[String], seq: i64) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
        blocks_cids,
        seq
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn update_repo_rev(db: &PgPool, rev: &str, user_id: uuid::Uuid) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
        rev,
        user_id
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn insert_user_blocks(db: &PgPool, user_id: uuid::Uuid, block_cids: &[Vec<u8>]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO user_blocks (user_id, block_cid)
        SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
        ON CONFLICT (user_id, block_cid) DO NOTHING
        "#,
        user_id,
        block_cids
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn fetch_user_records(db: &PgPool, user_id: uuid::Uuid) -> Result<Vec<(String, String, String)>, sqlx::Error> {
    let rows = sqlx::query!(
        "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
        user_id
    )
    .fetch_all(db)
    .await?;
    Ok(rows.into_iter().map(|r| (r.collection, r.rkey, r.record_cid)).collect())
}

async fn insert_record_blobs(db: &PgPool, user_id: uuid::Uuid, record_uris: &[String], blob_cids: &[String]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
        SELECT $1, record_uri, blob_cid
        FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
        ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
        "#,
        user_id,
        record_uris,
        blob_cids
    )
    .execute(db)
    .await?;
    Ok(())
}

async fn delete_backup_record(db: &PgPool, id: uuid::Uuid) -> Result<(), sqlx::Error> {
    sqlx::query!("DELETE FROM account_backups WHERE id = $1", id)
        .execute(db)
        .await?;
    Ok(())
}

async fn fetch_old_backups(
    db: &PgPool,
    user_id: uuid::Uuid,
    retention_count: i64,
) -> Result<Vec<(uuid::Uuid, String)>, sqlx::Error> {
    let rows = sqlx::query!(
        r#"
        SELECT id, storage_key
        FROM account_backups
        WHERE user_id = $1
        ORDER BY created_at DESC
        OFFSET $2
        "#,
        user_id,
        retention_count
    )
    .fetch_all(db)
    .await?;
    Ok(rows.into_iter().map(|r| (r.id, r.storage_key)).collect())
}

async fn insert_backup_record(
    db: &PgPool,
    user_id: uuid::Uuid,
    storage_key: &str,
    repo_root_cid: &str,
    repo_rev: &str,
    block_count: i32,
    size_bytes: i64,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
        VALUES ($1, $2, $3, $4, $5, $6)
        "#,
        user_id,
        storage_key,
        repo_root_cid,
        repo_rev,
        block_count,
        size_bytes
    )
    .execute(db)
    .await?;
    Ok(())
}

struct GenesisCommitRow {
    seq: i64,
    did: String,
    commit_cid: Option<String>,
}

async fn process_genesis_commit(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    row: GenesisCommitRow,
) -> Result<(String, i64), (i64, &'static str)> {
    let commit_cid_str = row.commit_cid.ok_or((row.seq, "missing commit_cid"))?;
    let commit_cid = Cid::from_str(&commit_cid_str).map_err(|_| (row.seq, "invalid CID"))?;
    let block = block_store
        .get(&commit_cid)
        .await
        .map_err(|_| (row.seq, "failed to fetch block"))?
        .ok_or((row.seq, "block not found"))?;
    let commit = Commit::from_cbor(&block).map_err(|_| (row.seq, "failed to parse commit"))?;
    let blocks_cids = vec![commit.data.to_string(), commit_cid.to_string()];
    update_genesis_blocks_cids(db, &blocks_cids, row.seq)
        .await
        .map_err(|_| (row.seq, "failed to update"))?;
    Ok((row.did, row.seq))
}

pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
        AND prev_cid IS NULL
        AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let results = futures::future::join_all(broken_genesis_commits.into_iter().map(|row| {
        process_genesis_commit(
            db,
            &block_store,
            GenesisCommitRow {
                seq: row.seq,
                did: row.did,
                commit_cid: row.commit_cid,
            },
        )
    }))
    .await;

    let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
        Ok((did, seq)) => {
            info!(seq = seq, did = %did, "Fixed genesis commit blocks_cids");
            (s + 1, f)
        }
        Err((seq, reason)) => {
            warn!(seq = seq, reason = reason, "Failed to process genesis commit");
            (s, f + 1)
        }
    });

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}

async fn process_repo_rev(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    repo_root_cid: String,
) -> Result<uuid::Uuid, uuid::Uuid> {
    let cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?;
    let block = block_store
        .get(&cid)
        .await
        .ok()
        .flatten()
        .ok_or(user_id)?;
    let commit = Commit::from_cbor(&block).map_err(|_| user_id)?;
    let rev = commit.rev().to_string();
    update_repo_rev(db, &rev, user_id)
        .await
        .map_err(|_| user_id)?;
    Ok(user_id)
}

pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let results = futures::future::join_all(repos_missing_rev.into_iter().map(|repo| {
        process_repo_rev(db, &block_store, repo.user_id, repo.repo_root_cid)
    }))
    .await;

    let (success, failed) = results
        .iter()
        .fold((0, 0), |(s, f), r| match r {
            Ok(_) => (s + 1, f),
            Err(user_id) => {
                warn!(user_id = %user_id, "Failed to update repo_rev");
                (s, f + 1)
            }
        });

    info!(success, failed, "Completed repo_rev backfill");
}

async fn process_user_blocks(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    repo_root_cid: String,
) -> Result<(uuid::Uuid, usize), uuid::Uuid> {
    let root_cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?;
    let block_cids = collect_current_repo_blocks(block_store, &root_cid)
        .await
        .map_err(|_| user_id)?;
    if block_cids.is_empty() {
        return Err(user_id);
    }
    let count = block_cids.len();
    insert_user_blocks(db, user_id, &block_cids)
        .await
        .map_err(|_| user_id)?;
    Ok((user_id, count))
}

pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let results = futures::future::join_all(users_without_blocks.into_iter().map(|user| {
        process_user_blocks(db, &block_store, user.user_id, user.repo_root_cid)
    }))
    .await;

    let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
        Ok((user_id, count)) => {
            info!(user_id = %user_id, block_count = count, "Backfilled user_blocks");
            (s + 1, f)
        }
        Err(user_id) => {
            warn!(user_id = %user_id, "Failed to backfill user_blocks");
            (s, f + 1)
        }
    });

    info!(success, failed, "Completed user_blocks backfill");
}

/// Walk the repo DAG from `head_cid` and collect the raw bytes of every reachable
/// block CID: the commit itself, each MST node (via its `l` left link and the
/// `t`/`v` links of its `e` entries), and the record blocks those entries point to.
pub async fn collect_current_repo_blocks(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<Vec<u8>>, String> {
    let mut block_cids: Vec<Vec<u8>> = Vec::new();
    let mut to_visit = vec![*head_cid];
    let mut visited = std::collections::HashSet::new();

    while let Some(cid) = to_visit.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);
        block_cids.push(cid.to_bytes());

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            Ok(None) => continue,
            Err(e) => return Err(format!("Failed to get block {}: {:?}", cid, e)),
        };

        // A commit block points at the MST root via `data`; MST nodes are plain
        // DAG-CBOR maps whose `l`, `t`, and `v` fields link to further blocks.
        if let Ok(commit) = Commit::from_cbor(&block) {
            to_visit.push(commit.data);
        } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
            if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                to_visit.push(*left_cid);
            }
            if let Some(Ipld::List(entries)) = obj.get("e") {
                to_visit.extend(
                    entries
                        .iter()
                        .filter_map(|entry| match entry {
                            Ipld::Map(entry_obj) => Some(entry_obj),
                            _ => None,
                        })
                        .flat_map(|entry_obj| {
                            [entry_obj.get("t"), entry_obj.get("v")]
                                .into_iter()
                                .flatten()
                                .filter_map(|v| match v {
                                    Ipld::Link(cid) => Some(*cid),
                                    _ => None,
                                })
                        }),
                );
            }
        }
    }
    Ok(block_cids)
}

async fn process_record_blobs(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    did: String,
) -> Result<(uuid::Uuid, String, usize), (uuid::Uuid, &'static str)> {
    let records = fetch_user_records(db, user_id)
        .await
        .map_err(|_| (user_id, "failed to fetch records"))?;

    let mut batch_record_uris: Vec<String> = Vec::new();
    let mut batch_blob_cids: Vec<String> = Vec::new();

    futures::future::join_all(records.into_iter().map(|(collection, rkey, record_cid)| {
        let did = did.clone();
        async move {
            let cid = Cid::from_str(&record_cid).ok()?;
            let block_bytes = block_store.get(&cid).await.ok()??;
            let record_ipld: Ipld = serde_ipld_dagcbor::from_slice(&block_bytes).ok()?;
            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            Some(
                blob_refs
                    .into_iter()
                    .map(|blob_ref| {
                        let record_uri = format!("at://{}/{}/{}", did, collection, rkey);
                        (record_uri, blob_ref.cid)
                    })
                    .collect::<Vec<_>>(),
            )
        }
    }))
    .await
    .into_iter()
    .flatten()
    .flatten()
    .for_each(|(uri, cid)| {
        batch_record_uris.push(uri);
        batch_blob_cids.push(cid);
    });

    let blob_refs_found = batch_record_uris.len();
    if !batch_record_uris.is_empty() {
        insert_record_blobs(db, user_id, &batch_record_uris, &batch_blob_cids)
            .await
            .map_err(|_| (user_id, "failed to insert"))?;
    }
    Ok((user_id, did, blob_refs_found))
}

pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let results = futures::future::join_all(users_needing_backfill.into_iter().map(|user| {
        process_record_blobs(db, &block_store, user.user_id, user.did)
    }))
    .await;

    let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
        Ok((user_id, did, blob_refs)) => {
            if *blob_refs > 0 {
                info!(user_id = %user_id, did = %did, blob_refs = blob_refs, "Backfilled record_blobs");
            }
            (s + 1, f)
        }
        Err((user_id, reason)) => {
            warn!(user_id = %user_id, reason = reason, "Failed to backfill record_blobs");
            (s, f + 1)
        }
    });

    info!(success, failed, "Completed record_blobs backfill");
}

pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
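            // Wake on whichever finishes first: the shutdown signal or the next tick.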
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, blob_store.as_ref()).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
        AND delete_after < NOW()
        AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    futures::future::join_all(accounts_to_delete.into_iter().map(|account| async move {
        let result = delete_account_data(db, blob_store, &account.did, &account.handle).await;
        (account.did, account.handle, result)
    }))
    .await
    .into_iter()
    .for_each(|(did, handle, result)| match result {
        Ok(()) => info!(did = %did, handle = %handle, "Successfully deleted scheduled account"),
        Err(e) => warn!(did = %did, handle = %handle, error = %e, "Failed to delete scheduled account"),
    });

    Ok(())
}

async fn delete_account_data(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    futures::future::join_all(blob_storage_keys.iter().map(|storage_key| async move {
        (storage_key, blob_store.delete(storage_key).await)
    }))
    .await
    .into_iter()
    .filter_map(|(key, result)| result.err().map(|e| (key, e)))
    .for_each(|(key, e)| {
        warn!(storage_key = %key, error = %e, "Failed to delete blob from storage (continuing anyway)");
    });

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence a final 'account deleted' event, then drop this DID's earlier sequence rows.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}

pub async fn start_backup_tasks(
    db: PgPool,
    block_store: PostgresBlockStore,
    backup_storage: Arc<BackupStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let backup_interval = Duration::from_secs(BackupStorage::interval_secs());

    info!(
        interval_secs = backup_interval.as_secs(),
        retention_count = BackupStorage::retention_count(),
        "Starting backup service"
    );

    let mut ticker = interval(backup_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
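            // Same pattern as the scheduled-tasks loop: shut down cleanly or run the next backup pass.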
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Backup service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await {
                    error!("Error processing scheduled backups: {}", e);
                }
            }
        }
    }
}

struct BackupResult {
    did: String,
    repo_rev: String,
    size_bytes: i64,
    block_count: i32,
    user_id: uuid::Uuid,
}

enum BackupOutcome {
    Success(BackupResult),
    Skipped(String, &'static str),
    Failed(String, String),
}

async fn process_single_backup(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    did: String,
    repo_root_cid: String,
    repo_rev: Option<String>,
) -> BackupOutcome {
    let repo_rev = match repo_rev {
        Some(rev) => rev,
        None => return BackupOutcome::Skipped(did, "no repo_rev"),
    };

    let head_cid = match Cid::from_str(&repo_root_cid) {
        Ok(c) => c,
        Err(_) => return BackupOutcome::Skipped(did, "invalid repo_root_cid"),
    };

    let car_bytes = match generate_full_backup(db, block_store, user_id, &head_cid).await {
        Ok(bytes) => bytes,
        Err(e) => return BackupOutcome::Failed(did, format!("CAR generation: {}", e)),
    };

    let block_count = count_car_blocks(&car_bytes);
    let size_bytes = car_bytes.len() as i64;

    let storage_key = match backup_storage.put_backup(&did, &repo_rev, &car_bytes).await {
        Ok(key) => key,
        Err(e) => return BackupOutcome::Failed(did, format!("S3 upload: {}", e)),
    };

    if let Err(e) = insert_backup_record(
        db,
        user_id,
        &storage_key,
        &repo_root_cid,
        &repo_rev,
        block_count,
        size_bytes,
    )
    .await
    {
        if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
            error!(
                did = %did,
                storage_key = %storage_key,
                error = %rollback_err,
                "Failed to rollback orphaned backup from S3"
            );
        }
        return BackupOutcome::Failed(did, format!("DB insert: {}", e));
    }

    BackupOutcome::Success(BackupResult {
        did,
        repo_rev,
        size_bytes,
        block_count,
        user_id,
    })
}

async fn process_scheduled_backups(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
) -> Result<(), String> {
    let backup_interval_secs = BackupStorage::interval_secs() as i64;
    let retention_count = BackupStorage::retention_count();

    let users_needing_backup = sqlx::query!(
        r#"
        SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE u.backup_enabled = true
        AND u.deactivated_at IS NULL
        AND (
            NOT EXISTS (
                SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id
            )
            OR (
                SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id
            ) < NOW() - make_interval(secs => $1)
        )
        LIMIT 50
        "#,
        backup_interval_secs as f64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching users for backup: {}", e))?;

    if users_needing_backup.is_empty() {
        debug!("No accounts need backup");
        return Ok(());
    }

    info!(
        count = users_needing_backup.len(),
        "Processing scheduled backups"
    );

    let results = futures::future::join_all(users_needing_backup.into_iter().map(|user| {
        process_single_backup(
            db,
            block_store,
            backup_storage,
            user.user_id,
            user.did,
            user.repo_root_cid,
            user.repo_rev,
        )
    }))
    .await;

    futures::future::join_all(results.into_iter().map(|outcome| async move {
        match outcome {
            BackupOutcome::Success(result) => {
                info!(
                    did = %result.did,
                    rev = %result.repo_rev,
                    size_bytes = result.size_bytes,
                    block_count = result.block_count,
                    "Created backup"
                );
                if let Err(e) =
                    cleanup_old_backups(db, backup_storage, result.user_id, retention_count).await
                {
                    warn!(did = %result.did, error = %e, "Failed to cleanup old backups");
                }
            }
            BackupOutcome::Skipped(did, reason) => {
                warn!(did = %did, reason = reason, "Skipped backup");
            }
            BackupOutcome::Failed(did, error) => {
                warn!(did = %did, error = %error, "Failed backup");
            }
        }
    }))
    .await;

    Ok(())
}

pub async fn generate_repo_car(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use jacquard_repo::storage::BlockStore;

    let block_cids_bytes = collect_current_repo_blocks(block_store, head_cid).await?;
    let block_cids: Vec<Cid> = block_cids_bytes
        .iter()
        .filter_map(|b| Cid::try_from(b.as_slice()).ok())
        .collect();

    let car_bytes =
        encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;

    let blocks = block_store
        .get_many(&block_cids)
        .await
        .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?;

    let car_bytes = block_cids
        .iter()
        .zip(blocks.iter())
        .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block)))
        .fold(car_bytes, |mut acc, (cid, block)| {
            acc.extend(encode_car_block(cid, block));
            acc
        });

    Ok(car_bytes)
}

/// Encode one CAR section: a varint length prefix followed by the CID bytes and the block data.
fn encode_car_block(cid: &Cid, block: &[u8]) -> Vec<u8> {
    use std::io::Write;
    let cid_bytes = cid.to_bytes();
    let total_len = cid_bytes.len() + block.len();
    let mut writer = Vec::new();
    crate::sync::car::write_varint(&mut writer, total_len as u64)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(&cid_bytes)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(block)
        .expect("Writing to Vec<u8> should never fail");
    writer
}

pub async fn generate_repo_car_from_user_blocks(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    _head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use std::str::FromStr;

    let repo_root_cid_str: String = sqlx::query_scalar!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1",
        user_id
    )
    .fetch_optional(db)
    .await
    .map_err(|e| format!("Failed to fetch repo: {}", e))?
    .ok_or_else(|| "Repository not found".to_string())?;

    let actual_head_cid =
        Cid::from_str(&repo_root_cid_str).map_err(|e| format!("Invalid repo_root_cid: {}", e))?;

    generate_repo_car(block_store, &actual_head_cid).await
}

pub async fn generate_full_backup(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    generate_repo_car_from_user_blocks(db, block_store, user_id, head_cid).await
}

/// Count the data blocks in a CAR byte stream: skip the length-prefixed header,
/// then count each following length-prefixed section.
pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
    let mut count = 0;
    let mut pos = 0;

    if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) {
        pos += header_varint_len + header_len as usize;
    } else {
        return 0;
    }

    while pos < car_bytes.len() {
        if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) {
            pos += varint_len + block_len as usize;
            count += 1;
        } else {
            break;
        }
    }

    count
}

/// Decode an unsigned LEB128 varint, returning the value and the number of bytes consumed.
fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    while pos < data.len() && pos < 10 {
        let byte = data[pos];
        value |= ((byte & 0x7f) as u64) << shift;
        pos += 1;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
    }

    None
}

async fn cleanup_old_backups(
    db: &PgPool,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    retention_count: u32,
) -> Result<(), String> {
    let old_backups = fetch_old_backups(db, user_id, retention_count as i64)
        .await
        .map_err(|e| format!("DB error fetching old backups: {}", e))?;

    let results = futures::future::join_all(old_backups.into_iter().map(|(id, storage_key)| async move {
        match backup_storage.delete_backup(&storage_key).await {
            Ok(()) => match delete_backup_record(db, id).await {
                Ok(()) => Ok(()),
                Err(e) => Err(format!("DB delete failed for {}: {}", storage_key, e)),
            },
            Err(e) => {
                warn!(
                    storage_key = %storage_key,
                    error = %e,
                    "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
                );
                Ok(())
            }
        }
    }))
    .await;

    results
        .into_iter()
        .find_map(|r| r.err())
        .map_or(Ok(()), Err)
}