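//! Background maintenance: one-shot backfills that repair rows written
//! before a schema or format change, plus a scheduled loop that purges
//! accounts whose deletion grace period has expired.
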
use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};

use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;

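/// Backfills `blocks_cids` for genesis commit events (`prev_cid IS NULL`)
/// that were sequenced with an empty block list. Only two CIDs need to be
/// recorded: the commit block itself and the MST root it points at.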
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let broken_genesis_commits = match sqlx::query!(
        r#"
        SELECT seq, did, commit_cid
        FROM repo_seq
        WHERE event_type = 'commit'
          AND prev_cid IS NULL
          AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repo_seq for genesis commit backfill: {}", e);
            return;
        }
    };

    if broken_genesis_commits.is_empty() {
        debug!("No genesis commits need blocks_cids backfill");
        return;
    }

    info!(
        count = broken_genesis_commits.len(),
        "Backfilling blocks_cids for genesis commits"
    );

    let mut success = 0;
    let mut failed = 0;

    for commit_row in broken_genesis_commits {
        let commit_cid_str = match &commit_row.commit_cid {
            Some(c) => c.clone(),
            None => {
                warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
                failed += 1;
                continue;
            }
        };

        let commit_cid = match Cid::from_str(&commit_cid_str) {
            Ok(c) => c,
            Err(_) => {
                warn!(seq = commit_row.seq, "Invalid commit CID");
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&commit_cid).await {
            Ok(Some(b)) => b,
            Ok(None) => {
                warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
                failed += 1;
                continue;
            }
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(e) => {
                warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
                failed += 1;
                continue;
            }
        };

        // A genesis commit carries no prior state, so its block list is just
        // the commit block and the MST root the commit points at.
        let mst_root_cid = commit.data;
        let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];

        if let Err(e) = sqlx::query!(
            "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
            &blocks_cids,
            commit_row.seq
        )
        .execute(db)
        .await
        {
            warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
            failed += 1;
        } else {
            info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
            success += 1;
        }
    }

    info!(success, failed, "Completed genesis commit blocks_cids backfill");
}

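/// Backfills `repos.repo_rev` where it is NULL by loading each repo's root
/// commit from the block store and copying out its `rev` field.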
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
    let repos_missing_rev = match sqlx::query!(
        "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query repos for backfill: {}", e);
            return;
        }
    };

    if repos_missing_rev.is_empty() {
        debug!("No repos need repo_rev backfill");
        return;
    }

    info!(
        count = repos_missing_rev.len(),
        "Backfilling repo_rev for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for repo in repos_missing_rev {
        let cid = match Cid::from_str(&repo.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let block = match block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => {
                failed += 1;
                continue;
            }
        };

        let commit = match Commit::from_cbor(&block) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        let rev = commit.rev().to_string();

        if let Err(e) = sqlx::query!(
            "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
            rev,
            repo.user_id
        )
        .execute(db)
        .await
        {
            warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
            failed += 1;
        } else {
            success += 1;
        }
    }

    info!(success, failed, "Completed repo_rev backfill");
}

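/// Rebuilds the `user_blocks` index for users that have a repo but no rows in
/// `user_blocks`, by walking every block reachable from the repo root (the
/// commit chain plus the MST) and bulk-inserting the collected CIDs.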
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
    let users_without_blocks = match sqlx::query!(
        r#"
        SELECT u.id as user_id, r.repo_root_cid
        FROM users u
        JOIN repos r ON r.user_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for user_blocks backfill: {}", e);
            return;
        }
    };

    if users_without_blocks.is_empty() {
        debug!("No users need user_blocks backfill");
        return;
    }

    info!(
        count = users_without_blocks.len(),
        "Backfilling user_blocks for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_without_blocks {
        let root_cid = match Cid::from_str(&user.repo_root_cid) {
            Ok(c) => c,
            Err(_) => {
                failed += 1;
                continue;
            }
        };

        // Depth-first walk over every block reachable from the repo root.
        let mut block_cids: Vec<Vec<u8>> = Vec::new();
        let mut to_visit = vec![root_cid];
        let mut visited = std::collections::HashSet::new();

        while let Some(cid) = to_visit.pop() {
            if visited.contains(&cid) {
                continue;
            }
            visited.insert(cid);
            block_cids.push(cid.to_bytes());

            let block = match block_store.get(&cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            if let Ok(commit) = Commit::from_cbor(&block) {
                to_visit.push(commit.data);
                if let Some(prev) = commit.prev {
                    to_visit.push(prev);
                }
            } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
                // Not a commit, so treat it as an MST node: `l` links the
                // leftmost subtree, and each entry in `e` may link a subtree
                // (`t`) and a record value (`v`).
                if let Ipld::Map(ref obj) = ipld {
                    if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                        to_visit.push(*left_cid);
                    }
                    if let Some(Ipld::List(entries)) = obj.get("e") {
                        for entry in entries {
                            if let Ipld::Map(entry_obj) = entry {
                                if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                                    to_visit.push(*tree_cid);
                                }
                                if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
                                    to_visit.push(*val_cid);
                                }
                            }
                        }
                    }
                }
            }
        }

        if block_cids.is_empty() {
            failed += 1;
            continue;
        }

        if let Err(e) = sqlx::query!(
            r#"
            INSERT INTO user_blocks (user_id, block_cid)
            SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
            ON CONFLICT (user_id, block_cid) DO NOTHING
            "#,
            user.user_id,
            &block_cids
        )
        .execute(db)
        .await
        {
            warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
            failed += 1;
        } else {
            info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
            success += 1;
        }
    }

    info!(success, failed, "Completed user_blocks backfill");
}

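/// Runs the periodic maintenance loop until `shutdown_rx` signals shutdown.
/// The tick interval comes from `SCHEDULED_DELETE_CHECK_INTERVAL_SECS`
/// (default 3600 seconds).
///
/// A minimal usage sketch; `db` and `blob_store` are assumed to come from the
/// caller's startup code, so the snippet is not compiled as a doctest:
///
/// ```ignore
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// let task = tokio::spawn(start_scheduled_tasks(db.clone(), blob_store.clone(), shutdown_rx));
/// // ... during graceful shutdown:
/// let _ = shutdown_tx.send(true);
/// task.await.unwrap();
/// ```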
pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    let check_interval = Duration::from_secs(
        std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(3600),
    );

    info!(
        check_interval_secs = check_interval.as_secs(),
        "Starting scheduled tasks service"
    );

    let mut ticker = interval(check_interval);
    // If a tick is missed (e.g. a long-running deletion pass), skip it rather
    // than firing a burst of catch-up ticks.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            res = shutdown_rx.changed() => {
                // A dropped sender means no further signals can arrive, so
                // treat it like a shutdown; otherwise `changed()` would keep
                // resolving with Err in a tight loop.
                if res.is_err() || *shutdown_rx.borrow() {
                    info!("Scheduled tasks service shutting down");
                    break;
                }
            }
            _ = ticker.tick() => {
                if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
                    error!("Error processing scheduled deletions: {}", e);
                }
            }
        }
    }
}

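/// Hard-deletes accounts whose `delete_after` deadline has passed and that
/// are deactivated, at most 100 per tick; later ticks pick up the remainder.
/// A failure for one account is logged and does not abort the batch.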
async fn process_scheduled_deletions(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
    let accounts_to_delete = sqlx::query!(
        r#"
        SELECT did, handle
        FROM users
        WHERE delete_after IS NOT NULL
          AND delete_after < NOW()
          AND deactivated_at IS NOT NULL
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;

    if accounts_to_delete.is_empty() {
        debug!("No accounts scheduled for deletion");
        return Ok(());
    }

    info!(
        count = accounts_to_delete.len(),
        "Processing scheduled account deletions"
    );

    for account in accounts_to_delete {
        if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
            warn!(
                did = %account.did,
                handle = %account.handle,
                error = %e,
                "Failed to delete scheduled account"
            );
        } else {
            info!(
                did = %account.did,
                handle = %account.handle,
                "Successfully deleted scheduled account"
            );
        }
    }

    Ok(())
}

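/// Removes all data for a single account: blobs are deleted from external
/// storage best-effort, then the database rows go in one transaction, leaving
/// only a tombstone `account` event in `repo_seq`, which is then broadcast on
/// the `repo_updates` channel.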
async fn delete_account_data(
    db: &PgPool,
    blob_store: &Arc<dyn BlobStorage>,
    did: &str,
    _handle: &str,
) -> Result<(), String> {
    let user_id: uuid::Uuid = sqlx::query_scalar!(
        "SELECT id FROM users WHERE did = $1",
        did
    )
    .fetch_one(db)
    .await
    .map_err(|e| format!("DB error fetching user: {}", e))?;

    let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
        r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
        user_id
    )
    .fetch_all(db)
    .await
    .map_err(|e| format!("DB error fetching blob keys: {}", e))?;

    // Best-effort removal from external storage before touching the DB; a
    // failed delete leaves an orphaned blob rather than a broken account.
    for storage_key in &blob_storage_keys {
        if let Err(e) = blob_store.delete(storage_key).await {
            warn!(
                storage_key = %storage_key,
                error = %e,
                "Failed to delete blob from storage (continuing anyway)"
            );
        }
    }

    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Failed to begin transaction: {}", e))?;

    sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete blobs: {}", e))?;

    sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to delete user: {}", e))?;

    // Sequence the tombstone event, then drop every other row for this DID so
    // the tombstone is the only trace left in repo_seq.
    let account_seq = sqlx::query_scalar!(
        r#"
        INSERT INTO repo_seq (did, event_type, active, status)
        VALUES ($1, 'account', false, 'deleted')
        RETURNING seq
        "#,
        did
    )
    .fetch_one(&mut *tx)
    .await
    .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;

    sqlx::query!(
        "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
        did,
        account_seq
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;

    tx.commit()
        .await
        .map_err(|e| format!("Failed to commit transaction: {}", e))?;

    // NOTIFY is a utility command and cannot take bind parameters, so the
    // payload is formatted in; `account_seq` is an integer returned by the
    // database, so this cannot inject SQL.
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
        .execute(db)
        .await
        .map_err(|e| format!("Failed to notify: {}", e))?;

    info!(
        did = %did,
        blob_count = blob_storage_keys.len(),
        "Deleted account data including blobs from storage"
    );

    Ok(())
}