//! Startup backfills and scheduled maintenance tasks.
//!
//! The `backfill_*` functions repair rows that are missing derived data; each
//! only touches rows where that data is still absent, so they are safe to run
//! on every startup. `start_scheduled_tasks` is a long-running loop that
//! hard-deletes accounts whose deletion grace period has expired.

use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;

/// Backfills `blocks_cids` for genesis commit events (`prev_cid IS NULL`) in
/// `repo_seq` that were sequenced without a block list. The list is recovered
/// from the stored commit block: the MST root (`commit.data`) plus the commit
/// CID itself.
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(success, failed, "Completed genesis commit blocks_cids backfill");
}

/// Backfills `repos.repo_rev` for rows where it is NULL by loading each repo's
/// root commit from the block store and reading its `rev`.
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev = match sqlx::query!(
        "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repos for backfill: {}", e);
            return;
        }
    };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}

/// Backfills the `user_blocks` index for users that have no entries by walking
/// the repo DAG from the root commit: the commit itself, its `prev` chain, and
/// the MST (left links `l`, entry subtree links `t`, record value links `v`).
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                // Not a commit, so treat it as an MST node: "l" is the left
                // subtree link, "e" the entry list; each entry may carry a
                // subtree link ("t") and a record value link ("v").
                if let Ipld::Map(ref obj) = ipld {
                    if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                        to_visit.push(*left_cid);
                    }
                    if let Some(Ipld::List(entries)) = obj.get("e") {
                        for entry in entries {
                            if let Ipld::Map(entry_obj) = entry {
                                if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                    to_visit.push(*tree_cid);
                                }
                                if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                    to_visit.push(*val_cid);
                                }
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}

/// Backfills `record_blobs` for repos that have records but no blob index, by
/// decoding each record from the block store and scanning it for blob
/// references. Processes at most 100 repos per call.
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut blob_refs_found = 0;
        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    VALUES ($1, $2, $3)
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user.user_id,
                    record_uri,
                    blob_ref.cid
                )
                .execute(db)
                .await
                {
                    warn!(error = %e, "Failed to insert record_blob during backfill");
                } else {
                    blob_refs_found += 1;
                }
            }
        }

        if blob_refs_found > 0 {
            info!(
                user_id = %user.user_id,
                did = %user.did,
                blob_refs = blob_refs_found,
                "Backfilled record_blobs"
            );
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}

/// Runs the scheduled-tasks loop until shutdown is signalled. Every tick
/// (default 3600s, configurable via `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`)
/// it processes accounts whose deletion grace period has expired.
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

/// Hard-deletes up to 100 deactivated accounts whose `delete_after` deadline
/// has passed. Failures are logged per account and do not abort the batch.
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}

/// Deletes a single account: blob payloads from object storage (best effort),
/// then the `blobs` rows, the `users` row, and all prior `repo_seq` entries
/// inside one transaction, leaving a single account-deleted tombstone event.
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!(
        "SELECT id FROM users WHERE did = $1",
        did
    )
    .fetch_one(db)
    .await
    .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!"
           FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    // Delete blob payloads from object storage first; tolerate failures so a
    // single missing object doesn't block the account deletion.
    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence a tombstone 'account' event, then drop every older sequence row
    // for this DID so only the deletion event remains.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    // Wake listeners on the repo_updates channel. NOTIFY cannot take bind
    // parameters; interpolation is safe here because account_seq is an integer.
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}
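// --- Usage sketch (illustrative, not part of this module's API) ---
//
// One plausible way to wire the functions above: run the one-shot backfills
// sequentially at startup, then detach the scheduled-deletion loop. The name
// `run_startup_maintenance` is hypothetical, and the sketch assumes
// `PostgresBlockStore` is cheaply cloneable (e.g., it wraps a `PgPool`); if it
// is not, construct a fresh handle per call. Everything else uses only items
// defined in this module.
//
// pub async fn run_startup_maintenance(
//     db: PgPool,
//     block_store: PostgresBlockStore,
//     blob_store: Arc<dyn BlobStorage>,
//     shutdown_rx: watch::Receiver<bool>,
// ) {
//     // One-shot repairs, run sequentially so they don't contend on the pool.
//     backfill_genesis_commit_blocks(&db, block_store.clone()).await;
//     backfill_repo_rev(&db, block_store.clone()).await;
//     backfill_user_blocks(&db, block_store.clone()).await;
//     backfill_record_blobs(&db, block_store).await;
//
//     // Long-running loop; exits when `shutdown_rx` flips to true.
//     tokio::spawn(start_scheduled_tasks(db, blob_store, shutdown_rx));
// }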