Remove old user blocks

lewis aeeaec71 041fb933

+15
.sqlx/query-03faaf7b8676e0af1bf620759425632560dabfd5748d0383971c10f9b2847d7d.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "\n DELETE FROM user_blocks\n WHERE user_id = $1\n AND block_cid = ANY($2)\n ",
+   "describe": {
+     "columns": [],
+     "parameters": {
+       "Left": [
+         "Uuid",
+         "ByteaArray"
+       ]
+     },
+     "nullable": []
+   },
+   "hash": "03faaf7b8676e0af1bf620759425632560dabfd5748d0383971c10f9b2847d7d"
+ }
+15
.sqlx/query-d71881b1dd8111b2afff6a7af8829651379afbe050dcc8a93e0b91eced31ca89.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ",
+   "describe": {
+     "columns": [],
+     "parameters": {
+       "Left": [
+         "Uuid",
+         "ByteaArray"
+       ]
+     },
+     "nullable": []
+   },
+   "hash": "d71881b1dd8111b2afff6a7af8829651379afbe050dcc8a93e0b91eced31ca89"
+ }
+22
.sqlx/query-e70fc3dced4eb7dc220ca2a18cdfcbd5f2d66dff2262bb083fd4118b032ff978.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "SELECT block_cid FROM user_blocks WHERE user_id = $1",
+   "describe": {
+     "columns": [
+       {
+         "ordinal": 0,
+         "name": "block_cid",
+         "type_info": "Bytea"
+       }
+     ],
+     "parameters": {
+       "Left": [
+         "Uuid"
+       ]
+     },
+     "nullable": [
+       false
+     ]
+   },
+   "hash": "e70fc3dced4eb7dc220ca2a18cdfcbd5f2d66dff2262bb083fd4118b032ff978"
+ }
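
These cached query files correspond to the sqlx::query! macros added below in src/scheduled.rs and src/api/repo/record/utils.rs. As a standalone sketch of the UNNEST bulk-upsert pattern they describe, using the runtime sqlx::query API rather than the compile-time macro (the helper name and pool handle here are illustrative assumptions, not part of this commit):

use cid::Cid;
use sqlx::PgPool;

// Hypothetical standalone helper mirroring the cached UNNEST bulk upsert;
// assumes sqlx with the "postgres" and "uuid" features enabled.
async fn insert_user_blocks(
    pool: &PgPool,
    user_id: uuid::Uuid,
    cids: &[Cid],
) -> Result<(), sqlx::Error> {
    // CIDs are stored as raw bytes (bytea) keyed by user.
    let bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
    sqlx::query(
        "INSERT INTO user_blocks (user_id, block_cid)
         SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
         ON CONFLICT (user_id, block_cid) DO NOTHING",
    )
    .bind(user_id)
    .bind(bytes)
    .execute(pool)
    .await?;
    Ok(())
}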
+1
migrations/20260106_clear_user_blocks.sql
···
+ TRUNCATE TABLE user_blocks;
+9 -8
src/api/backup.rs
···
          }
      };

- let car_bytes = match generate_full_backup(&state.block_store, &head_cid).await {
-     Ok(bytes) => bytes,
-     Err(e) => {
-         error!("Failed to generate CAR: {:?}", e);
-         return ApiError::InternalError(Some("Failed to generate backup".into()))
-             .into_response();
-     }
- };
+ let car_bytes =
+     match generate_full_backup(&state.db, &state.block_store, user.id, &head_cid).await {
+         Ok(bytes) => bytes,
+         Err(e) => {
+             error!("Failed to generate CAR: {:?}", e);
+             return ApiError::InternalError(Some("Failed to generate backup".into()))
+                 .into_response();
+         }
+     };

  let block_count = crate::scheduled::count_car_blocks(&car_bytes);
  let size_bytes = car_bytes.len() as i64;
+34 -16
src/api/repo/record/batch.rs
···
          return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
      }
  };
- let mut relevant_blocks = std::collections::BTreeMap::new();
+ let mut new_mst_blocks = std::collections::BTreeMap::new();
+ let mut old_mst_blocks = std::collections::BTreeMap::new();
  for key in &modified_keys {
-     if mst
-         .blocks_for_path(key, &mut relevant_blocks)
-         .await
-         .is_err()
-     {
+     if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() {
          return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
              .into_response();
      }
      if original_mst
-         .blocks_for_path(key, &mut relevant_blocks)
+         .blocks_for_path(key, &mut old_mst_blocks)
          .await
          .is_err()
      {
···
              .into_response();
      }
  }
- let mut written_cids = tracking_store.get_all_relevant_cids();
- for cid in relevant_blocks.keys() {
-     if !written_cids.contains(cid) {
-         written_cids.push(*cid);
-     }
- }
- let written_cids_str = written_cids
-     .iter()
-     .map(|c| c.to_string())
-     .collect::<Vec<_>>();
+ let mut relevant_blocks = new_mst_blocks.clone();
+ relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
+ let written_cids: Vec<Cid> = tracking_store
+     .get_all_relevant_cids()
+     .into_iter()
+     .chain(relevant_blocks.keys().copied())
+     .collect::<std::collections::HashSet<_>>()
+     .into_iter()
+     .collect();
+ let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
+ let prev_record_cids = ops.iter().filter_map(|op| match op {
+     RecordOp::Update {
+         prev: Some(cid), ..
+     }
+     | RecordOp::Delete {
+         prev: Some(cid), ..
+     } => Some(*cid),
+     _ => None,
+ });
+ let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
+     .chain(
+         old_mst_blocks
+             .keys()
+             .filter(|cid| !new_mst_blocks.contains_key(*cid))
+             .copied(),
+     )
+     .chain(prev_record_cids)
+     .collect::<std::collections::HashSet<_>>()
+     .into_iter()
+     .collect();
  let commit_res = match commit_and_log(
      &state,
      CommitParams {
···
          ops,
          blocks_cids: &written_cids_str,
          blobs: &all_blob_cids,
+         obsolete_cids,
      },
  )
  .await
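
The obsolete-CID computation introduced here is repeated in delete.rs, utils.rs, and write.rs below. Distilled into a standalone helper it would look roughly like the sketch that follows (this helper does not exist in the commit; the types match the code above):

use std::collections::{BTreeMap, HashSet};

use bytes::Bytes;
use cid::Cid;

// Blocks that can be dropped from user_blocks after a commit: the old
// commit root, old MST nodes not reused by the new tree, and the CIDs of
// replaced or deleted records. Deduplicated via HashSet, as in batch.rs.
fn obsolete_cids(
    old_root: Cid,
    old_mst_blocks: &BTreeMap<Cid, Bytes>,
    new_mst_blocks: &BTreeMap<Cid, Bytes>,
    prev_record_cids: impl IntoIterator<Item = Cid>,
) -> Vec<Cid> {
    std::iter::once(old_root)
        .chain(
            old_mst_blocks
                .keys()
                .filter(|cid| !new_mst_blocks.contains_key(*cid))
                .copied(),
        )
        .chain(prev_record_cids)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect()
}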
+24 -13
src/api/repo/record/delete.rs
···
      rkey: rkey_for_audit.clone(),
      prev: prev_record_cid,
  };
- let mut relevant_blocks = std::collections::BTreeMap::new();
+ let mut new_mst_blocks = std::collections::BTreeMap::new();
+ let mut old_mst_blocks = std::collections::BTreeMap::new();
  if new_mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut new_mst_blocks)
      .await
      .is_err()
  {
···
          .into_response();
  }
  if mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut old_mst_blocks)
      .await
      .is_err()
  {
      return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
          .into_response();
  }
- let mut written_cids = tracking_store.get_all_relevant_cids();
- for cid in relevant_blocks.keys() {
-     if !written_cids.contains(cid) {
-         written_cids.push(*cid);
-     }
- }
- let written_cids_str = written_cids
-     .iter()
-     .map(|c| c.to_string())
-     .collect::<Vec<_>>();
+ let mut relevant_blocks = new_mst_blocks.clone();
+ relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
+ let written_cids: Vec<Cid> = tracking_store
+     .get_all_relevant_cids()
+     .into_iter()
+     .chain(relevant_blocks.keys().copied())
+     .collect::<std::collections::HashSet<_>>()
+     .into_iter()
+     .collect();
+ let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
+ let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
+     .chain(
+         old_mst_blocks
+             .keys()
+             .filter(|cid| !new_mst_blocks.contains_key(*cid))
+             .copied(),
+     )
+     .chain(prev_record_cid)
+     .collect();
  let commit_result = match commit_and_log(
      &state,
      CommitParams {
···
          ops: vec![op],
          blocks_cids: &written_cids_str,
          blobs: &[],
+         obsolete_cids,
      },
  )
  .await
+39 -9
src/api/repo/record/utils.rs
···
      pub ops: Vec<RecordOp>,
      pub blocks_cids: &'a [String],
      pub blobs: &'a [String],
+     pub obsolete_cids: Vec<Cid>,
  }

  pub async fn commit_and_log(
···
      ops,
      blocks_cids,
      blobs,
+     obsolete_cids,
  } = params;
  let key_row = sqlx::query!(
      "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
···
      .execute(&mut *tx)
      .await
      .map_err(|e| format!("DB Error (user_blocks): {}", e))?;
  }
+ if !obsolete_cids.is_empty() {
+     let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();
+     sqlx::query!(
+         r#"
+         DELETE FROM user_blocks
+         WHERE user_id = $1
+         AND block_cid = ANY($2)
+         "#,
+         user_id,
+         &obsolete_bytes as &[Vec<u8>]
+     )
+     .execute(&mut *tx)
+     .await
+     .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
+ }
  let mut upsert_collections: Vec<String> = Vec::new();
  let mut upsert_rkeys: Vec<String> = Vec::new();
···
      rkey: rkey.to_string(),
      cid: record_cid,
  };
- let mut relevant_blocks = std::collections::BTreeMap::new();
+ let mut new_mst_blocks = std::collections::BTreeMap::new();
+ let mut old_mst_blocks = std::collections::BTreeMap::new();
  new_mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut new_mst_blocks)
      .await
      .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
- mst.blocks_for_path(&key, &mut relevant_blocks)
+ mst.blocks_for_path(&key, &mut old_mst_blocks)
      .await
      .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
+ let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
+     .chain(
+         old_mst_blocks
+             .keys()
+             .filter(|cid| !new_mst_blocks.contains_key(*cid))
+             .copied(),
+     )
+     .collect();
+ let mut relevant_blocks = new_mst_blocks;
+ relevant_blocks.extend(old_mst_blocks);
  relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
- let mut written_cids = tracking_store.get_all_relevant_cids();
- for cid in relevant_blocks.keys() {
-     if !written_cids.contains(cid) {
-         written_cids.push(*cid);
-     }
- }
+ let written_cids: Vec<Cid> = tracking_store
+     .get_all_relevant_cids()
+     .into_iter()
+     .chain(relevant_blocks.keys().copied())
+     .collect::<std::collections::HashSet<_>>()
+     .into_iter()
+     .collect();
  let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
  let blob_cids = extract_blob_cids(record);
  let result = commit_and_log(
···
          ops: vec![op],
          blocks_cids: &written_cids_str,
          blobs: &blob_cids,
+         obsolete_cids,
      },
  )
  .await?;
+47 -26
src/api/repo/record/write.rs
···
      rkey: rkey.to_string(),
      cid: record_cid,
  };
- let mut relevant_blocks = std::collections::BTreeMap::new();
+ let mut new_mst_blocks = std::collections::BTreeMap::new();
+ let mut old_mst_blocks = std::collections::BTreeMap::new();
  if new_mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut new_mst_blocks)
      .await
      .is_err()
  {
···
          .into_response();
  }
  if mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut old_mst_blocks)
      .await
      .is_err()
  {
      return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
          .into_response();
  }
+ let mut relevant_blocks = new_mst_blocks.clone();
+ relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
  relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
- let mut written_cids = tracking_store.get_all_relevant_cids();
- for cid in relevant_blocks.keys() {
-     if !written_cids.contains(cid) {
-         written_cids.push(*cid);
-     }
- }
- let written_cids_str = written_cids
-     .iter()
-     .map(|c| c.to_string())
-     .collect::<Vec<_>>();
+ let written_cids: Vec<Cid> = tracking_store
+     .get_all_relevant_cids()
+     .into_iter()
+     .chain(relevant_blocks.keys().copied())
+     .collect::<std::collections::HashSet<_>>()
+     .into_iter()
+     .collect();
+ let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
  let blob_cids = extract_blob_cids(&input.record);
+ let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
+     .chain(
+         old_mst_blocks
+             .keys()
+             .filter(|cid| !new_mst_blocks.contains_key(*cid))
+             .copied(),
+     )
+     .collect();
  let commit_result = match commit_and_log(
      &state,
      CommitParams {
···
          ops: vec![op],
          blocks_cids: &written_cids_str,
          blobs: &blob_cids,
+         obsolete_cids,
      },
  )
  .await
···
          cid: record_cid,
      }
  };
- let mut relevant_blocks = std::collections::BTreeMap::new();
+ let mut new_mst_blocks = std::collections::BTreeMap::new();
+ let mut old_mst_blocks = std::collections::BTreeMap::new();
  if new_mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut new_mst_blocks)
      .await
      .is_err()
  {
···
          .into_response();
  }
  if mst
-     .blocks_for_path(&key, &mut relevant_blocks)
+     .blocks_for_path(&key, &mut old_mst_blocks)
      .await
      .is_err()
  {
      return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
          .into_response();
  }
+ let mut relevant_blocks = new_mst_blocks.clone();
+ relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
  relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
- let mut written_cids = tracking_store.get_all_relevant_cids();
- for cid in relevant_blocks.keys() {
-     if !written_cids.contains(cid) {
-         written_cids.push(*cid);
-     }
- }
- let written_cids_str = written_cids
-     .iter()
-     .map(|c| c.to_string())
-     .collect::<Vec<_>>();
+ let written_cids: Vec<Cid> = tracking_store
+     .get_all_relevant_cids()
+     .into_iter()
+     .chain(relevant_blocks.keys().copied())
+     .collect::<std::collections::HashSet<_>>()
+     .into_iter()
+     .collect();
+ let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
  let is_update = existing_cid.is_some();
  let blob_cids = extract_blob_cids(&input.record);
+ let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
+     .chain(
+         old_mst_blocks
+             .keys()
+             .filter(|cid| !new_mst_blocks.contains_key(*cid))
+             .copied(),
+     )
+     .chain(existing_cid)
+     .collect();
  let commit_result = match commit_and_log(
      &state,
      CommitParams {
···
          ops: vec![op],
          blocks_cids: &written_cids_str,
          blobs: &blob_cids,
+         obsolete_cids,
      },
  )
  .await
+159 -101
src/scheduled.rs
···
      }
  };

- let mut block_cids: Vec<Vec<u8>> = Vec::new();
- let mut to_visit = vec![root_cid];
- let mut visited = std::collections::HashSet::new();
-
- while let Some(cid) = to_visit.pop() {
-     if visited.contains(&cid) {
-         continue;
-     }
-     visited.insert(cid);
-     block_cids.push(cid.to_bytes());
-
-     let block = match block_store.get(&cid).await {
-         Ok(Some(b)) => b,
-         _ => continue,
-     };
-
-     if let Ok(commit) = Commit::from_cbor(&block) {
-         to_visit.push(commit.data);
-         if let Some(prev) = commit.prev {
-             to_visit.push(prev);
-         }
-     } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
-         if let Some(Ipld::Link(left_cid)) = obj.get("l") {
-             to_visit.push(*left_cid);
-         }
-         if let Some(Ipld::List(entries)) = obj.get("e") {
-             for entry in entries {
-                 if let Ipld::Map(entry_obj) = entry {
-                     if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
-                         to_visit.push(*tree_cid);
-                     }
-                     if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
-                         to_visit.push(*val_cid);
-                     }
-                 }
-             }
-         }
-     }
- }
-
- if block_cids.is_empty() {
-     failed += 1;
-     continue;
- }
-
- if let Err(e) = sqlx::query!(
-     r#"
-     INSERT INTO user_blocks (user_id, block_cid)
-     SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
-     ON CONFLICT (user_id, block_cid) DO NOTHING
-     "#,
-     user.user_id,
-     &block_cids
- )
- .execute(db)
- .await
- {
-     warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
-     failed += 1;
- } else {
-     info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
-     success += 1;
- }
+ match collect_current_repo_blocks(&block_store, &root_cid).await {
+     Ok(block_cids) => {
+         if block_cids.is_empty() {
+             failed += 1;
+             continue;
+         }
+
+         if let Err(e) = sqlx::query!(
+             r#"
+             INSERT INTO user_blocks (user_id, block_cid)
+             SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
+             ON CONFLICT (user_id, block_cid) DO NOTHING
+             "#,
+             user.user_id,
+             &block_cids
+         )
+         .execute(db)
+         .await
+         {
+             warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
+             failed += 1;
+         } else {
+             info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
+             success += 1;
+         }
+     }
+     Err(e) => {
+         warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill");
+         failed += 1;
+     }
+ }
  }

  info!(success, failed, "Completed user_blocks backfill");
  }

+ pub async fn collect_current_repo_blocks(
+     block_store: &PostgresBlockStore,
+     head_cid: &Cid,
+ ) -> Result<Vec<Vec<u8>>, String> {
+     let mut block_cids: Vec<Vec<u8>> = Vec::new();
+     let mut to_visit = vec![*head_cid];
+     let mut visited = std::collections::HashSet::new();
+
+     while let Some(cid) = to_visit.pop() {
+         if visited.contains(&cid) {
+             continue;
+         }
+         visited.insert(cid);
+         block_cids.push(cid.to_bytes());
+
+         let block = match block_store.get(&cid).await {
+             Ok(Some(b)) => b,
+             Ok(None) => continue,
+             Err(e) => return Err(format!("Failed to get block {}: {:?}", cid, e)),
+         };
+
+         if let Ok(commit) = Commit::from_cbor(&block) {
+             to_visit.push(commit.data);
+         } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
+             if let Some(Ipld::Link(left_cid)) = obj.get("l") {
+                 to_visit.push(*left_cid);
+             }
+             if let Some(Ipld::List(entries)) = obj.get("e") {
+                 to_visit.extend(
+                     entries
+                         .iter()
+                         .filter_map(|entry| match entry {
+                             Ipld::Map(entry_obj) => Some(entry_obj),
+                             _ => None,
+                         })
+                         .flat_map(|entry_obj| {
+                             [entry_obj.get("t"), entry_obj.get("v")]
+                                 .into_iter()
+                                 .flatten()
+                                 .filter_map(|v| match v {
+                                     Ipld::Link(cid) => Some(*cid),
+                                     _ => None,
+                                 })
+                         }),
+                 );
+             }
+         }
+     }
+
+     Ok(block_cids)
+ }

  pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
···
      }
  };

- let car_result = generate_full_backup(block_store, &head_cid).await;
+ let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await;
  let car_bytes = match car_result {
      Ok(bytes) => bytes,
      Err(e) => {
···
      head_cid: &Cid,
  ) -> Result<Vec<u8>, String> {
      use jacquard_repo::storage::BlockStore;
-     use std::io::Write;
-
-     let mut car_bytes =
-         encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;
-
-     let mut stack = vec![*head_cid];
-     let mut visited = std::collections::HashSet::new();
-
-     while let Some(cid) = stack.pop() {
-         if visited.contains(&cid) {
-             continue;
-         }
-         visited.insert(cid);
-
-         if let Ok(Some(block)) = block_store.get(&cid).await {
-             let cid_bytes = cid.to_bytes();
-             let total_len = cid_bytes.len() + block.len();
-             let mut writer = Vec::new();
-             crate::sync::car::write_varint(&mut writer, total_len as u64)
-                 .expect("Writing to Vec<u8> should never fail");
-             writer
-                 .write_all(&cid_bytes)
-                 .expect("Writing to Vec<u8> should never fail");
-             writer
-                 .write_all(&block)
-                 .expect("Writing to Vec<u8> should never fail");
-             car_bytes.extend_from_slice(&writer);
-
-             if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
-                 extract_links(&value, &mut stack);
-             }
-         }
-     }
+
+     let block_cids_bytes = collect_current_repo_blocks(block_store, head_cid).await?;
+     let block_cids: Vec<Cid> = block_cids_bytes
+         .iter()
+         .filter_map(|b| Cid::try_from(b.as_slice()).ok())
+         .collect();
+
+     let car_bytes =
+         encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;
+
+     let blocks = block_store
+         .get_many(&block_cids)
+         .await
+         .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?;
+
+     let car_bytes = block_cids
+         .iter()
+         .zip(blocks.iter())
+         .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block)))
+         .fold(car_bytes, |mut acc, (cid, block)| {
+             acc.extend(encode_car_block(cid, block));
+             acc
+         });

      Ok(car_bytes)
  }

+ fn encode_car_block(cid: &Cid, block: &[u8]) -> Vec<u8> {
+     use std::io::Write;
+     let cid_bytes = cid.to_bytes();
+     let total_len = cid_bytes.len() + block.len();
+     let mut writer = Vec::new();
+     crate::sync::car::write_varint(&mut writer, total_len as u64)
+         .expect("Writing to Vec<u8> should never fail");
+     writer
+         .write_all(&cid_bytes)
+         .expect("Writing to Vec<u8> should never fail");
+     writer
+         .write_all(block)
+         .expect("Writing to Vec<u8> should never fail");
+     writer
+ }
+
+ pub async fn generate_repo_car_from_user_blocks(
+     db: &PgPool,
+     block_store: &PostgresBlockStore,
+     user_id: uuid::Uuid,
+     head_cid: &Cid,
+ ) -> Result<Vec<u8>, String> {
+     use jacquard_repo::storage::BlockStore;
+
+     let block_cid_bytes: Vec<Vec<u8>> = sqlx::query_scalar!(
+         "SELECT block_cid FROM user_blocks WHERE user_id = $1",
+         user_id
+     )
+     .fetch_all(db)
+     .await
+     .map_err(|e| format!("Failed to fetch user_blocks: {}", e))?;
+
+     if block_cid_bytes.is_empty() {
+         let cids = collect_current_repo_blocks(block_store, head_cid).await?;
+         if cids.is_empty() {
+             return Err("No blocks found for repo".to_string());
+         }
+         return generate_repo_car(block_store, head_cid).await;
+     }
+
+     let block_cids: Vec<Cid> = block_cid_bytes
+         .iter()
+         .filter_map(|bytes| Cid::try_from(bytes.as_slice()).ok())
+         .collect();
+
+     let car_bytes =
+         encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?;
+
+     let blocks = block_store
+         .get_many(&block_cids)
+         .await
+         .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?;
+
+     let car_bytes = block_cids
+         .iter()
+         .zip(blocks.iter())
+         .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block)))
+         .fold(car_bytes, |mut acc, (cid, block)| {
+             acc.extend(encode_car_block(cid, block));
+             acc
+         });
+
+     Ok(car_bytes)
+ }
+
  pub async fn generate_full_backup(
+     db: &PgPool,
      block_store: &PostgresBlockStore,
+     user_id: uuid::Uuid,
      head_cid: &Cid,
  ) -> Result<Vec<u8>, String> {
-     generate_repo_car(block_store, head_cid).await
- }
-
- fn extract_links(value: &Ipld, stack: &mut Vec<Cid>) {
-     match value {
-         Ipld::Link(cid) => {
-             stack.push(*cid);
-         }
-         Ipld::Map(map) => {
-             for v in map.values() {
-                 extract_links(v, stack);
-             }
-         }
-         Ipld::List(arr) => {
-             for v in arr {
-                 extract_links(v, stack);
-             }
-         }
-         _ => {}
-     }
+     generate_repo_car_from_user_blocks(db, block_store, user_id, head_cid).await
  }

  pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
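
encode_car_block frames each block as varint(length) followed by the CID bytes and the block data, per the CARv1 format. The write_varint it calls lives in crate::sync::car and is not shown in this diff; a sketch of the unsigned-LEB128 encoding it is assumed to implement:

use std::io::Write;

// Sketch of an unsigned-LEB128 varint writer matching how the diff calls
// crate::sync::car::write_varint (the real implementation is not shown here).
fn write_varint(out: &mut impl Write, mut value: u64) -> std::io::Result<()> {
    loop {
        let mut byte = (value & 0x7f) as u8;
        value >>= 7;
        if value != 0 {
            byte |= 0x80; // continuation bit: more bytes follow
        }
        out.write_all(&[byte])?;
        if value == 0 {
            return Ok(());
        }
    }
}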
+12 -55
src/sync/repo.rs
···
  use crate::api::error::ApiError;
+ use crate::scheduled::generate_repo_car_from_user_blocks;
  use crate::state::AppState;
  use crate::sync::car::encode_car_header;
  use crate::sync::util::assert_repo_availability;
···
      response::{IntoResponse, Response},
  };
  use cid::Cid;
- use ipld_core::ipld::Ipld;
  use jacquard_repo::storage::BlockStore;
  use serde::Deserialize;
  use std::io::Write;
  use std::str::FromStr;
  use tracing::error;
-
- const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;

  fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> {
      let did = crate::util::parse_repeated_query_param(Some(query_string), "did")
···
      return get_repo_since(&state, &query.did, &head_cid, since).await;
  }

- let mut car_bytes = match encode_car_header(&head_cid) {
-     Ok(h) => h,
-     Err(e) => {
-         error!("Failed to encode CAR header: {}", e);
-         return ApiError::InternalError(None).into_response();
-     }
- };
- let mut stack = vec![head_cid];
- let mut visited = std::collections::HashSet::new();
- let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
- while let Some(cid) = stack.pop() {
-     if visited.contains(&cid) {
-         continue;
-     }
-     visited.insert(cid);
-     if remaining == 0 {
-         break;
-     }
-     remaining -= 1;
-     if let Ok(Some(block)) = state.block_store.get(&cid).await {
-         let cid_bytes = cid.to_bytes();
-         let total_len = cid_bytes.len() + block.len();
-         let mut writer = Vec::new();
-         crate::sync::car::write_varint(&mut writer, total_len as u64)
-             .expect("Writing to Vec<u8> should never fail");
-         writer
-             .write_all(&cid_bytes)
-             .expect("Writing to Vec<u8> should never fail");
-         writer
-             .write_all(&block)
-             .expect("Writing to Vec<u8> should never fail");
-         car_bytes.extend_from_slice(&writer);
-         if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
-             extract_links_ipld(&value, &mut stack);
-         }
-     }
- }
+ let car_bytes = match generate_repo_car_from_user_blocks(
+     &state.db,
+     &state.block_store,
+     account.user_id,
+     &head_cid,
+ )
+ .await
+ {
+     Ok(bytes) => bytes,
+     Err(e) => {
+         error!("Failed to generate repo CAR: {}", e);
+         return ApiError::InternalError(None).into_response();
+     }
+ };
+
  (
      StatusCode::OK,
      [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
···
      car_bytes,
  )
  .into_response()
  }
-
- fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
-     match value {
-         Ipld::Link(cid) => {
-             stack.push(*cid);
-         }
-         Ipld::Map(map) => {
-             for v in map.values() {
-                 extract_links_ipld(v, stack);
-             }
-         }
-         Ipld::List(arr) => {
-             for v in arr {
-                 extract_links_ipld(v, stack);
-             }
-         }
-         _ => {}
-     }
- }

  #[derive(Deserialize)]
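
With this change, com.atproto.sync.getRepo serves the CAR straight from user_blocks. A hedged client-side sketch for fetching it (the host and DID are placeholders; assumes the reqwest and tokio crates, with tokio's "macros" feature):

// Fetch a repo CAR from the sync endpoint and report its size.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let url = "https://pds.example.com/xrpc/com.atproto.sync.getRepo?did=did:plc:example";
    let res = reqwest::get(url).await?;
    // The handler above sets Content-Type: application/vnd.ipld.car.
    println!("content-type: {:?}", res.headers().get("content-type"));
    let car = res.bytes().await?;
    println!("fetched {} bytes of CAR data", car.len());
    Ok(())
}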
+8 -2
tests/account_lifecycle.rs
···
      let body3: Value = status3.json().await.unwrap();
      let after_delete_blocks = body3["repoBlocks"].as_i64().unwrap();
      assert!(
-         after_delete_blocks >= after_create_blocks,
-         "Block count should not decrease after deleting a record (was {}, now {})",
+         after_delete_blocks <= after_create_blocks,
+         "Block count should decrease or stay same after deleting a record (was {}, now {})",
          after_create_blocks,
          after_delete_blocks
      );
+     assert!(
+         after_delete_blocks >= initial_blocks,
+         "Block count after delete should be at least initial count (initial {}, now {})",
+         initial_blocks,
+         after_delete_blocks
+     );
  }
+3 -3
tests/delete_account.rs
···
          .send()
          .await
          .expect("Failed to send delete request");
-     assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(delete_res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = delete_res.json().await.unwrap();
      assert_eq!(body["error"], "InvalidToken");
  }
···
          .send()
          .await
          .expect("Failed to send delete request");
-     assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(delete_res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = delete_res.json().await.unwrap();
      assert_eq!(body["error"], "ExpiredToken");
  }
···
          .send()
          .await
          .expect("Failed to send delete request");
-     assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(delete_res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = delete_res.json().await.unwrap();
      assert_eq!(body["error"], "InvalidToken");
  }
+2 -2
tests/email_update.rs
···
          .send()
          .await
          .expect("Failed to attempt email update");
-     assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = res.json().await.expect("Invalid JSON");
      assert_eq!(body["error"], "InvalidToken");
  }
···
          .send()
          .await
          .expect("Failed to confirm email");
-     assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = res.json().await.expect("Invalid JSON");
      assert_eq!(body["error"], "InvalidToken");
  }
+16 -7
tests/oauth.rs
···
          .to_string();
      }
      assert!(
-         location.starts_with(redirect_uri),
-         "Redirect to wrong URI: {}",
+         location.contains("code="),
+         "No code in redirect URI: {}",
          location
      );
-     assert!(location.contains("code="), "No code in redirect");
      assert!(
-         location.contains(&format!("state={}", state)),
-         "Wrong state"
+         location.contains(&format!("state={}", state))
+             || location.contains(&format!("state%3D{}", state)),
+         "Wrong state in redirect: {}",
+         location
      );
      let code = location
          .split("code=")
···
      );
      let twofa_body: Value = twofa_res.json().await.unwrap();
      let final_location = twofa_body["redirect_uri"].as_str().unwrap();
-     assert!(final_location.starts_with(redirect_uri) && final_location.contains("code="));
+     assert!(
+         final_location.contains("code="),
+         "No code in redirect URI: {}",
+         final_location
+     );
      let auth_code = final_location
          .split("code=")
          .nth(1)
···
      );
      let twofa_body: Value = twofa_res.json().await.unwrap();
      let final_location = twofa_body["redirect_uri"].as_str().unwrap();
-     assert!(final_location.starts_with(redirect_uri) && final_location.contains("code="));
+     assert!(
+         final_location.contains("code="),
+         "No code in redirect URI: {}",
+         final_location
+     );
      let final_code = final_location
          .split("code=")
          .nth(1)
+2 -2
tests/password_reset.rs
···
          .send()
          .await
          .expect("Failed to reset password");
-     assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = res.json().await.expect("Invalid JSON");
      assert_eq!(body["error"], "InvalidToken");
  }
···
          .send()
          .await
          .expect("Failed to reset password");
-     assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
      let body: Value = res.json().await.expect("Invalid JSON");
      assert_eq!(body["error"], "ExpiredToken");
  }
+1 -1
tests/plc_operations.rs
···
          .send()
          .await
          .unwrap();
-     assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+     assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
      let body: serde_json::Value = res.json().await.unwrap();
      assert!(body["error"] == "InvalidToken" || body["error"] == "ExpiredToken");
  }