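//! Background maintenance: one-shot backfills for legacy data
//! (genesis-commit block lists, repo revisions, `user_blocks`,
//! `record_blobs`) plus a scheduled loop that purges accounts whose
//! deletion grace period has expired.
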
use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;

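/// Backfills `repo_seq.blocks_cids` for genesis commit events (rows with no
/// `prev_cid`) that were sequenced with an empty block list. A genesis commit
/// carries exactly two blocks: the MST root referenced by `commit.data` and
/// the commit block itself.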
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        // A genesis commit references exactly two blocks: its MST root and itself.
        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(success, failed, "Completed genesis commit blocks_cids backfill");
}

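/// Backfills `repos.repo_rev` for rows created before the column existed,
/// by loading each repo's root commit block and reading its `rev`.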
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev =
        match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL")
            .fetch_all(db)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Failed to query repos for backfill: {}", e);
                return;
            }
        };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        // The rev recorded in the signed commit block is the repo's current revision.
        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}

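/// Backfills the `user_blocks` index for repos that predate it, walking the
/// commit/MST graph from each repo's root and recording every reachable
/// block CID. Blocks that fail to load are skipped rather than failing the
/// whole repo.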
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        // Walk the repo graph from the root commit, recording every reachable
        // block CID. Unfetchable blocks are still recorded but not traversed.
        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if !visited.insert(cid) {
                continue;
            }
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                // Commit block: follow the MST root and the previous commit.
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                // MST node: follow the left-subtree link ("l") and each
                // entry's subtree ("t") and value ("v") links.
                if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                    to_visit.push(*left_cid);
                }
                if let Some(Ipld::List(entries)) = obj.get("e") {
                    for entry in entries {
                        if let Ipld::Map(entry_obj) = entry {
                            if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                to_visit.push(*tree_cid);
                            }
                            if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                to_visit.push(*val_cid);
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}

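/// Backfills `record_blobs` for up to 100 users per run, scanning each
/// user's records for blob references and linking them to the record's
/// `at://` URI. A user counts as a success once their records have been
/// scanned; individual insert failures are only logged.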
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut blob_refs_found = 0;
        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    VALUES ($1, $2, $3)
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user.user_id,
                    record_uri,
                    blob_ref.cid
                )
                .execute(db)
                .await
                {
                    warn!(error = %e, "Failed to insert record_blob during backfill");
                } else {
                    blob_refs_found += 1;
                }
            }
        }

        if blob_refs_found > 0 {
            info!(
                user_id = %user.user_id,
                did = %user.did,
                blob_refs = blob_refs_found,
                "Backfilled record_blobs"
            );
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}

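/// Runs the scheduled-task loop until shutdown is signalled, polling for
/// accounts due for deletion every `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`
/// seconds (default 3600).
///
/// A minimal wiring sketch (everything besides this function is illustrative):
///
/// ```rust,ignore
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// let task = tokio::spawn(start_scheduled_tasks(db, blob_store, shutdown_rx));
/// // ... later, on shutdown:
/// shutdown_tx.send(true).ok();
/// task.await.ok();
/// ```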
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

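/// Deletes up to 100 accounts whose `delete_after` deadline has passed and
/// which are already deactivated. Per-account failures are logged and do not
/// abort the batch.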
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}

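/// Permanently deletes a single account: blob payloads from object storage
/// (best-effort), then blobs and the user row in one transaction, sequencing
/// a final 'account' deletion event and pruning the rest of the DID's
/// sequence history before notifying firehose listeners.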
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_one(db)
        .await
        .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    // Best-effort: remove blob payloads from object storage before touching the DB.
    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence a final 'account' deletion event, then drop every other
    // sequence row for this DID so only the deletion event remains.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    // NOTIFY cannot take bind parameters; the payload is a sequence number we
    // generated ourselves, so interpolating it is safe.
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}