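//! Background maintenance tasks.
//!
//! One-shot backfills that repair data written under earlier schema versions
//! (`blocks_cids` on genesis commits, `repo_rev`, `user_blocks`,
//! `record_blobs`), plus a long-running sweep that permanently deletes
//! accounts whose deletion grace period has expired.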
use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;

/// Backfills `repo_seq.blocks_cids` for genesis commits that were sequenced
/// without their proof blocks recorded.
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    // Genesis commits are the only commits with no prev_cid; rows with a
    // NULL or empty blocks_cids array predate the column being populated.
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        // A genesis commit's block set is just the commit block itself plus
        // its (empty) MST root.
        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(
        success,
        failed, "Completed genesis commit blocks_cids backfill"
    );
}
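
/// Backfills `repos.repo_rev` for rows created before the column existed, by
/// re-reading each repo's root commit from the block store and persisting
/// its `rev`.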
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}

/// Backfills the `user_blocks` ownership table by walking each repo's block
/// graph (commit chain plus MST nodes) from its root commit.
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        // Depth-first walk of the repo graph starting at the root commit.
        // Every reachable CID is recorded, even when its block body is
        // missing from the store.
        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                // Commit block: follow the MST root and the previous commit.
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                // MST node: "l" is the leftmost subtree; each entry in "e"
                // carries a right subtree link "t" and a record value "v".
                if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                    to_visit.push(*left_cid);
                }
                if let Some(Ipld::List(entries)) = obj.get("e") {
                    for entry in entries {
                        if let Ipld::Map(entry_obj) = entry {
                            if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                to_visit.push(*tree_cid);
                            }
                            if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                to_visit.push(*val_cid);
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}
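
/// Backfills the `record_blobs` index by decoding every stored record for
/// users that have records but no `record_blobs` rows, and registering any
/// blob references found in the record bodies. Caps each run at 100 users.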
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut blob_refs_found = 0;
        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Scan the decoded record for blob references, then index each
            // one against the record's at:// URI.
            let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    VALUES ($1, $2, $3)
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user.user_id,
                    record_uri,
                    blob_ref.cid
                )
                .execute(db)
                .await
                {
                    warn!(error = %e, "Failed to insert record_blob during backfill");
                } else {
                    blob_refs_found += 1;
                }
            }
        }

        if blob_refs_found > 0 {
            info!(
                user_id = %user.user_id,
                did = %user.did,
                blob_refs = blob_refs_found,
                "Backfilled record_blobs"
            );
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}
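
/// Runs the scheduled account-deletion sweep until shutdown is signalled.
/// The sweep interval comes from `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`
/// (default: 3600 seconds).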
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

/// Deletes up to 100 accounts per sweep. Only accounts that are both
/// deactivated and past their `delete_after` timestamp are eligible.
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}
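
/// Permanently deletes an account: blobs are removed from external storage
/// (best effort), the user row and blob records are deleted in a single
/// transaction, a tombstone `account` event is sequenced, and subscribers
/// are woken via `NOTIFY repo_updates`.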
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    // Remove blob objects from external storage first, best effort:
    // individual failures are logged and the DB cleanup proceeds regardless.
    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence a tombstone event so firehose consumers learn the account is
    // gone, then drop every earlier sequence row for the DID.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    // NOTIFY cannot take a bind parameter, so the payload is interpolated;
    // account_seq is a database-generated sequence number, so this is safe.
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}
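
// A minimal sketch of how these tasks might be wired together at startup.
// Everything below is illustrative, not this crate's actual entry point: it
// assumes `PostgresBlockStore` is `Clone` and that a hypothetical
// `run_server` drives the main loop.
//
//     let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
//
//     // One-shot backfills, run to completion before serving traffic.
//     backfill_genesis_commit_blocks(&db, block_store.clone()).await;
//     backfill_repo_rev(&db, block_store.clone()).await;
//     backfill_user_blocks(&db, block_store.clone()).await;
//     backfill_record_blobs(&db, block_store).await;
//
//     // Long-running deletion sweep, stopped by flipping the watch channel.
//     tokio::spawn(start_scheduled_tasks(db.clone(), blob_store, shutdown_rx));
//     run_server().await;
//     let _ = shutdown_tx.send(true);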