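//! Background maintenance tasks: one-shot backfills that repair historical
//! rows (`repo_seq.blocks_cids`, `repos.repo_rev`, `user_blocks`) and a
//! long-running loop that carries out scheduled account deletions.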
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;

/// One-shot repair: genesis commit events (`prev_cid IS NULL`) in `repo_seq`
/// that were sequenced with an empty `blocks_cids` get the commit CID and MST
/// root CID filled back in from the block store.
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL
               OR array_length(blocks_cids, 1) IS NULL
               OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        // A genesis commit carries exactly two blocks: the commit itself and
        // the MST root it points at.
        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(success, failed, "Completed genesis commit blocks_cids backfill");
}

/// One-shot repair: populate `repos.repo_rev` where it is NULL by loading each
/// repo's root commit from the block store and reading its `rev`.
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev = match sqlx::query!(
        "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repos for backfill: {}", e);
            return;
        }
    };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}
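/// One-shot repair: rebuild the `user_blocks` index for users that have a repo
/// but no `user_blocks` rows, by walking the commit/MST DAG from the repo root.
///
/// The traversal assumes MST nodes shaped roughly as follows (a sketch
/// inferred from the keys this code reads, not an authoritative schema):
///
/// ```text
/// { "l": <left subtree link or null>,
///   "e": [ { "p": <prefix len>, "k": <key suffix>,
///            "v": <record link>, "t": <right subtree link or null> }, ... ] }
/// ```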
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        // Depth-first walk over the repo DAG, starting from the root commit.
        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            // The CID is recorded even if the block itself can't be loaded
            // below; traversal simply stops descending at missing blocks.
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            // Commit blocks link to the MST root (`data`) and previous commit.
            if let Ok(commit) = Commit::from_cbor(&block) {
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            // Otherwise treat the block as an MST node: follow the left
            // subtree ("l") and each entry's subtree ("t") and value ("v").
            } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                if let Ipld::Map(ref obj) = ipld {
                    if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                        to_visit.push(*left_cid);
                    }
                    if let Some(Ipld::List(entries)) = obj.get("e") {
                        for entry in entries {
                            if let Ipld::Map(entry_obj) = entry {
                                if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                    to_visit.push(*tree_cid);
                                }
                                if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                    to_visit.push(*val_cid);
                                }
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}
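/// Runs the periodic deletion sweep until a shutdown signal arrives. The check
/// interval comes from `SCHEDULED_DELETE_CHECK_INTERVAL_SECS` (default: 3600).
///
/// A minimal usage sketch (an assumption about the caller, not part of this
/// module): spawn the loop and flip the watch channel on shutdown.
///
/// ```ignore
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// tokio::spawn(start_scheduled_tasks(db.clone(), blob_store.clone(), shutdown_rx));
/// // later, e.g. on SIGTERM:
/// let _ = shutdown_tx.send(true);
/// ```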
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            changed = shutdown_rx.changed() => {
                // Stop on an explicit shutdown signal, and also when the
                // sender side is dropped (`changed()` errors): otherwise a
                // dropped sender would complete this arm on every iteration
                // and busy-loop.
                if changed.is_err() || *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

/// Find deactivated accounts whose `delete_after` deadline has passed (at most
/// 100 per sweep) and delete each one, logging failures without aborting the
/// batch.
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}
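// Hedged sketch (an assumption about code outside this module): the rows the
// sweep picks up are presumably created when an account is deactivated, along
// the lines of:
//
//     sqlx::query!(
//         "UPDATE users SET deactivated_at = NOW(),
//                 delete_after = NOW() + INTERVAL '30 days'
//          WHERE did = $1",
//         did
//     )
//     .execute(db)
//     .await?;
//
// The grace period shown is illustrative; the real value lives wherever
// deactivation is handled.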
/// Delete an account: blobs from external storage first (best-effort), then
/// the DB rows in one transaction, then a tombstone `account` event so
/// downstream consumers learn about the deletion.
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!(
        "SELECT id FROM users WHERE did = $1",
        did
    )
    .fetch_one(db)
    .await
    .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    // Best-effort: remove blobs from external storage before touching the DB,
    // so a storage failure never leaves the database half-deleted.
    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence a tombstone `account` event, then drop every other sequence
    // row for this DID so only the tombstone remains.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    // `account_seq` comes straight from the database as an integer, so
    // interpolating it into NOTIFY is safe here.
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}
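// Hedged sketch (assumption): the `NOTIFY repo_updates` above implies a
// corresponding listener elsewhere in the codebase, roughly:
//
//     let mut listener = sqlx::postgres::PgListener::connect_with(&db).await?;
//     listener.listen("repo_updates").await?;
//     while let Ok(notification) = listener.recv().await {
//         if let Ok(seq) = notification.payload().parse::<i64>() {
//             // wake subscribers to stream events from `seq`
//         }
//     }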