This repository has no description.
1use cid::Cid;
2use jacquard_repo::commit::Commit;
3use jacquard_repo::storage::BlockStore;
4use ipld_core::ipld::Ipld;
5use sqlx::PgPool;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch;
10use tokio::time::interval;
11use tracing::{debug, error, info, warn};
12
13use crate::repo::PostgresBlockStore;
14use crate::storage::BlobStorage;
15
16pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
17 let repos_missing_rev = match sqlx::query!(
18 "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"
19 )
20 .fetch_all(db)
21 .await
22 {
23 Ok(rows) => rows,
24 Err(e) => {
25 error!("Failed to query repos for backfill: {}", e);
26 return;
27 }
28 };
29
30 if repos_missing_rev.is_empty() {
31 debug!("No repos need repo_rev backfill");
32 return;
33 }
34
35 info!(
36 count = repos_missing_rev.len(),
37 "Backfilling repo_rev for existing repos"
38 );
39
40 let mut success = 0;
41 let mut failed = 0;
42
43 for repo in repos_missing_rev {
44 let cid = match Cid::from_str(&repo.repo_root_cid) {
45 Ok(c) => c,
46 Err(_) => {
47 failed += 1;
48 continue;
49 }
50 };
51
52 let block = match block_store.get(&cid).await {
53 Ok(Some(b)) => b,
54 _ => {
55 failed += 1;
56 continue;
57 }
58 };
59
60 let commit = match Commit::from_cbor(&block) {
61 Ok(c) => c,
62 Err(_) => {
63 failed += 1;
64 continue;
65 }
66 };
67
68 let rev = commit.rev().to_string();
69
70 if let Err(e) = sqlx::query!(
71 "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
72 rev,
73 repo.user_id
74 )
75 .execute(db)
76 .await
77 {
78 warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
79 failed += 1;
80 } else {
81 success += 1;
82 }
83 }
84
85 info!(success, failed, "Completed repo_rev backfill");
86}
87
88pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
89 let users_without_blocks = match sqlx::query!(
90 r#"
91 SELECT u.id as user_id, r.repo_root_cid
92 FROM users u
93 JOIN repos r ON r.user_id = u.id
94 WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
95 "#
96 )
97 .fetch_all(db)
98 .await
99 {
100 Ok(rows) => rows,
101 Err(e) => {
102 error!("Failed to query users for user_blocks backfill: {}", e);
103 return;
104 }
105 };
106
107 if users_without_blocks.is_empty() {
108 debug!("No users need user_blocks backfill");
109 return;
110 }
111
112 info!(
113 count = users_without_blocks.len(),
114 "Backfilling user_blocks for existing repos"
115 );
116
117 let mut success = 0;
118 let mut failed = 0;
119
120 for user in users_without_blocks {
121 let root_cid = match Cid::from_str(&user.repo_root_cid) {
122 Ok(c) => c,
123 Err(_) => {
124 failed += 1;
125 continue;
126 }
127 };
128
129 let mut block_cids: Vec<Vec<u8>> = Vec::new();
130 let mut to_visit = vec![root_cid];
131 let mut visited = std::collections::HashSet::new();
132
133 while let Some(cid) = to_visit.pop() {
134 if visited.contains(&cid) {
135 continue;
136 }
137 visited.insert(cid);
138 block_cids.push(cid.to_bytes());
139
140 let block = match block_store.get(&cid).await {
141 Ok(Some(b)) => b,
142 _ => continue,
143 };
144
145 if let Ok(commit) = Commit::from_cbor(&block) {
146 to_visit.push(commit.data);
147 if let Some(prev) = commit.prev {
148 to_visit.push(prev);
149 }
150 } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
151 if let Ipld::Map(ref obj) = ipld {
152 if let Some(Ipld::Link(left_cid)) = obj.get("l") {
153 to_visit.push(*left_cid);
154 }
155 if let Some(Ipld::List(entries)) = obj.get("e") {
156 for entry in entries {
157 if let Ipld::Map(entry_obj) = entry {
158 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
159 to_visit.push(*tree_cid);
160 }
161 if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
162 to_visit.push(*val_cid);
163 }
164 }
165 }
166 }
167 }
168 }
169 }
170
171 if block_cids.is_empty() {
172 failed += 1;
173 continue;
174 }
175
176 if let Err(e) = sqlx::query!(
177 r#"
178 INSERT INTO user_blocks (user_id, block_cid)
179 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
180 ON CONFLICT (user_id, block_cid) DO NOTHING
181 "#,
182 user.user_id,
183 &block_cids
184 )
185 .execute(db)
186 .await
187 {
188 warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
189 failed += 1;
190 } else {
191 info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
192 success += 1;
193 }
194 }
195
196 info!(success, failed, "Completed user_blocks backfill");
197}
198
199pub async fn start_scheduled_tasks(
200 db: PgPool,
201 blob_store: Arc<dyn BlobStorage>,
202 mut shutdown_rx: watch::Receiver<bool>,
203) {
204 let check_interval = Duration::from_secs(
205 std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
206 .ok()
207 .and_then(|s| s.parse().ok())
208 .unwrap_or(3600),
209 );
210
211 info!(
212 check_interval_secs = check_interval.as_secs(),
213 "Starting scheduled tasks service"
214 );
215
216 let mut ticker = interval(check_interval);
217 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
218
219 loop {
220 tokio::select! {
221 _ = shutdown_rx.changed() => {
222 if *shutdown_rx.borrow() {
223 info!("Scheduled tasks service shutting down");
224 break;
225 }
226 }
227 _ = ticker.tick() => {
228 if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
229 error!("Error processing scheduled deletions: {}", e);
230 }
231 }
232 }
233 }
234}
235
236async fn process_scheduled_deletions(
237 db: &PgPool,
238 blob_store: &Arc<dyn BlobStorage>,
239) -> Result<(), String> {
240 let accounts_to_delete = sqlx::query!(
241 r#"
242 SELECT did, handle
243 FROM users
244 WHERE delete_after IS NOT NULL
245 AND delete_after < NOW()
246 AND deactivated_at IS NOT NULL
247 LIMIT 100
248 "#
249 )
250 .fetch_all(db)
251 .await
252 .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;
253
254 if accounts_to_delete.is_empty() {
255 debug!("No accounts scheduled for deletion");
256 return Ok(());
257 }
258
259 info!(
260 count = accounts_to_delete.len(),
261 "Processing scheduled account deletions"
262 );
263
264 for account in accounts_to_delete {
265 if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
266 warn!(
267 did = %account.did,
268 handle = %account.handle,
269 error = %e,
270 "Failed to delete scheduled account"
271 );
272 } else {
273 info!(
274 did = %account.did,
275 handle = %account.handle,
276 "Successfully deleted scheduled account"
277 );
278 }
279 }
280
281 Ok(())
282}
283
284async fn delete_account_data(
285 db: &PgPool,
286 blob_store: &Arc<dyn BlobStorage>,
287 did: &str,
288 _handle: &str,
289) -> Result<(), String> {
290 let user_id: uuid::Uuid = sqlx::query_scalar!(
291 "SELECT id FROM users WHERE did = $1",
292 did
293 )
294 .fetch_one(db)
295 .await
296 .map_err(|e| format!("DB error fetching user: {}", e))?;
297
298 let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
299 r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
300 user_id
301 )
302 .fetch_all(db)
303 .await
304 .map_err(|e| format!("DB error fetching blob keys: {}", e))?;
305
306 for storage_key in &blob_storage_keys {
307 if let Err(e) = blob_store.delete(storage_key).await {
308 warn!(
309 storage_key = %storage_key,
310 error = %e,
311 "Failed to delete blob from storage (continuing anyway)"
312 );
313 }
314 }
315
316 let mut tx = db
317 .begin()
318 .await
319 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
320
321 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
322 .execute(&mut *tx)
323 .await
324 .map_err(|e| format!("Failed to delete blobs: {}", e))?;
325
326 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
327 .execute(&mut *tx)
328 .await
329 .map_err(|e| format!("Failed to delete user: {}", e))?;
330
331 let account_seq = sqlx::query_scalar!(
332 r#"
333 INSERT INTO repo_seq (did, event_type, active, status)
334 VALUES ($1, 'account', false, 'deleted')
335 RETURNING seq
336 "#,
337 did
338 )
339 .fetch_one(&mut *tx)
340 .await
341 .map_err(|e| format!("Failed to sequence account deletion: {}", e))?;
342
343 sqlx::query!(
344 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
345 did,
346 account_seq
347 )
348 .execute(&mut *tx)
349 .await
350 .map_err(|e| format!("Failed to cleanup sequences: {}", e))?;
351
352 tx.commit()
353 .await
354 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
355
356 sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
357 .execute(db)
358 .await
359 .map_err(|e| format!("Failed to notify: {}", e))?;
360
361 info!(
362 did = %did,
363 blob_count = blob_storage_keys.len(),
364 "Deleted account data including blobs from storage"
365 );
366
367 Ok(())
368}