AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db] use random index_id for pending keys

- remove did from RepoState, add index_id field
- change handle type from SmolStr to Handle
- pending queue now uses random u64 keys with DID stored in value
- rework resync manager to use batched commits and ops::update_repo_status
- remove repo_cache and deleted set from ingest worker
- add gauge update for account status transitions

ptr.pet 452a704d 088eafdc

verified
+195 -135
+18 -16
src/api/repo.rs
··· 1 use crate::api::AppState; 2 use crate::db::{Db, keys, ser_repo_state}; 3 use crate::types::{GaugeState, RepoState}; 4 use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 5 use jacquard::types::did::Did; 6 use serde::Deserialize; 7 use std::sync::Arc; 8 ··· 33 .await 34 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 35 { 36 - let repo_state = RepoState::backfilling(&did); 37 let bytes = ser_repo_state(&repo_state) 38 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 39 40 batch.insert(&db.repos, &did_key, bytes); 41 - batch.insert(&db.pending, &did_key, Vec::new()); 42 43 added += 1; 44 } ··· 88 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 89 90 let was_pending = matches!(repo_state.status, crate::types::RepoStatus::Backfilling); 91 - let _was_resync = matches!( 92 - repo_state.status, 93 - crate::types::RepoStatus::Error(_) 94 - | crate::types::RepoStatus::Deactivated 95 - | crate::types::RepoStatus::Takendown 96 - | crate::types::RepoStatus::Suspended 97 - ); 98 99 let old_gauge = if was_pending { 100 GaugeState::Pending ··· 115 GaugeState::Synced 116 }; 117 118 - batch.remove(&db.repos, &did_key); 119 - if was_pending { 120 - batch.remove(&db.pending, &did_key); 121 - } 122 - if old_gauge.is_resync() { 123 - batch.remove(&db.resync, &did_key); 124 - } 125 126 state 127 .db
··· 1 use crate::api::AppState; 2 use crate::db::{Db, keys, ser_repo_state}; 3 + use crate::ops; 4 use crate::types::{GaugeState, RepoState}; 5 use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 6 use jacquard::types::did::Did; 7 + use rand::Rng; 8 use serde::Deserialize; 9 use std::sync::Arc; 10 ··· 35 .await 36 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 37 { 38 + let repo_state = RepoState::backfilling(rand::rng().next_u64()); 39 let bytes = ser_repo_state(&repo_state) 40 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 41 42 batch.insert(&db.repos, &did_key, bytes); 43 + batch.insert( 44 + &db.pending, 45 + keys::pending_key(repo_state.index_id), 46 + &did_key, 47 + ); 48 49 added += 1; 50 } ··· 94 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 95 96 let was_pending = matches!(repo_state.status, crate::types::RepoStatus::Backfilling); 97 + // todo: idk 98 + // let was_resync = matches!( 99 + // repo_state.status, 100 + // crate::types::RepoStatus::Error(_) 101 + // | crate::types::RepoStatus::Deactivated 102 + // | crate::types::RepoStatus::Takendown 103 + // | crate::types::RepoStatus::Suspended 104 + // ); 105 106 let old_gauge = if was_pending { 107 GaugeState::Pending ··· 122 GaugeState::Synced 123 }; 124 125 + ops::delete_repo(&mut batch, db, &did, repo_state) 126 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 127 128 state 129 .db
+85 -31
src/backfill/manager.rs
··· 1 use crate::db::types::TrimmedDid; 2 - use crate::db::{self, deser_repo_state, keys, ser_repo_state}; 3 use crate::state::AppState; 4 use crate::types::{RepoStatus, ResyncState}; 5 use miette::{IntoDiagnostic, Result}; ··· 11 debug!("scanning for deactivated/takendown repos to retry..."); 12 let mut count = 0; 13 14 for guard in state.db.resync.iter() { 15 let (key, val) = guard.into_inner().into_diagnostic()?; 16 let did = match TrimmedDid::try_from(key.as_ref()) { ··· 25 if matches!(resync_state, ResyncState::Gone { .. }) { 26 debug!("queuing retry for gone repo: {did}"); 27 28 - // move back to pending 29 - let mut batch = state.db.inner.batch(); 30 - batch.remove(&state.db.resync, key.clone()); 31 - batch.insert(&state.db.pending, key.clone(), Vec::new()); 32 33 // update repo state back to backfilling 34 - if let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? { 35 - let mut repo_state = deser_repo_state(&state_bytes)?; 36 - repo_state.status = RepoStatus::Backfilling; 37 - batch.insert(&state.db.repos, key, ser_repo_state(&repo_state)?); 38 - } 39 - 40 - batch.commit().into_diagnostic()?; 41 - state.db.update_count("resync", -1); 42 - state.db.update_count("pending", 1); 43 44 - state.notify_backfill(); 45 count += 1; 46 } 47 } 48 } 49 50 info!("queued {count} gone backfills"); 51 Ok(()) 52 } ··· 60 61 let now = chrono::Utc::now().timestamp(); 62 let mut count = 0; 63 64 for guard in db.resync.iter() { 65 let (key, value) = match guard.into_inner() { ··· 78 } 79 }; 80 81 - if let Ok(ResyncState::Error { next_retry, .. 
}) = 82 - rmp_serde::from_slice::<ResyncState>(&value) 83 - { 84 - if next_retry <= now { 85 - debug!("retrying backfill for {did}"); 86 87 - // move back to pending 88 - if let Err(e) = db.pending.insert(keys::repo_key(&did), Vec::new()) { 89 - error!("failed to move {did} to pending: {e}"); 90 - db::check_poisoned(&e); 91 - continue; 92 - } 93 - state.db.update_count("pending", 1); 94 95 - state.notify_backfill(); 96 - count += 1; 97 } 98 } 99 } 100 101 - if count > 0 { 102 - info!("queued {count} retries"); 103 } 104 } 105 }
··· 1 use crate::db::types::TrimmedDid; 2 + use crate::db::{self, deser_repo_state}; 3 + use crate::ops; 4 use crate::state::AppState; 5 use crate::types::{RepoStatus, ResyncState}; 6 use miette::{IntoDiagnostic, Result}; ··· 12 debug!("scanning for deactivated/takendown repos to retry..."); 13 let mut count = 0; 14 15 + let mut batch = state.db.inner.batch(); 16 + 17 for guard in state.db.resync.iter() { 18 let (key, val) = guard.into_inner().into_diagnostic()?; 19 let did = match TrimmedDid::try_from(key.as_ref()) { ··· 28 if matches!(resync_state, ResyncState::Gone { .. }) { 29 debug!("queuing retry for gone repo: {did}"); 30 31 + let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? else { 32 + error!("repo state not found for {did}"); 33 + continue; 34 + }; 35 36 // update repo state back to backfilling 37 + let repo_state = deser_repo_state(&state_bytes)?; 38 + ops::update_repo_status( 39 + &mut batch, 40 + &state.db, 41 + &did, 42 + repo_state, 43 + RepoStatus::Backfilling, 44 + )?; 45 46 count += 1; 47 } 48 } 49 } 50 51 + if count == 0 { 52 + return Ok(()); 53 + } 54 + 55 + batch.commit().into_diagnostic()?; 56 + 57 + state.db.update_count("resync", -count); 58 + state.db.update_count("pending", count); 59 + 60 + state.notify_backfill(); 61 + 62 info!("queued {count} gone backfills"); 63 Ok(()) 64 } ··· 72 73 let now = chrono::Utc::now().timestamp(); 74 let mut count = 0; 75 + 76 + let mut batch = state.db.inner.batch(); 77 78 for guard in db.resync.iter() { 79 let (key, value) = match guard.into_inner() { ··· 92 } 93 }; 94 95 + match rmp_serde::from_slice::<ResyncState>(&value) { 96 + Ok(ResyncState::Error { next_retry, .. 
}) => { 97 + if next_retry <= now { 98 + debug!("retrying backfill for {did}"); 99 100 + let state_bytes = match state.db.repos.get(&key).into_diagnostic() { 101 + Ok(b) => b, 102 + Err(err) => { 103 + error!("failed to get repo state for {did}: {err}"); 104 + continue; 105 + } 106 + }; 107 + let Some(state_bytes) = state_bytes else { 108 + error!("repo state not found for {did}"); 109 + continue; 110 + }; 111 112 + let repo_state = match deser_repo_state(&state_bytes) { 113 + Ok(s) => s, 114 + Err(e) => { 115 + error!("failed to deserialize repo state for {did}: {e}"); 116 + continue; 117 + } 118 + }; 119 + let res = ops::update_repo_status( 120 + &mut batch, 121 + &state.db, 122 + &did, 123 + repo_state, 124 + RepoStatus::Backfilling, 125 + ); 126 + if let Err(e) = res { 127 + error!("failed to update repo status for {did}: {e}"); 128 + continue; 129 + } 130 + 131 + count += 1; 132 + } 133 + } 134 + Ok(_) => { 135 + // not an error state, do nothing 136 + } 137 + Err(e) => { 138 + error!("failed to deserialize resync state for {did}: {e}"); 139 + continue; 140 } 141 } 142 } 143 144 + if count == 0 { 145 + continue; 146 } 147 + 148 + if let Err(e) = batch.commit() { 149 + error!("failed to commit batch: {e}"); 150 + db::check_poisoned(&e); 151 + continue; 152 + } 153 + 154 + state.db.update_count("resync", -count); 155 + state.db.update_count("pending", count); 156 + state.notify_backfill(); 157 + info!("queued {count} retries"); 158 } 159 }
+7 -10
src/backfill/mod.rs
··· 84 let mut spawned = 0; 85 86 for guard in self.state.db.pending.iter() { 87 - let key = match guard.key() { 88 - Ok(k) => k, 89 Err(e) => { 90 - error!("failed to read pending key: {e}"); 91 db::check_poisoned(&e); 92 continue; 93 } 94 }; 95 96 - let did = match TrimmedDid::try_from(key.as_ref()) { 97 Ok(d) => d.to_did(), 98 Err(e) => { 99 - error!("invalid did '{key:?}' in pending: {e}"); 100 continue; 101 } 102 }; ··· 168 169 // determine old gauge state 170 // if it was error/suspended etc, we need to know which error kind it was to decrement correctly. 171 - // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 172 // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 173 let old_gauge = match previous_state.status { 174 RepoStatus::Backfilling => GaugeState::Pending, ··· 393 start.elapsed() 394 ); 395 396 - if let Some(h) = handle { 397 - state.handle = Some(h.to_smolstr()); 398 - } 399 400 let emit_identity = |status: &RepoStatus| { 401 let evt = AccountEvt { ··· 428 if matches!(e, GetRepoError::RepoNotFound(_)) { 429 warn!("repo {did} not found, deleting"); 430 let mut batch = db.inner.batch(); 431 - ops::delete_repo(&mut batch, db, did)?; 432 batch.commit().into_diagnostic()?; 433 return Ok(previous_state); // stop backfill 434 }
··· 84 let mut spawned = 0; 85 86 for guard in self.state.db.pending.iter() { 87 + let (key, value) = match guard.into_inner() { 88 + Ok(kv) => kv, 89 Err(e) => { 90 + error!("failed to read pending entry: {e}"); 91 db::check_poisoned(&e); 92 continue; 93 } 94 }; 95 96 + let did = match TrimmedDid::try_from(value.as_ref()) { 97 Ok(d) => d.to_did(), 98 Err(e) => { 99 + error!("invalid did in pending value: {e}"); 100 continue; 101 } 102 }; ··· 168 169 // determine old gauge state 170 // if it was error/suspended etc, we need to know which error kind it was to decrement correctly. 171 // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 172 let old_gauge = match previous_state.status { 173 RepoStatus::Backfilling => GaugeState::Pending, ··· 392 start.elapsed() 393 ); 394 395 + state.handle = handle.map(|h| h.into_static()); 396 397 let emit_identity = |status: &RepoStatus| { 398 let evt = AccountEvt { ··· 425 if matches!(e, GetRepoError::RepoNotFound(_)) { 426 warn!("repo {did} not found, deleting"); 427 let mut batch = db.inner.batch(); 428 + ops::delete_repo(&mut batch, db, did, state)?; 429 batch.commit().into_diagnostic()?; 430 return Ok(previous_state); // stop backfill 431 }
+6 -2
src/crawler/mod.rs
··· 5 use jacquard::prelude::*; 6 use jacquard_common::CowStr; 7 use miette::{IntoDiagnostic, Result}; 8 use smol_str::SmolStr; 9 use std::sync::Arc; 10 use std::time::Duration; ··· 37 38 pub async fn run(self) -> Result<()> { 39 info!("crawler started"); 40 41 let db = &self.state.db; 42 ··· 140 if !Db::contains_key(db.repos.clone(), &did_key).await? { 141 trace!("crawler found new repo: {}", repo.did); 142 143 - let state = RepoState::backfilling(&repo.did); 144 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 145 - batch.insert(&db.pending, &did_key, Vec::new()); 146 to_queue.push(repo.did.clone()); 147 } 148 }
··· 5 use jacquard::prelude::*; 6 use jacquard_common::CowStr; 7 use miette::{IntoDiagnostic, Result}; 8 + use rand::Rng; 9 + use rand::rngs::SmallRng; 10 use smol_str::SmolStr; 11 use std::sync::Arc; 12 use std::time::Duration; ··· 39 40 pub async fn run(self) -> Result<()> { 41 info!("crawler started"); 42 + 43 + let mut rng: SmallRng = rand::make_rng(); 44 45 let db = &self.state.db; 46 ··· 144 if !Db::contains_key(db.repos.clone(), &did_key).await? { 145 trace!("crawler found new repo: {}", repo.did); 146 147 + let state = RepoState::backfilling(rng.next_u64()); 148 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 149 + batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 150 to_queue.push(repo.did.clone()); 151 } 152 }
+4
src/db/keys.rs
··· 15 vec 16 } 17 18 // prefix format: {DID}| (DID trimmed) 19 pub fn record_prefix_did(did: &Did) -> Vec<u8> { 20 let repo = TrimmedDid::from(did);
··· 15 vec 16 } 17 18 + pub fn pending_key(id: u64) -> [u8; 8] { 19 + id.to_be_bytes() 20 + } 21 + 22 // prefix format: {DID}| (DID trimmed) 23 pub fn record_prefix_did(did: &Did) -> Vec<u8> { 24 let repo = TrimmedDid::from(did);
+25 -51
src/ingest/worker.rs
··· 15 use jacquard_common::IntoStatic; 16 use jacquard_repo::error::CommitError; 17 use miette::{Context, Diagnostic, IntoDiagnostic, Result}; 18 use smol_str::ToSmolStr; 19 - use std::collections::{HashMap, HashSet, hash_map::DefaultHasher}; 20 use std::hash::{Hash, Hasher}; 21 use std::sync::Arc; 22 use thiserror::Error; ··· 64 struct WorkerContext<'a> { 65 verify_signatures: bool, 66 state: &'a AppState, 67 - repo_cache: &'a mut HashMap<Did<'static>, RepoState<'static>>, 68 batch: &'a mut OwnedWriteBatch, 69 added_blocks: &'a mut i64, 70 records_delta: &'a mut i64, ··· 156 let _guard = handle.enter(); 157 debug!("shard {id} started"); 158 159 - let mut repo_cache = HashMap::new(); 160 - let mut deleted = HashSet::new(); 161 let mut broadcast_events = Vec::new(); 162 163 while let Some(msg) = rx.blocking_recv() { 164 let mut batch = state.db.inner.batch(); 165 - repo_cache.clear(); 166 - deleted.clear(); 167 broadcast_events.clear(); 168 169 let mut added_blocks = 0; ··· 171 172 let mut ctx = WorkerContext { 173 state: &state, 174 - repo_cache: &mut repo_cache, 175 batch: &mut batch, 176 added_blocks: &mut added_blocks, 177 records_delta: &mut records_delta, ··· 197 // TODO: there might be a race condition here where we get a new commit 198 // while the resync buffer is being drained, we should handle that probably 199 // but also it should still be fine since we'll sync eventually anyway 200 - match ops::update_repo_status( 201 &mut batch, 202 &state.db, 203 &did, 204 s, 205 RepoStatus::Synced, 206 - ) { 207 - Ok(s) => { 208 - repo_cache.insert(did.clone(), s.into_static()); 209 - } 210 - Err(e) => { 211 - // this can only fail if serde retry fails which would be really weird 212 - error!( 213 - "failed to transition {did} to synced: {e}" 214 - ); 215 - } 216 } 217 - } 218 - RepoProcessResult::Deleted => { 219 - deleted.insert(did.clone()); 220 } 221 // we don't have to handle this since drain_resync_buffer doesn't delete 222 // the commits from the resync 
buffer so they will get retried later 223 RepoProcessResult::Syncing(_) => {} 224 }, 225 Err(e) => { 226 error!("failed to drain resync buffer for {did}: {e}") ··· 240 _ => continue, 241 }; 242 243 - if deleted.contains(did) { 244 - continue; 245 - } 246 - 247 match Self::process_message(&mut ctx, &msg, did) { 248 Ok(RepoProcessResult::Ok(_)) => {} 249 - Ok(RepoProcessResult::Deleted) => { 250 - deleted.insert(did.clone()); 251 - } 252 Ok(RepoProcessResult::Syncing(Some(commit))) => { 253 if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) { 254 error!("failed to persist commit to resync_buffer for {did}: {e}"); ··· 359 repo_state, 360 RepoStatus::Backfilling, 361 )?; 362 - batch.insert(&ctx.state.db.pending, keys::repo_key(did), &[]); 363 batch.commit().into_diagnostic()?; 364 ctx.state 365 .db ··· 399 match &account.status { 400 Some(AccountStatus::Deleted) => { 401 debug!("account {did} deleted, wiping data"); 402 - ops::delete_repo(ctx.batch, &ctx.state.db, did)?; 403 return Ok(RepoProcessResult::Deleted); 404 } 405 status => { ··· 442 repo_state, 443 target_status, 444 )?; 445 - ctx.repo_cache.insert( 446 - did.clone().into_static(), 447 - repo_state.clone().into_static(), 448 - ); 449 } 450 } 451 } else { ··· 500 ); 501 502 let mut batch = ctx.state.db.inner.batch(); 503 - let repo_state = ops::update_repo_status( 504 &mut batch, 505 &ctx.state.db, 506 did, 507 repo_state, 508 RepoStatus::Backfilling, 509 )?; 510 - batch.insert(&ctx.state.db.pending, keys::repo_key(did), &[]); 511 batch.commit().into_diagnostic()?; 512 ctx.state.db.update_gauge_diff( 513 &crate::types::GaugeState::Synced, 514 &crate::types::GaugeState::Pending, 515 ); 516 - ctx.repo_cache 517 - .insert(did.clone().into_static(), repo_state.clone().into_static()); 518 ctx.state.notify_backfill(); 519 return Ok(RepoProcessResult::Syncing(Some(commit))); 520 } ··· 527 Self::fetch_key(ctx, did)?.as_ref(), 528 )?; 529 let repo_state = res.repo_state; 530 - ctx.repo_cache 531 - 
.insert(did.clone().into_static(), repo_state.clone().into_static()); 532 *ctx.added_blocks += res.blocks_count; 533 *ctx.records_delta += res.records_delta; 534 ctx.broadcast_events.push(BroadcastEvent::Persisted( ··· 550 did: &Did<'_>, 551 msg: &'c SubscribeReposMessage<'static>, 552 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 553 - // check if we have this repo 554 - if let Some(state) = ctx.repo_cache.get(did) { 555 - return Ok(RepoProcessResult::Ok(state.clone())); 556 - } 557 - 558 let repo_key = keys::repo_key(&did); 559 let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else { 560 // we don't know this repo, but we are receiving events for it 561 // this means we should backfill it before processing its events 562 debug!("discovered new account {did} from firehose, queueing backfill"); 563 564 - let new_state = RepoState::backfilling(did); 565 // using a separate batch here since we want to make it known its being backfilled 566 // immediately. we could use the batch for the unit of work we are doing but 567 // then we wouldn't be able to start backfilling until the unit of work is done 568 - 569 let mut batch = ctx.state.db.inner.batch(); 570 batch.insert( 571 &ctx.state.db.repos, 572 &repo_key, 573 - crate::db::ser_repo_state(&new_state)?, 574 ); 575 - batch.insert(&ctx.state.db.pending, repo_key, &[]); 576 batch.commit().into_diagnostic()?; 577 578 ctx.state.db.update_count("repos", 1); ··· 621 repo_state, 622 RepoStatus::Synced, 623 )?; 624 - ctx.repo_cache 625 - .insert(did.clone().into_static(), repo_state.clone()); 626 Ok(RepoProcessResult::Ok(repo_state)) 627 } 628 }
··· 15 use jacquard_common::IntoStatic; 16 use jacquard_repo::error::CommitError; 17 use miette::{Context, Diagnostic, IntoDiagnostic, Result}; 18 + use rand::Rng; 19 use smol_str::ToSmolStr; 20 + use std::collections::hash_map::DefaultHasher; 21 use std::hash::{Hash, Hasher}; 22 use std::sync::Arc; 23 use thiserror::Error; ··· 65 struct WorkerContext<'a> { 66 verify_signatures: bool, 67 state: &'a AppState, 68 batch: &'a mut OwnedWriteBatch, 69 added_blocks: &'a mut i64, 70 records_delta: &'a mut i64, ··· 156 let _guard = handle.enter(); 157 debug!("shard {id} started"); 158 159 let mut broadcast_events = Vec::new(); 160 161 while let Some(msg) = rx.blocking_recv() { 162 let mut batch = state.db.inner.batch(); 163 broadcast_events.clear(); 164 165 let mut added_blocks = 0; ··· 167 168 let mut ctx = WorkerContext { 169 state: &state, 170 batch: &mut batch, 171 added_blocks: &mut added_blocks, 172 records_delta: &mut records_delta, ··· 192 // TODO: there might be a race condition here where we get a new commit 193 // while the resync buffer is being drained, we should handle that probably 194 // but also it should still be fine since we'll sync eventually anyway 195 + let res = ops::update_repo_status( 196 &mut batch, 197 &state.db, 198 &did, 199 s, 200 RepoStatus::Synced, 201 + ); 202 + if let Err(e) = res { 203 + // this can only fail if serde retry fails which would be really weird 204 + error!("failed to transition {did} to synced: {e}"); 205 } 206 } 207 // we don't have to handle this since drain_resync_buffer doesn't delete 208 // the commits from the resync buffer so they will get retried later 209 RepoProcessResult::Syncing(_) => {} 210 + RepoProcessResult::Deleted => {} 211 }, 212 Err(e) => { 213 error!("failed to drain resync buffer for {did}: {e}") ··· 227 _ => continue, 228 }; 229 230 match Self::process_message(&mut ctx, &msg, did) { 231 Ok(RepoProcessResult::Ok(_)) => {} 232 + Ok(RepoProcessResult::Deleted) => {} 233 
Ok(RepoProcessResult::Syncing(Some(commit))) => { 234 if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) { 235 error!("failed to persist commit to resync_buffer for {did}: {e}"); ··· 340 repo_state, 341 RepoStatus::Backfilling, 342 )?; 343 batch.commit().into_diagnostic()?; 344 ctx.state 345 .db ··· 379 match &account.status { 380 Some(AccountStatus::Deleted) => { 381 debug!("account {did} deleted, wiping data"); 382 + ops::delete_repo(ctx.batch, &ctx.state.db, did, repo_state)?; 383 return Ok(RepoProcessResult::Deleted); 384 } 385 status => { ··· 422 repo_state, 423 target_status, 424 )?; 425 + ctx.state 426 + .db 427 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None)); 428 } 429 } 430 } else { ··· 479 ); 480 481 let mut batch = ctx.state.db.inner.batch(); 482 + let _repo_state = ops::update_repo_status( 483 &mut batch, 484 &ctx.state.db, 485 did, 486 repo_state, 487 RepoStatus::Backfilling, 488 )?; 489 batch.commit().into_diagnostic()?; 490 ctx.state.db.update_gauge_diff( 491 &crate::types::GaugeState::Synced, 492 &crate::types::GaugeState::Pending, 493 ); 494 ctx.state.notify_backfill(); 495 return Ok(RepoProcessResult::Syncing(Some(commit))); 496 } ··· 503 Self::fetch_key(ctx, did)?.as_ref(), 504 )?; 505 let repo_state = res.repo_state; 506 *ctx.added_blocks += res.blocks_count; 507 *ctx.records_delta += res.records_delta; 508 ctx.broadcast_events.push(BroadcastEvent::Persisted( ··· 524 did: &Did<'_>, 525 msg: &'c SubscribeReposMessage<'static>, 526 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 527 let repo_key = keys::repo_key(&did); 528 let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? 
else { 529 // we don't know this repo, but we are receiving events for it 530 // this means we should backfill it before processing its events 531 debug!("discovered new account {did} from firehose, queueing backfill"); 532 533 + let repo_state = RepoState::backfilling(rand::rng().next_u64()); 534 // using a separate batch here since we want to make it known its being backfilled 535 // immediately. we could use the batch for the unit of work we are doing but 536 // then we wouldn't be able to start backfilling until the unit of work is done 537 let mut batch = ctx.state.db.inner.batch(); 538 batch.insert( 539 &ctx.state.db.repos, 540 &repo_key, 541 + crate::db::ser_repo_state(&repo_state)?, 542 + ); 543 + batch.insert( 544 + &ctx.state.db.pending, 545 + keys::pending_key(repo_state.index_id), 546 + &repo_key, 547 ); 548 batch.commit().into_diagnostic()?; 549 550 ctx.state.db.update_count("repos", 1); ··· 593 repo_state, 594 RepoStatus::Synced, 595 )?; 596 + ctx.state.db.update_gauge_diff( 597 + &crate::types::GaugeState::Resync(None), 598 + &crate::types::GaugeState::Synced, 599 + ); 600 Ok(RepoProcessResult::Ok(repo_state)) 601 } 602 }
+42 -15
src/ops.rs
··· 14 use jacquard_common::types::crypto::PublicKey; 15 use jacquard_repo::car::reader::parse_car_bytes; 16 use miette::{Context, IntoDiagnostic, Result}; 17 use std::collections::HashMap; 18 use std::sync::atomic::Ordering; 19 use std::time::Instant; ··· 65 batch: &'batch mut OwnedWriteBatch, 66 db: &Db, 67 did: &jacquard::types::did::Did, 68 ) -> Result<()> { 69 debug!("deleting repo {did}"); 70 let repo_key = keys::repo_key(did); 71 72 // 1. delete from repos, pending, resync 73 batch.remove(&db.repos, &repo_key); 74 - batch.remove(&db.pending, &repo_key); 75 - batch.remove(&db.resync, &repo_key); 76 77 let resync_prefix = keys::resync_buffer_prefix(did); 78 for guard in db.resync_buffer.prefix(&resync_prefix) { 79 let k = guard.key().into_diagnostic()?; 80 batch.remove(&db.resync_buffer, k); 81 } 82 83 - // 2. delete from records 84 let records_prefix = keys::record_prefix_did(did); 85 for guard in db.records.prefix(&records_prefix) { 86 let k = guard.key().into_diagnostic()?; 87 batch.remove(&db.records, k); 88 } 89 90 - // 3. reset collection counts 91 let mut count_prefix = Vec::new(); 92 count_prefix.push(b'r'); 93 count_prefix.push(keys::SEP); ··· 111 ) -> Result<RepoState<'s>> { 112 debug!("updating repo status for {did} to {new_status:?}"); 113 114 - let key = keys::repo_key(did); 115 116 // manage queues 117 match &new_status { 118 RepoStatus::Synced => { 119 - batch.remove(&db.pending, &key); 120 - batch.remove(&db.resync, &key); 121 } 122 RepoStatus::Backfilling => { 123 - batch.insert(&db.pending, &key, &[]); 124 - batch.remove(&db.resync, &key); 125 } 126 RepoStatus::Error(_msg) => { 127 - batch.remove(&db.pending, &key); 128 let resync_state = crate::types::ResyncState::Error { 129 - kind: crate::types::ResyncErrorKind::Generic, // ops errors are usually generic logic errors? or transport? 
130 retry_count: 0, 131 next_retry: chrono::Utc::now().timestamp(), 132 }; 133 batch.insert( 134 &db.resync, 135 - &key, 136 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 137 ); 138 } 139 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => { 140 - batch.remove(&db.pending, &key); 141 let resync_state = ResyncState::Gone { 142 status: new_status.clone(), 143 }; 144 batch.insert( 145 &db.resync, 146 - &key, 147 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 148 ); 149 } ··· 152 repo_state.status = new_status; 153 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 154 155 - batch.insert(&db.repos, &key, ser_repo_state(&repo_state)?); 156 157 Ok(repo_state) 158 }
··· 14 use jacquard_common::types::crypto::PublicKey; 15 use jacquard_repo::car::reader::parse_car_bytes; 16 use miette::{Context, IntoDiagnostic, Result}; 17 + use rand::{Rng, rng}; 18 use std::collections::HashMap; 19 use std::sync::atomic::Ordering; 20 use std::time::Instant; ··· 66 batch: &'batch mut OwnedWriteBatch, 67 db: &Db, 68 did: &jacquard::types::did::Did, 69 + repo_state: RepoState, 70 ) -> Result<()> { 71 debug!("deleting repo {did}"); 72 + 73 let repo_key = keys::repo_key(did); 74 + let pending_key = keys::pending_key(repo_state.index_id); 75 76 // 1. delete from repos, pending, resync 77 batch.remove(&db.repos, &repo_key); 78 + match repo_state.status { 79 + RepoStatus::Synced => {} 80 + RepoStatus::Backfilling => { 81 + batch.remove(&db.pending, &pending_key); 82 + } 83 + _ => { 84 + batch.remove(&db.resync, &repo_key); 85 + } 86 + } 87 88 + // 2. delete from resync buffer 89 let resync_prefix = keys::resync_buffer_prefix(did); 90 for guard in db.resync_buffer.prefix(&resync_prefix) { 91 let k = guard.key().into_diagnostic()?; 92 batch.remove(&db.resync_buffer, k); 93 } 94 95 + // 3. delete from records 96 let records_prefix = keys::record_prefix_did(did); 97 for guard in db.records.prefix(&records_prefix) { 98 let k = guard.key().into_diagnostic()?; 99 batch.remove(&db.records, k); 100 } 101 102 + // 4. 
reset collection counts 103 let mut count_prefix = Vec::new(); 104 count_prefix.push(b'r'); 105 count_prefix.push(keys::SEP); ··· 123 ) -> Result<RepoState<'s>> { 124 debug!("updating repo status for {did} to {new_status:?}"); 125 126 + let repo_key = keys::repo_key(did); 127 + let pending_key = keys::pending_key(repo_state.index_id); 128 129 // manage queues 130 match &new_status { 131 RepoStatus::Synced => { 132 + batch.remove(&db.pending, &pending_key); 133 + // we dont have to remove from resync here because it has to transition resync -> pending first 134 } 135 RepoStatus::Backfilling => { 136 + // if we are coming from an error state, remove from resync 137 + if !matches!(repo_state.status, RepoStatus::Synced) { 138 + batch.remove(&db.resync, &repo_key); 139 + } 140 + // remove the old entry 141 + batch.remove(&db.pending, &pending_key); 142 + // add as new entry 143 + repo_state.index_id = rng().next_u64(); 144 + batch.insert( 145 + &db.pending, 146 + keys::pending_key(repo_state.index_id), 147 + &repo_key, 148 + ); 149 } 150 RepoStatus::Error(_msg) => { 151 + batch.remove(&db.pending, &pending_key); 152 + // TODO: we need to make errors have kind instead of "message" in repo status 153 + // and then pass it to resync error kind 154 let resync_state = crate::types::ResyncState::Error { 155 + kind: crate::types::ResyncErrorKind::Generic, 156 retry_count: 0, 157 next_retry: chrono::Utc::now().timestamp(), 158 }; 159 batch.insert( 160 &db.resync, 161 + &repo_key, 162 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 163 ); 164 } 165 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => { 166 + // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states 167 + // batch.remove(&db.pending, &pending_key); 168 let resync_state = ResyncState::Gone { 169 status: new_status.clone(), 170 }; 171 batch.insert( 172 &db.resync, 173 + &repo_key, 174 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 175 ); 
176 } ··· 179 repo_state.status = new_status; 180 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 181 182 + batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?); 183 184 Ok(repo_state) 185 }
+8 -10
src/types.rs
··· 1 use std::fmt::Display; 2 3 - use jacquard::{CowStr, IntoStatic}; 4 use jacquard_common::types::string::Did; 5 use serde::{Deserialize, Serialize}; 6 use serde_json::Value; ··· 35 #[derive(Debug, Clone, Serialize, Deserialize)] 36 #[serde(bound(deserialize = "'i: 'de"))] 37 pub struct RepoState<'i> { 38 - #[serde(borrow)] 39 - pub did: TrimmedDid<'i>, 40 pub status: RepoStatus, 41 pub rev: Option<DbTid>, 42 pub data: Option<IpldCid>, 43 pub last_seq: Option<i64>, 44 pub last_updated_at: i64, // unix timestamp 45 - pub handle: Option<SmolStr>, 46 } 47 48 impl<'i> RepoState<'i> { 49 - pub fn backfilling(did: &'i Did<'i>) -> Self { 50 Self { 51 - did: TrimmedDid::from(did), 52 status: RepoStatus::Backfilling, 53 rev: None, 54 data: None, 55 last_seq: None, 56 last_updated_at: chrono::Utc::now().timestamp(), 57 handle: None, 58 } 59 } 60 } ··· 64 65 fn into_static(self) -> Self::Output { 66 RepoState { 67 - did: self.did.into_static(), 68 status: self.status, 69 rev: self.rev, 70 data: self.data, 71 last_seq: self.last_seq, 72 last_updated_at: self.last_updated_at, 73 - handle: self.handle, 74 } 75 } 76 } 77 - 78 - // from src/backfill/resync_state.rs 79 80 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 81 pub enum ResyncErrorKind {
··· 1 use std::fmt::Display; 2 3 + use jacquard::{CowStr, IntoStatic, types::string::Handle}; 4 use jacquard_common::types::string::Did; 5 use serde::{Deserialize, Serialize}; 6 use serde_json::Value; ··· 35 #[derive(Debug, Clone, Serialize, Deserialize)] 36 #[serde(bound(deserialize = "'i: 'de"))] 37 pub struct RepoState<'i> { 38 pub status: RepoStatus, 39 pub rev: Option<DbTid>, 40 pub data: Option<IpldCid>, 41 pub last_seq: Option<i64>, 42 pub last_updated_at: i64, // unix timestamp 43 + #[serde(borrow)] 44 + pub handle: Option<Handle<'i>>, 45 + pub index_id: u64, 46 } 47 48 impl<'i> RepoState<'i> { 49 + pub fn backfilling(index_id: u64) -> Self { 50 Self { 51 status: RepoStatus::Backfilling, 52 rev: None, 53 data: None, 54 last_seq: None, 55 last_updated_at: chrono::Utc::now().timestamp(), 56 handle: None, 57 + index_id, 58 } 59 } 60 } ··· 64 65 fn into_static(self) -> Self::Output { 66 RepoState { 67 status: self.status, 68 rev: self.rev, 69 data: self.data, 70 last_seq: self.last_seq, 71 last_updated_at: self.last_updated_at, 72 + handle: self.handle.map(|s| s.into_static()), 73 + index_id: self.index_id, 74 } 75 } 76 } 77 78 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 79 pub enum ResyncErrorKind {