this repo has no description
1use cid::Cid; 2use jacquard_repo::commit::Commit; 3use jacquard_repo::storage::BlockStore; 4use ipld_core::ipld::Ipld; 5use sqlx::PgPool; 6use std::str::FromStr; 7use std::sync::Arc; 8use std::time::Duration; 9use tokio::sync::watch; 10use tokio::time::interval; 11use tracing::{debug, error, info, warn}; 12 13use crate::repo::PostgresBlockStore; 14use crate::storage::BlobStorage; 15 16pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { 17 let repos_missing_rev = match sqlx::query!( 18 "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL" 19 ) 20 .fetch_all(db) 21 .await 22 { 23 Ok(rows) => rows, 24 Err(e) => { 25 error!("Failed to query repos for backfill: {}", e); 26 return; 27 } 28 }; 29 30 if repos_missing_rev.is_empty() { 31 debug!("No repos need repo_rev backfill"); 32 return; 33 } 34 35 info!( 36 count = repos_missing_rev.len(), 37 "Backfilling repo_rev for existing repos" 38 ); 39 40 let mut success = 0; 41 let mut failed = 0; 42 43 for repo in repos_missing_rev { 44 let cid = match Cid::from_str(&repo.repo_root_cid) { 45 Ok(c) => c, 46 Err(_) => { 47 failed += 1; 48 continue; 49 } 50 }; 51 52 let block = match block_store.get(&cid).await { 53 Ok(Some(b)) => b, 54 _ => { 55 failed += 1; 56 continue; 57 } 58 }; 59 60 let commit = match Commit::from_cbor(&block) { 61 Ok(c) => c, 62 Err(_) => { 63 failed += 1; 64 continue; 65 } 66 }; 67 68 let rev = commit.rev().to_string(); 69 70 if let Err(e) = sqlx::query!( 71 "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 72 rev, 73 repo.user_id 74 ) 75 .execute(db) 76 .await 77 { 78 warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); 79 failed += 1; 80 } else { 81 success += 1; 82 } 83 } 84 85 info!(success, failed, "Completed repo_rev backfill"); 86} 87 88pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { 89 let users_without_blocks = match sqlx::query!( 90 r#" 91 SELECT u.id as user_id, r.repo_root_cid 92 FROM users u 93 JOIN repos r ON r.user_id = u.id 94 WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id) 95 "# 96 ) 97 .fetch_all(db) 98 .await 99 { 100 Ok(rows) => rows, 101 Err(e) => { 102 error!("Failed to query users for user_blocks backfill: {}", e); 103 return; 104 } 105 }; 106 107 if users_without_blocks.is_empty() { 108 debug!("No users need user_blocks backfill"); 109 return; 110 } 111 112 info!( 113 count = users_without_blocks.len(), 114 "Backfilling user_blocks for existing repos" 115 ); 116 117 let mut success = 0; 118 let mut failed = 0; 119 120 for user in users_without_blocks { 121 let root_cid = match Cid::from_str(&user.repo_root_cid) { 122 Ok(c) => c, 123 Err(_) => { 124 failed += 1; 125 continue; 126 } 127 }; 128 129 let mut block_cids: Vec<Vec<u8>> = Vec::new(); 130 let mut to_visit = vec![root_cid]; 131 let mut visited = std::collections::HashSet::new(); 132 133 while let Some(cid) = to_visit.pop() { 134 if visited.contains(&cid) { 135 continue; 136 } 137 visited.insert(cid); 138 block_cids.push(cid.to_bytes()); 139 140 let block = match block_store.get(&cid).await { 141 Ok(Some(b)) => b, 142 _ => continue, 143 }; 144 145 if let Ok(commit) = Commit::from_cbor(&block) { 146 to_visit.push(commit.data); 147 if let Some(prev) = commit.prev { 148 to_visit.push(prev); 149 } 150 } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 151 if let Ipld::Map(ref obj) = ipld { 152 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 153 to_visit.push(*left_cid); 154 } 155 if let Some(Ipld::List(entries)) = obj.get("e") { 156 for entry in entries { 157 if let Ipld::Map(entry_obj) = entry { 158 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 159 to_visit.push(*tree_cid); 160 } 161 if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") { 162 to_visit.push(*val_cid); 163 } 164 } 165 } 166 } 167 } 168 } 169 } 170 171 if block_cids.is_empty() { 172 failed += 1; 173 continue; 174 } 175 176 if let Err(e) = sqlx::query!( 177 r#" 178 INSERT INTO user_blocks (user_id, block_cid) 179 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 180 ON CONFLICT (user_id, block_cid) DO NOTHING 181 "#, 182 user.user_id, 183 &block_cids 184 ) 185 .execute(db) 186 .await 187 { 188 warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 189 failed += 1; 190 } else { 191 info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 192 success += 1; 193 } 194 } 195 196 info!(success, failed, "Completed user_blocks backfill"); 197} 198 199pub async fn start_scheduled_tasks( 200 db: PgPool, 201 blob_store: Arc<dyn BlobStorage>, 202 mut shutdown_rx: watch::Receiver<bool>, 203) { 204 let check_interval = Duration::from_secs( 205 std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS") 206 .ok() 207 .and_then(|s| s.parse().ok()) 208 .unwrap_or(3600), 209 ); 210 211 info!( 212 check_interval_secs = check_interval.as_secs(), 213 "Starting scheduled tasks service" 214 ); 215 216 let mut ticker = interval(check_interval); 217 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 218 219 loop { 220 tokio::select! { 221 _ = shutdown_rx.changed() => { 222 if *shutdown_rx.borrow() { 223 info!("Scheduled tasks service shutting down"); 224 break; 225 } 226 } 227 _ = ticker.tick() => { 228 if let Err(e) = process_scheduled_deletions(&db, &blob_store).await { 229 error!("Error processing scheduled deletions: {}", e); 230 } 231 } 232 } 233 } 234} 235 236async fn process_scheduled_deletions( 237 db: &PgPool, 238 blob_store: &Arc<dyn BlobStorage>, 239) -> Result<(), String> { 240 let accounts_to_delete = sqlx::query!( 241 r#" 242 SELECT did, handle 243 FROM users 244 WHERE delete_after IS NOT NULL 245 AND delete_after < NOW() 246 AND deactivated_at IS NOT NULL 247 LIMIT 100 248 "# 249 ) 250 .fetch_all(db) 251 .await 252 .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?; 253 254 if accounts_to_delete.is_empty() { 255 debug!("No accounts scheduled for deletion"); 256 return Ok(()); 257 } 258 259 info!( 260 count = accounts_to_delete.len(), 261 "Processing scheduled account deletions" 262 ); 263 264 for account in accounts_to_delete { 265 if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { 266 warn!( 267 did = %account.did, 268 handle = %account.handle, 269 error = %e, 270 "Failed to delete scheduled account" 271 ); 272 } else { 273 info!( 274 did = %account.did, 275 handle = %account.handle, 276 "Successfully deleted scheduled account" 277 ); 278 } 279 } 280 281 Ok(()) 282} 283 284async fn delete_account_data( 285 db: &PgPool, 286 blob_store: &Arc<dyn BlobStorage>, 287 did: &str, 288 _handle: &str, 289) -> Result<(), String> { 290 let user_id: uuid::Uuid = sqlx::query_scalar!( 291 "SELECT id FROM users WHERE did = $1", 292 did 293 ) 294 .fetch_one(db) 295 .await 296 .map_err(|e| format!("DB error fetching user: {}", e))?; 297 298 let blob_storage_keys: Vec<String> = sqlx::query_scalar!( 299 r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#, 300 user_id 301 ) 302 .fetch_all(db) 303 .await 304 .map_err(|e| format!("DB error fetching blob keys: {}", e))?; 305 306 for storage_key in &blob_storage_keys { 307 if let Err(e) = blob_store.delete(storage_key).await { 308 warn!( 309 storage_key = %storage_key, 310 error = %e, 311 "Failed to delete blob from storage (continuing anyway)" 312 ); 313 } 314 } 315 316 let mut tx = db 317 .begin() 318 .await 319 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 320 321 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 322 .execute(&mut *tx) 323 .await 324 .map_err(|e| format!("Failed to delete blobs: {}", e))?; 325 326 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 327 .execute(&mut *tx) 328 .await 329 .map_err(|e| format!("Failed to delete user: {}", e))?; 330 331 let account_seq = sqlx::query_scalar!( 332 r#" 333 INSERT INTO repo_seq (did, event_type, active, status) 334 VALUES ($1, 'account', false, 'deleted') 335 RETURNING seq 336 "#, 337 did 338 ) 339 .fetch_one(&mut *tx) 340 .await 341 .map_err(|e| format!("Failed to sequence account deletion: {}", e))?; 342 343 sqlx::query!( 344 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 345 did, 346 account_seq 347 ) 348 .execute(&mut *tx) 349 .await 350 .map_err(|e| format!("Failed to cleanup sequences: {}", e))?; 351 352 tx.commit() 353 .await 354 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 355 356 sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq)) 357 .execute(db) 358 .await 359 .map_err(|e| format!("Failed to notify: {}", e))?; 360 361 info!( 362 did = %did, 363 blob_count = blob_storage_keys.len(), 364 "Deleted account data including blobs from storage" 365 ); 366 367 Ok(()) 368}