AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] don't use downcast for handling rate-limited and transport errors

ptr.pet 2a4c0a2c c07a8ef5

verified
+429 -405
+429 -405
src/backfill/mod.rs
··· 16 16 use reqwest::StatusCode; 17 17 use smol_str::{SmolStr, ToSmolStr}; 18 18 use std::collections::HashMap; 19 + use std::fmt::Display; 19 20 use std::sync::Arc; 20 21 use std::sync::atomic::Ordering; 21 22 use std::time::{Duration, Instant}; ··· 118 119 119 120 tokio::spawn(async move { 120 121 let _guard = guard; 121 - Self::process_did_wrapper( 122 + did_task( 122 123 state, 123 124 http, 124 125 buffer_tx_clone, ··· 141 142 } 142 143 } 143 144 } 144 - 145 - async fn process_did_wrapper( 146 - state: Arc<AppState>, 147 - http: reqwest::Client, 148 - buffer_tx: BufferTx, 149 - did: Did<'static>, 150 - _permit: tokio::sync::OwnedSemaphorePermit, 151 - verify_signatures: bool, 152 - ) -> Result<()> { 153 - let db = &state.db; 145 + } 154 146 155 - match Self::process_did(&state, &http, &did, verify_signatures).await { 156 - Ok(previous_state) => { 157 - let did_key = keys::repo_key(&did); 147 + async fn did_task( 148 + state: Arc<AppState>, 149 + http: reqwest::Client, 150 + buffer_tx: BufferTx, 151 + did: Did<'static>, 152 + _permit: tokio::sync::OwnedSemaphorePermit, 153 + verify_signatures: bool, 154 + ) -> Result<()> { 155 + let db = &state.db; 158 156 159 - let was_pending = matches!(previous_state.status, RepoStatus::Backfilling); 160 - let was_resync = matches!( 161 - previous_state.status, 162 - RepoStatus::Error(_) 163 - | RepoStatus::Deactivated 164 - | RepoStatus::Takendown 165 - | RepoStatus::Suspended 166 - ); 157 + match process_did(&state, &http, &did, verify_signatures).await { 158 + Ok(previous_state) => { 159 + let did_key = keys::repo_key(&did); 167 160 168 - let mut batch = db.inner.batch(); 169 - // remove from pending 170 - if was_pending { 171 - batch.remove(&db.pending, &did_key); 172 - } 173 - // remove from resync 174 - if was_resync { 175 - batch.remove(&db.resync, &did_key); 176 - } 177 - tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 178 - .await 179 - .into_diagnostic()??; 180 - if was_pending { 181 - 
state.db.update_count_async("pending", -1).await; 182 - } 183 - if was_resync { 184 - state.db.update_count_async("resync", -1).await; 185 - } 161 + let was_pending = matches!(previous_state.status, RepoStatus::Backfilling); 162 + let was_resync = matches!( 163 + previous_state.status, 164 + RepoStatus::Error(_) 165 + | RepoStatus::Deactivated 166 + | RepoStatus::Takendown 167 + | RepoStatus::Suspended 168 + ); 186 169 187 - let state_for_persist = state.clone(); 188 - tokio::task::spawn_blocking(move || { 189 - state_for_persist 190 - .db 191 - .inner 192 - .persist(fjall::PersistMode::Buffer) 193 - .into_diagnostic() 194 - }) 170 + let mut batch = db.inner.batch(); 171 + // remove from pending 172 + if was_pending { 173 + batch.remove(&db.pending, &did_key); 174 + } 175 + // remove from resync 176 + if was_resync { 177 + batch.remove(&db.resync, &did_key); 178 + } 179 + tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 195 180 .await 196 181 .into_diagnostic()??; 182 + if was_pending { 183 + state.db.update_count_async("pending", -1).await; 184 + } 185 + if was_resync { 186 + state.db.update_count_async("resync", -1).await; 187 + } 197 188 198 - // Notify completion to worker shard 199 - if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) { 200 - error!("failed to send BackfillFinished for {did}: {e}"); 189 + let state = state.clone(); 190 + tokio::task::spawn_blocking(move || { 191 + state 192 + .db 193 + .inner 194 + .persist(fjall::PersistMode::Buffer) 195 + .into_diagnostic() 196 + }) 197 + .await 198 + .into_diagnostic()??; 199 + 200 + // Notify completion to worker shard 201 + if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) { 202 + error!("failed to send BackfillFinished for {did}: {e}"); 203 + } 204 + } 205 + Err(e) => { 206 + let mut was_ratelimited = false; 207 + match &e { 208 + BackfillError::Ratelimited => { 209 + was_ratelimited = true; 210 + debug!("failed for {did}: too many 
requests"); 211 + } 212 + BackfillError::Transport(reason) => { 213 + error!("failed for {did}: transport error: {reason}"); 214 + } 215 + BackfillError::Generic(e) => { 216 + error!("failed for {did}: {e}"); 201 217 } 202 218 } 203 - Err(e) => { 204 - let mut was_ratelimited = false; 205 - 'err: { 206 - let Some(e) = e.downcast_ref::<ClientError>() else { 207 - error!("failed for {did}: {e}"); 208 - break 'err; 209 - }; 210 219 211 - match e.kind() { 212 - ClientErrorKind::Http { 213 - status: StatusCode::TOO_MANY_REQUESTS, 214 - } => { 215 - debug!("failed for {did}: too many requests"); 216 - was_ratelimited = true; 217 - break 'err; 218 - } 219 - ClientErrorKind::Transport => { 220 - let reason = e.source_err().unwrap(); 221 - error!("failed for {did}: {e}: {reason}"); 222 - break 'err; 223 - } 224 - _ => { 225 - error!("failed for {did}: {e}"); 226 - break 'err; 227 - } 228 - } 229 - } 220 + let did_key = keys::repo_key(&did); 230 221 231 - let did_key = keys::repo_key(&did); 222 + // 1. get current retry count 223 + let mut resync_state = Db::get(db.resync.clone(), &did_key) 224 + .await 225 + .and_then(|b| { 226 + b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic()) 227 + .transpose() 228 + })? 229 + .and_then(|s| { 230 + matches!(s, ResyncState::Gone { .. }) 231 + .then_some(None) 232 + .unwrap_or(Some(s)) 233 + }) 234 + .unwrap_or_else(|| ResyncState::Error { 235 + message: SmolStr::new_static(""), 236 + retry_count: 0, 237 + next_retry: 0, 238 + }); 232 239 233 - // 1. get current retry count 234 - let mut resync_state = Db::get(db.resync.clone(), &did_key) 235 - .await 236 - .and_then(|b| { 237 - b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic()) 238 - .transpose() 239 - })? 240 - .and_then(|s| { 241 - matches!(s, ResyncState::Gone { .. 
}) 242 - .then_some(None) 243 - .unwrap_or(Some(s)) 244 - }) 245 - .unwrap_or_else(|| ResyncState::Error { 246 - message: SmolStr::new_static(""), 247 - retry_count: 0, 248 - next_retry: 0, 249 - }); 240 + let ResyncState::Error { 241 + message, 242 + retry_count, 243 + next_retry, 244 + } = &mut resync_state 245 + else { 246 + unreachable!("we handled the gone case above"); 247 + }; 250 248 251 - let ResyncState::Error { 252 - message, 253 - retry_count, 254 - next_retry, 255 - } = &mut resync_state 256 - else { 257 - unreachable!("we handled the gone case above"); 258 - }; 249 + // 2. calculate backoff and update the other fields 250 + *retry_count += was_ratelimited.then_some(3).unwrap_or(1); 251 + *next_retry = ResyncState::next_backoff(*retry_count); 252 + *message = e.to_smolstr(); 259 253 260 - // 2. calculate backoff and update the other fields 261 - *retry_count += was_ratelimited.then_some(3).unwrap_or(1); 262 - *next_retry = ResyncState::next_backoff(*retry_count); 263 - *message = e.to_smolstr(); 254 + tokio::task::spawn_blocking({ 255 + let state = state.clone(); 256 + let did_key = did_key.into_static(); 257 + move || { 258 + // 3. save to resync 259 + let serialized_resync_state = 260 + rmp_serde::to_vec(&resync_state).into_diagnostic()?; 264 261 265 - tokio::task::spawn_blocking({ 266 - let state = state.clone(); 267 - let did_key = did_key.into_static(); 268 - move || { 269 - // 3. save to resync 270 - let serialized_resync_state = 271 - rmp_serde::to_vec(&resync_state).into_diagnostic()?; 272 - 273 - // 4. and update the main repo state 274 - let serialized_repo_state = if let Some(state_bytes) = 275 - state.db.repos.get(&did_key).into_diagnostic()? 276 - { 277 - let mut state: RepoState = 278 - rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 279 - state.status = RepoStatus::Error(e.to_string().into()); 280 - Some(rmp_serde::to_vec(&state).into_diagnostic()?) 281 - } else { 282 - None 283 - }; 262 + // 4. 
and update the main repo state 263 + let serialized_repo_state = if let Some(state_bytes) = 264 + state.db.repos.get(&did_key).into_diagnostic()? 265 + { 266 + let mut state: RepoState = 267 + rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 268 + state.status = RepoStatus::Error(e.to_string().into()); 269 + Some(rmp_serde::to_vec(&state).into_diagnostic()?) 270 + } else { 271 + None 272 + }; 284 273 285 - let mut batch = state.db.inner.batch(); 286 - batch.insert(&state.db.resync, &did_key, serialized_resync_state); 287 - batch.remove(&state.db.pending, &did_key); 288 - if let Some(state_bytes) = serialized_repo_state { 289 - batch.insert(&state.db.repos, &did_key, state_bytes); 290 - } 291 - batch.commit().into_diagnostic() 274 + let mut batch = state.db.inner.batch(); 275 + batch.insert(&state.db.resync, &did_key, serialized_resync_state); 276 + batch.remove(&state.db.pending, &did_key); 277 + if let Some(state_bytes) = serialized_repo_state { 278 + batch.insert(&state.db.repos, &did_key, state_bytes); 292 279 } 293 - }) 294 - .await 295 - .into_diagnostic()??; 280 + batch.commit().into_diagnostic() 281 + } 282 + }) 283 + .await 284 + .into_diagnostic()??; 296 285 297 - state.db.update_count_async("resync", 1).await; 298 - state.db.update_count_async("pending", -1).await; 299 - } 286 + state.db.update_count_async("resync", 1).await; 287 + state.db.update_count_async("pending", -1).await; 300 288 } 301 - 302 - // wake worker to pick up more 303 - state.backfill_notify.notify_one(); 304 - Ok(()) 305 289 } 306 290 307 - async fn process_did<'i>( 308 - app_state: &Arc<AppState>, 309 - http: &reqwest::Client, 310 - did: &Did<'static>, 311 - verify_signatures: bool, 312 - ) -> Result<RepoState<'static>> { 313 - debug!("backfilling {}", did); 314 - 315 - let db = &app_state.db; 316 - let did_key = keys::repo_key(did); 317 - let state_bytes = Db::get(db.repos.clone(), did_key) 318 - .await? 319 - .ok_or_else(|| miette::miette!("!!!THIS IS A BUG!!! 
repo state for {did} missing"))?; 320 - let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes) 321 - .into_diagnostic()? 322 - .into_static(); 323 - let previous_state = state.clone(); 291 + // wake worker to pick up more 292 + state.backfill_notify.notify_one(); 293 + Ok(()) 294 + } 324 295 325 - // 1. resolve pds 326 - let start = Instant::now(); 327 - let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?; 328 - trace!( 329 - "resolved {did} to pds {pds_url} handle {handle:?} in {:?}", 330 - start.elapsed() 331 - ); 296 + enum BackfillError { 297 + Generic(miette::Report), 298 + Ratelimited, 299 + Transport(SmolStr), 300 + } 332 301 333 - if let Some(h) = handle { 334 - state.handle = Some(h.to_smolstr()); 302 + impl From<ClientError> for BackfillError { 303 + fn from(e: ClientError) -> Self { 304 + match e.kind() { 305 + ClientErrorKind::Http { 306 + status: StatusCode::TOO_MANY_REQUESTS, 307 + } => Self::Ratelimited, 308 + ClientErrorKind::Transport => Self::Transport( 309 + e.source_err() 310 + .expect("transport error without source") 311 + .to_smolstr(), 312 + ), 313 + _ => Self::Generic(e.into()), 335 314 } 336 - 337 - let emit_identity = |status: &RepoStatus| { 338 - let evt = AccountEvt { 339 - did: did.clone(), 340 - active: !matches!( 341 - status, 342 - RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 343 - ), 344 - status: Some( 345 - match status { 346 - RepoStatus::Deactivated => "deactivated", 347 - RepoStatus::Takendown => "takendown", 348 - RepoStatus::Suspended => "suspended", 349 - _ => "active", 350 - } 351 - .into(), 352 - ), 353 - }; 354 - let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt)); 355 - }; 356 - 357 - // 2. 
fetch repo (car) 358 - let start = Instant::now(); 359 - let req = GetRepo::new().did(did.clone()).build(); 360 - let resp = http.xrpc(pds_url).send(&req).await?; 315 + } 316 + } 361 317 362 - let car_bytes = match resp.into_output() { 363 - Ok(o) => o, 364 - Err(XrpcError::Xrpc(e)) => { 365 - if matches!(e, GetRepoError::RepoNotFound(_)) { 366 - warn!("repo {did} not found, deleting"); 367 - let mut batch = db.inner.batch(); 368 - ops::delete_repo(&mut batch, db, did)?; 369 - batch.commit().into_diagnostic()?; 370 - return Ok(previous_state); // stop backfill 371 - } 318 + impl From<miette::Report> for BackfillError { 319 + fn from(e: miette::Report) -> Self { 320 + Self::Generic(e) 321 + } 322 + } 372 323 373 - let inactive_status = match e { 374 - GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated), 375 - GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown), 376 - GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended), 377 - _ => None, 378 - }; 324 + impl Display for BackfillError { 325 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 326 + match self { 327 + BackfillError::Generic(e) => e.fmt(f), 328 + BackfillError::Ratelimited => write!(f, "too many requests"), 329 + BackfillError::Transport(reason) => write!(f, "transport error: {reason}"), 330 + } 331 + } 332 + } 379 333 380 - if let Some(status) = inactive_status { 381 - warn!("repo {did} is {status:?}, stopping backfill"); 334 + async fn process_did<'i>( 335 + app_state: &Arc<AppState>, 336 + http: &reqwest::Client, 337 + did: &Did<'static>, 338 + verify_signatures: bool, 339 + ) -> Result<RepoState<'static>, BackfillError> { 340 + debug!("backfilling {}", did); 382 341 383 - emit_identity(&status); 342 + let db = &app_state.db; 343 + let did_key = keys::repo_key(did); 344 + let state_bytes = Db::get(db.repos.clone(), did_key) 345 + .await? 346 + .ok_or_else(|| miette::miette!("!!!THIS IS A BUG!!! 
repo state for {did} missing"))?; 347 + let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes) 348 + .into_diagnostic()? 349 + .into_static(); 350 + let previous_state = state.clone(); 384 351 385 - let resync_state = ResyncState::Gone { 386 - status: status.clone(), 387 - }; 388 - let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?; 352 + // 1. resolve pds 353 + let start = Instant::now(); 354 + let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?; 355 + trace!( 356 + "resolved {did} to pds {pds_url} handle {handle:?} in {:?}", 357 + start.elapsed() 358 + ); 389 359 390 - let app_state_clone = app_state.clone(); 391 - app_state 392 - .db 393 - .update_repo_state_async(did, move |state, (key, batch)| { 394 - state.status = status; 395 - batch.insert(&app_state_clone.db.resync, key, resync_bytes); 396 - Ok((true, ())) 397 - }) 398 - .await?; 360 + if let Some(h) = handle { 361 + state.handle = Some(h.to_smolstr()); 362 + } 399 363 400 - // return success so wrapper stops retrying 401 - return Ok(previous_state); 364 + let emit_identity = |status: &RepoStatus| { 365 + let evt = AccountEvt { 366 + did: did.clone(), 367 + active: !matches!( 368 + status, 369 + RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 370 + ), 371 + status: Some( 372 + match status { 373 + RepoStatus::Deactivated => "deactivated", 374 + RepoStatus::Takendown => "takendown", 375 + RepoStatus::Suspended => "suspended", 376 + _ => "active", 402 377 } 378 + .into(), 379 + ), 380 + }; 381 + let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt)); 382 + }; 403 383 404 - return Err(e).into_diagnostic(); 384 + // 2. 
fetch repo (car) 385 + let start = Instant::now(); 386 + let req = GetRepo::new().did(did.clone()).build(); 387 + let resp = http.xrpc(pds_url).send(&req).await?; 388 + 389 + let car_bytes = match resp.into_output() { 390 + Ok(o) => o, 391 + Err(XrpcError::Xrpc(e)) => { 392 + if matches!(e, GetRepoError::RepoNotFound(_)) { 393 + warn!("repo {did} not found, deleting"); 394 + let mut batch = db.inner.batch(); 395 + ops::delete_repo(&mut batch, db, did)?; 396 + batch.commit().into_diagnostic()?; 397 + return Ok(previous_state); // stop backfill 405 398 } 406 - Err(e) => return Err(e).into_diagnostic(), 407 - }; 408 399 409 - // emit identity event so any consumers know 410 - emit_identity(&state.status); 400 + let inactive_status = match e { 401 + GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated), 402 + GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown), 403 + GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended), 404 + _ => None, 405 + }; 411 406 412 - trace!( 413 - "fetched {} bytes for {did} in {:?}", 414 - car_bytes.body.len(), 415 - start.elapsed() 416 - ); 407 + if let Some(status) = inactive_status { 408 + warn!("repo {did} is {status:?}, stopping backfill"); 417 409 418 - // 3. import repo 419 - let start = Instant::now(); 420 - let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body) 421 - .await 422 - .into_diagnostic()?; 423 - trace!("parsed car for {did} in {:?}", start.elapsed()); 410 + emit_identity(&status); 424 411 425 - let start = Instant::now(); 426 - let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 427 - trace!( 428 - "stored {} blocks in memory for {did} in {:?}", 429 - store.len(), 430 - start.elapsed() 431 - ); 412 + let resync_state = ResyncState::Gone { 413 + status: status.clone(), 414 + }; 415 + let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?; 432 416 433 - // 4. 
parse root commit to get mst root 434 - let root_bytes = store 435 - .get(&parsed.root) 436 - .await 437 - .into_diagnostic()? 438 - .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 417 + let app_state_clone = app_state.clone(); 418 + app_state 419 + .db 420 + .update_repo_state_async(did, move |state, (key, batch)| { 421 + state.status = status; 422 + batch.insert(&app_state_clone.db.resync, key, resync_bytes); 423 + Ok((true, ())) 424 + }) 425 + .await?; 439 426 440 - let root_commit = 441 - jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?; 442 - debug!( 443 - "backfilling repo at revision {}, root cid {}", 444 - root_commit.rev, root_commit.data 445 - ); 427 + // return success so wrapper stops retrying 428 + return Ok(previous_state); 429 + } 446 430 447 - // 4.5. verify commit signature 448 - if verify_signatures { 449 - let pubkey = app_state.resolver.resolve_signing_key(did).await?; 450 - root_commit 451 - .verify(&pubkey) 452 - .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 453 - trace!("signature verified for {did}"); 431 + Err(e).into_diagnostic()? 454 432 } 433 + Err(e) => Err(e).into_diagnostic()?, 434 + }; 455 435 456 - // 5. walk mst 457 - let start = Instant::now(); 458 - let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None); 459 - let leaves = mst.leaves().await.into_diagnostic()?; 460 - trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64()); 436 + // emit identity event so any consumers know 437 + emit_identity(&state.status); 461 438 462 - // 6. 
insert records into db 463 - let start = Instant::now(); 464 - let (_state, records_cnt_delta, added_blocks, count) = { 465 - let app_state = app_state.clone(); 466 - let did = did.clone(); 467 - let rev = root_commit.rev; 439 + trace!( 440 + "fetched {} bytes for {did} in {:?}", 441 + car_bytes.body.len(), 442 + start.elapsed() 443 + ); 468 444 469 - tokio::task::spawn_blocking(move || { 470 - let mut count = 0; 471 - let mut delta = 0; 472 - let mut added_blocks = 0; 473 - let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new(); 474 - let mut batch = app_state.db.inner.batch(); 475 - let store = mst.storage(); 445 + // 3. import repo 446 + let start = Instant::now(); 447 + let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body) 448 + .await 449 + .into_diagnostic()?; 450 + trace!("parsed car for {did} in {:?}", start.elapsed()); 476 451 477 - let prefix = keys::record_prefix(&did); 478 - let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new(); 452 + let start = Instant::now(); 453 + let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 454 + trace!( 455 + "stored {} blocks in memory for {did} in {:?}", 456 + store.len(), 457 + start.elapsed() 458 + ); 479 459 480 - let mut partitions = Vec::new(); 481 - app_state.db.record_partitions.iter_sync(|col, ks| { 482 - partitions.push((col.clone(), ks.clone())); 483 - true 484 - }); 460 + // 4. parse root commit to get mst root 461 + let root_bytes = store 462 + .get(&parsed.root) 463 + .await 464 + .into_diagnostic()? 
465 + .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 485 466 486 - for (col_name, ks) in partitions { 487 - for guard in ks.prefix(&prefix) { 488 - let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 489 - let rkey: DbRkey = 490 - rmp_serde::from_slice(&key[prefix.len()..]).into_diagnostic()?; 491 - let cid = if let Ok(c) = cid::Cid::read_bytes(cid_bytes.as_ref()) { 492 - c.to_string().to_smolstr() 493 - } else { 494 - error!("invalid cid for {did}: {cid_bytes:?}"); 495 - continue; 496 - }; 467 + let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?; 468 + debug!( 469 + "backfilling repo at revision {}, root cid {}", 470 + root_commit.rev, root_commit.data 471 + ); 497 472 498 - existing_cids.insert((col_name.as_str().into(), rkey), cid); 499 - } 500 - } 473 + // 4.5. verify commit signature 474 + if verify_signatures { 475 + let pubkey = app_state.resolver.resolve_signing_key(did).await?; 476 + root_commit 477 + .verify(&pubkey) 478 + .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 479 + trace!("signature verified for {did}"); 480 + } 501 481 502 - for (key, cid) in leaves { 503 - let val_bytes = tokio::runtime::Handle::current() 504 - .block_on(store.get(&cid)) 505 - .into_diagnostic()?; 482 + // 5. walk mst 483 + let start = Instant::now(); 484 + let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None); 485 + let leaves = mst.leaves().await.into_diagnostic()?; 486 + trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64()); 487 + 488 + // 6. 
insert records into db 489 + let start = Instant::now(); 490 + let (_state, records_cnt_delta, added_blocks, count) = { 491 + let app_state = app_state.clone(); 492 + let did = did.clone(); 493 + let rev = root_commit.rev; 494 + 495 + tokio::task::spawn_blocking(move || { 496 + let mut count = 0; 497 + let mut delta = 0; 498 + let mut added_blocks = 0; 499 + let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new(); 500 + let mut batch = app_state.db.inner.batch(); 501 + let store = mst.storage(); 502 + 503 + let prefix = keys::record_prefix(&did); 504 + let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new(); 505 + 506 + let mut partitions = Vec::new(); 507 + app_state.db.record_partitions.iter_sync(|col, ks| { 508 + partitions.push((col.clone(), ks.clone())); 509 + true 510 + }); 506 511 507 - if let Some(val) = val_bytes { 508 - let (collection, rkey) = ops::parse_path(&key)?; 509 - let rkey = DbRkey::new(rkey); 510 - let path = (collection.to_smolstr(), rkey.clone()); 511 - let cid_obj = Cid::ipld(cid); 512 - let partition = app_state.db.record_partition(collection)?; 512 + for (col_name, ks) in partitions { 513 + for guard in ks.prefix(&prefix) { 514 + let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 515 + let rkey: DbRkey = 516 + rmp_serde::from_slice(&key[prefix.len()..]).into_diagnostic()?; 517 + let cid = if let Ok(c) = cid::Cid::read_bytes(cid_bytes.as_ref()) { 518 + c.to_string().to_smolstr() 519 + } else { 520 + error!("invalid cid for {did}: {cid_bytes:?}"); 521 + continue; 522 + }; 513 523 514 - // check if this record already exists with same CID 515 - let (action, is_new) = 516 - if let Some(existing_cid) = existing_cids.remove(&path) { 517 - if existing_cid == cid_obj.as_str() { 518 - debug!("skip {did}/{collection}/{rkey} ({cid})"); 519 - continue; // skip unchanged record 520 - } 521 - (DbAction::Update, false) 522 - } else { 523 - (DbAction::Create, true) 524 - }; 525 - debug!("{action} 
{did}/{collection}/{rkey} ({cid})"); 524 + existing_cids.insert((col_name.as_str().into(), rkey), cid); 525 + } 526 + } 526 527 527 - // Key is just did|rkey 528 - let db_key = keys::record_key(&did, &rkey); 528 + for (key, cid) in leaves { 529 + let val_bytes = tokio::runtime::Handle::current() 530 + .block_on(store.get(&cid)) 531 + .into_diagnostic()?; 529 532 530 - batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); 531 - batch.insert(&partition, db_key, cid.to_bytes()); 533 + if let Some(val) = val_bytes { 534 + let (collection, rkey) = ops::parse_path(&key)?; 535 + let rkey = DbRkey::new(rkey); 536 + let path = (collection.to_smolstr(), rkey.clone()); 537 + let cid_obj = Cid::ipld(cid); 538 + let partition = app_state.db.record_partition(collection)?; 532 539 533 - added_blocks += 1; 534 - if is_new { 535 - delta += 1; 536 - *collection_counts.entry(path.0.clone()).or_default() += 1; 540 + // check if this record already exists with same CID 541 + let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) { 542 + if existing_cid == cid_obj.as_str() { 543 + debug!("skip {did}/{collection}/{rkey} ({cid})"); 544 + continue; // skip unchanged record 537 545 } 538 - 539 - let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 540 - let evt = StoredEvent { 541 - live: false, 542 - did: TrimmedDid::from(&did), 543 - rev: DbTid::from(&rev), 544 - collection: CowStr::Borrowed(collection), 545 - rkey, 546 - action, 547 - cid: Some(cid_obj.to_ipld().expect("valid cid")), 548 - }; 549 - let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 550 - batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 546 + (DbAction::Update, false) 547 + } else { 548 + (DbAction::Create, true) 549 + }; 550 + debug!("{action} {did}/{collection}/{rkey} ({cid})"); 551 551 552 - count += 1; 553 - } 554 - } 552 + // Key is just did|rkey 553 + let db_key = keys::record_key(&did, &rkey); 555 554 556 - // remove any remaining 
existing records (they weren't in the new MST) 557 - for ((collection, rkey), cid) in existing_cids { 558 - debug!("remove {did}/{collection}/{rkey} ({cid})"); 559 - let partition = app_state.db.record_partition(collection.as_str())?; 555 + batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); 556 + batch.insert(&partition, db_key, cid.to_bytes()); 560 557 561 - batch.remove(&partition, keys::record_key(&did, &rkey)); 558 + added_blocks += 1; 559 + if is_new { 560 + delta += 1; 561 + *collection_counts.entry(path.0.clone()).or_default() += 1; 562 + } 562 563 563 564 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 564 565 let evt = StoredEvent { 565 566 live: false, 566 567 did: TrimmedDid::from(&did), 567 568 rev: DbTid::from(&rev), 568 - collection: CowStr::Borrowed(&collection), 569 + collection: CowStr::Borrowed(collection), 569 570 rkey, 570 - action: DbAction::Delete, 571 - cid: None, 571 + action, 572 + cid: Some(cid_obj.to_ipld().expect("valid cid")), 572 573 }; 573 574 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 574 575 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 575 576 576 - delta -= 1; 577 577 count += 1; 578 578 } 579 + } 579 580 580 - // 6. 
update data, status is updated in worker shard 581 - state.rev = Some((&rev).into()); 582 - state.data = Some(root_commit.data); 583 - state.last_updated_at = chrono::Utc::now().timestamp(); 581 + // remove any remaining existing records (they weren't in the new MST) 582 + for ((collection, rkey), cid) in existing_cids { 583 + debug!("remove {did}/{collection}/{rkey} ({cid})"); 584 + let partition = app_state.db.record_partition(collection.as_str())?; 584 585 585 - batch.insert( 586 - &app_state.db.repos, 587 - keys::repo_key(&did), 588 - ser_repo_state(&state)?, 589 - ); 586 + batch.remove(&partition, keys::record_key(&did, &rkey)); 590 587 591 - // add the counts 592 - for (col, cnt) in collection_counts { 593 - db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt); 594 - } 588 + let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 589 + let evt = StoredEvent { 590 + live: false, 591 + did: TrimmedDid::from(&did), 592 + rev: DbTid::from(&rev), 593 + collection: CowStr::Borrowed(&collection), 594 + rkey, 595 + action: DbAction::Delete, 596 + cid: None, 597 + }; 598 + let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 599 + batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 595 600 596 - batch.commit().into_diagnostic()?; 601 + delta -= 1; 602 + count += 1; 603 + } 597 604 598 - Ok::<_, miette::Report>((state, delta, added_blocks, count)) 599 - }) 600 - .await 601 - .into_diagnostic()?? 602 - }; 603 - trace!("did {count} ops for {did} in {:?}", start.elapsed()); 605 + // 6. 
update data, status is updated in worker shard 606 + state.rev = Some((&rev).into()); 607 + state.data = Some(root_commit.data); 608 + state.last_updated_at = chrono::Utc::now().timestamp(); 604 609 605 - // do the counts 606 - if records_cnt_delta > 0 { 607 - app_state 608 - .db 609 - .update_count_async("records", records_cnt_delta) 610 - .await; 611 - app_state 612 - .db 613 - .update_count_async("blocks", added_blocks) 614 - .await; 615 - } 616 - trace!( 617 - "committed backfill batch for {did} in {:?}", 618 - start.elapsed() 619 - ); 610 + batch.insert( 611 + &app_state.db.repos, 612 + keys::repo_key(&did), 613 + ser_repo_state(&state)?, 614 + ); 620 615 621 - let _ = db.event_tx.send(BroadcastEvent::Persisted( 622 - db.next_event_id.load(Ordering::SeqCst) - 1, 623 - )); 616 + // add the counts 617 + for (col, cnt) in collection_counts { 618 + db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt); 619 + } 620 + 621 + batch.commit().into_diagnostic()?; 624 622 625 - // buffer processing is handled by BufferProcessor when blocked flag is cleared 626 - debug!("backfill complete for {did}"); 627 - Ok(previous_state) 623 + Ok::<_, miette::Report>((state, delta, added_blocks, count)) 624 + }) 625 + .await 626 + .into_diagnostic()?? 627 + }; 628 + trace!("did {count} ops for {did} in {:?}", start.elapsed()); 629 + 630 + // do the counts 631 + if records_cnt_delta > 0 { 632 + app_state 633 + .db 634 + .update_count_async("records", records_cnt_delta) 635 + .await; 636 + app_state 637 + .db 638 + .update_count_async("blocks", added_blocks) 639 + .await; 628 640 } 641 + trace!( 642 + "committed backfill batch for {did} in {:?}", 643 + start.elapsed() 644 + ); 645 + 646 + let _ = db.event_tx.send(BroadcastEvent::Persisted( 647 + db.next_event_id.load(Ordering::SeqCst) - 1, 648 + )); 649 + 650 + // buffer processing is handled by BufferProcessor when blocked flag is cleared 651 + debug!("backfill complete for {did}"); 652 + Ok(previous_state) 629 653 }