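//! One-shot startup backfill routines plus the scheduled account-deletion
//! background task.
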
use cid::Cid;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use ipld_core::ipld::Ipld;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;
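
/// Backfills `blocks_cids` for genesis commit events in `repo_seq` that were
/// sequenced without a block list, by re-reading each commit block from the
/// block store and recording the MST root CID alongside the commit CID.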
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        // A genesis commit's block set is just the commit block itself plus
        // its MST root.
        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(success, failed, "Completed genesis commit blocks_cids backfill");
}
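
/// Backfills `repo_rev` for repos where it is still NULL, by loading each
/// repo's root commit block and copying its `rev` field.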
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev = match sqlx::query!(
        "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repos for backfill: {}", e);
            return;
        }
    };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}
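
/// Rebuilds the `user_blocks` index for users whose repo has no indexed
/// blocks, walking the commit chain and MST from the repo root and recording
/// every reachable block CID.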
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        // Depth-first walk from the repo root, collecting every reachable
        // block CID. Missing or unreadable blocks are skipped, not fatal.
        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                // Commit block: follow the MST root and the previous commit.
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                // MST node: "l" is the leftmost subtree link; each entry in
                // "e" may link a subtree ("t") and a record value ("v").
                if let Ipld::Map(ref obj) = ipld {
                    if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                        to_visit.push(*left_cid);
                    }
                    if let Some(Ipld::List(entries)) = obj.get("e") {
                        for entry in entries {
                            if let Ipld::Map(entry_obj) = entry {
                                if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                    to_visit.push(*tree_cid);
                                }
                                if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                    to_visit.push(*val_cid);
                                }
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}
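
/// Indexes blob references into `record_blobs` for repos that have records
/// but no blob rows, decoding each record as DAG-CBOR and scanning it for
/// blob refs. Handles at most 100 repos per invocation.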
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut blob_refs_found = 0;
        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!(
                    "at://{}/{}/{}",
                    user.did, record.collection, record.rkey
                );
                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    VALUES ($1, $2, $3)
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user.user_id,
                    record_uri,
                    blob_ref.cid
                )
                .execute(db)
                .await
                {
                    warn!(error = %e, "Failed to insert record_blob during backfill");
                } else {
                    blob_refs_found += 1;
                }
            }
        }

        if blob_refs_found > 0 {
            info!(
                user_id = %user.user_id,
                did = %user.did,
                blob_refs = blob_refs_found,
                "Backfilled record_blobs"
            );
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}
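
/// Background service loop: checks for accounts due for deletion on a fixed
/// interval (configurable via `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`,
/// default 3600 seconds) until shutdown is signalled on `shutdown_rx`.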
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    // `interval` yields immediately on its first tick, so one pass runs at
    // startup; ticks missed while a pass is running are skipped, not bursted.
    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}
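
/// Deletes up to 100 deactivated accounts whose `delete_after` deadline has
/// passed. Per-account failures are logged and do not abort the batch.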
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}
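
/// Removes all stored data for a single account: blobs from object storage
/// (best-effort), then the database rows in one transaction. Sequences a
/// tombstone `account` event, prunes the account's earlier `repo_seq` rows so
/// only the tombstone remains, and notifies `repo_updates` listeners.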
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!(
        "SELECT id FROM users WHERE did = $1",
        did
    )
    .fetch_one(db)
    .await
    .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    // Best-effort: remove blobs from object storage first; a failure here is
    // logged but does not block the database deletion.
    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence a tombstone event, then drop every earlier sequence entry for
    // this DID so only the tombstone remains.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    // NOTIFY cannot take bind parameters, so the payload is interpolated;
    // account_seq is an integer, so this cannot inject SQL.
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}