use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::{BackupStorage, BlobStorage};
use crate::sync::car::encode_car_header;
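/// Backfills `blocks_cids` for genesis commit events in `repo_seq` that were
/// sequenced without block CIDs, using the commit block and its MST root from
/// the block store.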
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}
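/// Backfills `repo_rev` for repos missing it by parsing the head commit from
/// the block store and storing its revision string.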
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}
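/// Backfills the `user_blocks` table for repos with no entries by walking the
/// current repo graph from the root commit and recording every reachable block CID.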
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        match collect_current_repo_blocks(&block_store, &root_cid).await {
            Ok(block_cids) => {
                if block_cids.is_empty() {
                    failed += 1;
                    continue;
                }

                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO user_blocks (user_id, block_cid)
                    SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
                    ON CONFLICT (user_id, block_cid) DO NOTHING
                    "#,
                    user.user_id,
                    &block_cids
                )
                .execute(db)
                .await
                {
                    warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
                    failed += 1;
                } else {
                    info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
                    success += 1;
                }
            }
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill");
                failed += 1;
            }
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}
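/// Walks the repo graph from `head_cid` (commit, MST nodes, record leaves) and
/// returns the binary CIDs of all reachable blocks.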
pub async fn collect_current_repo_blocks(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<Vec<u8>>, String> {
    let mut block_cids: Vec<Vec<u8>> = Vec::new();
    let mut to_visit = vec![*head_cid];
    let mut visited = std::collections::HashSet::new();

    while let Some(cid) = to_visit.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);
        block_cids.push(cid.to_bytes());

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            Ok(None) => continue,
            Err(e) => return Err(format!("Failed to get block {}: {:?}", cid, e)),
        };

        if let Ok(commit) = Commit::from_cbor(&block) {
            // Commit block: descend into the MST root.
            to_visit.push(commit.data);
        } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
            // MST node: "l" is the leftmost subtree link, "e" is the entry list;
            // each entry may carry a subtree link ("t") and a record value link ("v").
            if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                to_visit.push(*left_cid);
            }
            if let Some(Ipld::List(entries)) = obj.get("e") {
                to_visit.extend(
                    entries
                        .iter()
                        .filter_map(|entry| match entry {
                            Ipld::Map(entry_obj) => Some(entry_obj),
                            _ => None,
                        })
                        .flat_map(|entry_obj| {
                            [entry_obj.get("t"), entry_obj.get("v")]
                                .into_iter()
                                .flatten()
                                .filter_map(|v| match v {
                                    Ipld::Link(cid) => Some(*cid),
                                    _ => None,
                                })
                        }),
                );
            }
        }
    }

    Ok(block_cids)
}
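/// Backfills the `record_blobs` table by scanning each user's records for blob
/// references and inserting the `(record_uri, blob_cid)` pairs found.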
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut batch_record_uris: Vec<String> = Vec::new();
        let mut batch_blob_cids: Vec<String> = Vec::new();

        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
                batch_record_uris.push(record_uri);
                batch_blob_cids.push(blob_ref.cid);
            }
        }

        let blob_refs_found = batch_record_uris.len();
        if !batch_record_uris.is_empty() {
            if let Err(e) = sqlx::query!(
                r#"
                INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                SELECT $1, record_uri, blob_cid
                FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
                ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                "#,
                user.user_id,
                &batch_record_uris,
                &batch_blob_cids
            )
            .execute(db)
            .await
            {
                warn!(error = %e, "Failed to batch insert record_blobs during backfill");
            } else {
                info!(
                    user_id = %user.user_id,
                    did = %user.did,
                    blob_refs = blob_refs_found,
                    "Backfilled record_blobs"
                );
            }
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}
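/// Runs the scheduled-deletion loop, periodically processing accounts whose
/// deletion grace period has elapsed, until the shutdown signal is received.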
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, blob_store.as_ref()).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}
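/// Finds deactivated accounts whose `delete_after` deadline has passed and
/// deletes their data.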
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}
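/// Deletes one account: removes its blobs from blob storage, deletes its rows,
/// sequences a final `account` deletion event, and notifies `repo_updates` listeners.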
async fn delete_account_data(
    db: &PgPool,
    blob_store: &dyn BlobStorage,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}
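/// Runs the periodic backup loop until the shutdown signal is received.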
pub async fn start_backup_tasks(
    db: PgPool,
    block_store: PostgresBlockStore,
    backup_storage: Arc<BackupStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let backup_interval = Duration::from_secs(BackupStorage::interval_secs());

    info!(
        interval_secs = backup_interval.as_secs(),
        retention_count = BackupStorage::retention_count(),
        "Starting backup service"
    );

    let mut ticker = interval(backup_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Backup service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_backups(&db, &block_store, &backup_storage).await {
                    error!("Error processing scheduled backups: {}", e);
                }
            }
        }
    }
}
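/// Creates CAR backups for users with backups enabled whose latest backup is
/// older than the configured interval, then prunes backups past the retention count.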
async fn process_scheduled_backups(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    backup_storage: &BackupStorage,
) -> Result<(), String> {
    let backup_interval_secs = BackupStorage::interval_secs() as i64;
    let retention_count = BackupStorage::retention_count();

    let users_needing_backup = sqlx::query!(
        r#"
        SELECT u.id as user_id, u.did, r.repo_root_cid, r.repo_rev
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE u.backup_enabled = true
          AND u.deactivated_at IS NULL
          AND (
              NOT EXISTS (
                  SELECT 1 FROM account_backups ab WHERE ab.user_id = u.id
              )
              OR (
                  SELECT MAX(ab.created_at) FROM account_backups ab WHERE ab.user_id = u.id
              ) < NOW() - make_interval(secs => $1)
          )
        LIMIT 50
        "#,
        backup_interval_secs as f64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching users for backup: {}", e))?;

    if users_needing_backup.is_empty() {
        debug!("No accounts need backup");
        return Ok(());
    }

    info!(
        count = users_needing_backup.len(),
        "Processing scheduled backups"
    );

    for user in users_needing_backup {
        let repo_root_cid = user.repo_root_cid.clone();

        let repo_rev = match &user.repo_rev {
            Some(rev) => rev.clone(),
            None => {
                warn!(did = %user.did, "User has no repo_rev, skipping backup");
                continue;
            }
        };

        let head_cid = match Cid::from_str(&repo_root_cid) {
            Ok(c) => c,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup");
                continue;
            }
        };

        let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await;
        let car_bytes = match car_result {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to generate CAR for backup");
                continue;
            }
        };

        let block_count = count_car_blocks(&car_bytes);
        let size_bytes = car_bytes.len() as i64;

        let storage_key = match backup_storage
            .put_backup(&user.did, &repo_rev, &car_bytes)
            .await
        {
            Ok(key) => key,
            Err(e) => {
                warn!(did = %user.did, error = %e, "Failed to upload backup to storage");
                continue;
            }
        };

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
            user.user_id,
            storage_key,
            repo_root_cid,
            repo_rev,
            block_count,
            size_bytes
        )
        .execute(db)
        .await
        {
            warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload");
            if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
                error!(
                    did = %user.did,
                    storage_key = %storage_key,
                    error = %rollback_err,
                    "Failed to rollback orphaned backup from S3"
                );
            }
            continue;
        }

        info!(
            did = %user.did,
            rev = %repo_rev,
            size_bytes,
            block_count,
            "Created backup"
        );

        if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await
        {
            warn!(did = %user.did, error = %e, "Failed to cleanup old backups");
        }
    }

    Ok(())
}
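/// Builds a CAR file containing every block reachable from `head_cid`.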
pub async fn generate_repo_car(
    block_store: &PostgresBlockStore,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use jacquard_repo::storage::BlockStore;

    let block_cids_bytes = collect_current_repo_blocks(block_store, head_cid).await?;
    let block_cids: Vec<Cid> = block_cids_bytes
        .iter()
        .filter_map(|b| Cid::try_from(b.as_slice()).ok())
        .collect();

    let car_bytes =
        encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;

    let blocks = block_store
        .get_many(&block_cids)
        .await
        .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?;

    let car_bytes = block_cids
        .iter()
        .zip(blocks.iter())
        .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block)))
        .fold(car_bytes, |mut acc, (cid, block)| {
            acc.extend(encode_car_block(cid, block));
            acc
        });

    Ok(car_bytes)
}
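/// Encodes a single CAR section: a varint length prefix followed by the CID
/// bytes and the block bytes.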
fn encode_car_block(cid: &Cid, block: &[u8]) -> Vec<u8> {
    use std::io::Write;
    // CAR section layout: varint(len(cid) + len(data)) || cid || data.
    let cid_bytes = cid.to_bytes();
    let total_len = cid_bytes.len() + block.len();
    let mut writer = Vec::new();
    crate::sync::car::write_varint(&mut writer, total_len as u64)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(&cid_bytes)
        .expect("Writing to Vec<u8> should never fail");
    writer
        .write_all(block)
        .expect("Writing to Vec<u8> should never fail");
    writer
}
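/// Builds a CAR file for a user's repo, resolving the current head from the
/// `repos` table rather than trusting the caller-supplied CID.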
pub async fn generate_repo_car_from_user_blocks(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    _head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    use std::str::FromStr;

    let repo_root_cid_str: String = sqlx::query_scalar!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1",
        user_id
    )
    .fetch_optional(db)
    .await
    .map_err(|e| format!("Failed to fetch repo: {}", e))?
    .ok_or_else(|| "Repository not found".to_string())?;

    let actual_head_cid = Cid::from_str(&repo_root_cid_str)
        .map_err(|e| format!("Invalid repo_root_cid: {}", e))?;

    generate_repo_car(block_store, &actual_head_cid).await
}
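/// Generates the CAR payload used for scheduled backups.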
pub async fn generate_full_backup(
    db: &PgPool,
    block_store: &PostgresBlockStore,
    user_id: uuid::Uuid,
    head_cid: &Cid,
) -> Result<Vec<u8>, String> {
    generate_repo_car_from_user_blocks(db, block_store, user_id, head_cid).await
}
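/// Counts the blocks in a CAR file by skipping the header and walking the
/// varint-prefixed sections.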
pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
    let mut count = 0;
    let mut pos = 0;

    // Skip the CARv1 header (a varint length followed by the DAG-CBOR header block).
    if let Some((header_len, header_varint_len)) = read_varint(&car_bytes[pos..]) {
        pos += header_varint_len + header_len as usize;
    } else {
        return 0;
    }

    // Each remaining section is a varint length followed by CID + block bytes.
    while pos < car_bytes.len() {
        if let Some((block_len, varint_len)) = read_varint(&car_bytes[pos..]) {
            pos += varint_len + block_len as usize;
            count += 1;
        } else {
            break;
        }
    }

    count
}
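/// Reads an unsigned LEB128 varint from the start of `data`, returning the
/// value and the number of bytes consumed.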
fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0;
    let mut pos = 0;

    // Unsigned LEB128: 7 value bits per byte, high bit set on continuation bytes.
    // A u64 takes at most 10 bytes.
    while pos < data.len() && pos < 10 {
        let byte = data[pos];
        value |= ((byte & 0x7f) as u64) << shift;
        pos += 1;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
    }

    None
}
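/// Deletes backups beyond `retention_count` for a user, removing each object
/// from storage before its DB row so a storage failure never orphans the object.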
async fn cleanup_old_backups(
    db: &PgPool,
    backup_storage: &BackupStorage,
    user_id: uuid::Uuid,
    retention_count: u32,
) -> Result<(), String> {
    let old_backups = sqlx::query!(
        r#"
        SELECT id, storage_key
        FROM account_backups
        WHERE user_id = $1
        ORDER BY created_at DESC
        OFFSET $2
        "#,
        user_id,
        retention_count as i64
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching old backups: {}", e))?;

    for backup in old_backups {
        if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await {
            warn!(
                storage_key = %backup.storage_key,
                error = %e,
                "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
            );
            continue;
        }

        sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id)
            .execute(db)
            .await
            .map_err(|e| format!("Failed to delete old backup record: {}", e))?;
    }

    Ok(())
}