AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db] refactor gauge updates to use shared state and macro

ptr.pet 3dd776db 25e6d3c7

verified
+163 -99
+27 -34
src/api/repo.rs
··· 1 1 use crate::api::AppState; 2 2 use crate::db::{Db, keys, ser_repo_state}; 3 - use crate::types::RepoState; 3 + use crate::types::{GaugeState, RepoState}; 4 4 use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 5 5 use jacquard::types::did::Did; 6 6 use serde::Deserialize; ··· 51 51 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 52 52 53 53 state.db.update_count_async("repos", added).await; 54 - state.db.update_count_async("pending", added).await; 54 + state 55 + .db 56 + .update_gauge_diff_async(&GaugeState::Synced, &GaugeState::Pending) 57 + .await; 55 58 56 59 // trigger backfill worker 57 60 state.notify_backfill(); ··· 71 74 let db = &state.db; 72 75 let mut batch = db.inner.batch(); 73 76 let mut removed_repos = 0; 74 - let mut removed_pending = 0; 75 - let mut removed_resync = 0; 76 77 77 78 for did_str in req.dids { 78 79 let did = Did::new_owned(did_str.as_str()) ··· 95 96 | crate::types::RepoStatus::Suspended 96 97 ); 97 98 98 - batch.remove(&db.repos, &did_key); 99 - 100 - if was_pending { 101 - batch.remove(&db.pending, &did_key); 102 - removed_pending -= 1; 103 - } 104 - if let Some(resync_bytes) = Db::get(db.resync.clone(), &did_key) 99 + let old_gauge = if was_pending { 100 + GaugeState::Pending 101 + } else if let Some(resync_bytes) = Db::get(db.resync.clone(), &did_key) 105 102 .await 106 103 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 107 104 { 108 105 let resync_state: crate::types::ResyncState = rmp_serde::from_slice(&resync_bytes) 109 106 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 110 107 111 - if let crate::types::ResyncState::Error { kind, .. 
} = resync_state { 112 - match kind { 113 - crate::types::ResyncErrorKind::Ratelimited => { 114 - state.db.update_count_async("error_ratelimited", -1).await 115 - } 116 - crate::types::ResyncErrorKind::Transport => { 117 - state.db.update_count_async("error_transport", -1).await 118 - } 119 - crate::types::ResyncErrorKind::Generic => { 120 - state.db.update_count_async("error_generic", -1).await 121 - } 122 - } 123 - } 108 + let kind = if let crate::types::ResyncState::Error { kind, .. } = resync_state { 109 + Some(kind) 110 + } else { 111 + None 112 + }; 113 + GaugeState::Resync(kind) 114 + } else { 115 + GaugeState::Synced 116 + }; 124 117 118 + batch.remove(&db.repos, &did_key); 119 + if was_pending { 120 + batch.remove(&db.pending, &did_key); 121 + } 122 + if old_gauge.is_resync() { 125 123 batch.remove(&db.resync, &did_key); 126 - removed_resync -= 1; 127 124 } 125 + 126 + state 127 + .db 128 + .update_gauge_diff_async(&old_gauge, &GaugeState::Synced) 129 + .await; 128 130 129 131 removed_repos -= 1; 130 132 } ··· 137 139 138 140 if removed_repos != 0 { 139 141 state.db.update_count_async("repos", removed_repos).await; 140 - } 141 - if removed_pending != 0 { 142 - state 143 - .db 144 - .update_count_async("pending", removed_pending) 145 - .await; 146 - } 147 - if removed_resync != 0 { 148 - state.db.update_count_async("resync", removed_resync).await; 149 142 } 150 143 151 144 Ok(StatusCode::OK)
+48 -60
src/backfill/mod.rs
··· 3 3 use crate::ops; 4 4 use crate::resolver::ResolverError; 5 5 use crate::state::AppState; 6 - use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 6 + use crate::types::{ 7 + AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncState, StoredEvent, 8 + }; 7 9 8 10 use fjall::Slice; 9 11 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; ··· 235 237 Ok(previous_state) => { 236 238 let did_key = keys::repo_key(&did); 237 239 238 - let was_pending = matches!(previous_state.status, RepoStatus::Backfilling); 239 - let was_resync = matches!( 240 - previous_state.status, 240 + // determine old gauge state 241 + // if it was error/suspended etc, we need to know which error kind it was to decrement correctly. 242 + // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 243 + // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 244 + let old_gauge = match previous_state.status { 245 + RepoStatus::Backfilling => GaugeState::Pending, 241 246 RepoStatus::Error(_) 242 - | RepoStatus::Deactivated 243 - | RepoStatus::Takendown 244 - | RepoStatus::Suspended 245 - ); 247 + | RepoStatus::Deactivated 248 + | RepoStatus::Takendown 249 + | RepoStatus::Suspended => { 250 + // we need to fetch the resync state to know the kind 251 + // if it's missing, we assume Generic (or handle gracefully) 252 + // this is an extra read, but necessary for accurate gauges. 253 + let resync_state = Db::get(db.resync.clone(), &did_key).await.ok().flatten(); 254 + let kind = resync_state.and_then(|b| { 255 + rmp_serde::from_slice::<ResyncState>(&b) 256 + .ok() 257 + .and_then(|s| match s { 258 + ResyncState::Error { kind, .. 
} => Some(kind), 259 + _ => None, 260 + }) 261 + }); 262 + GaugeState::Resync(kind) 263 + } 264 + RepoStatus::Synced => GaugeState::Synced, 265 + }; 246 266 247 267 let mut batch = db.inner.batch(); 248 268 // remove from pending 249 - if was_pending { 269 + if old_gauge == GaugeState::Pending { 250 270 batch.remove(&db.pending, pending_key); 251 271 } 252 272 // remove from resync 253 - if was_resync { 273 + if old_gauge.is_resync() { 254 274 batch.remove(&db.resync, &did_key); 255 275 } 256 276 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 257 277 .await 258 278 .into_diagnostic()??; 259 - if was_pending { 260 - state.db.update_count_async("pending", -1).await; 261 - } 262 - if was_resync { 263 - state.db.update_count_async("resync", -1).await; 264 - } 279 + 280 + state 281 + .db 282 + .update_gauge_diff_async(&old_gauge, &GaugeState::Synced) 283 + .await; 265 284 266 285 let state = state.clone(); 267 286 tokio::task::spawn_blocking(move || { ··· 349 368 350 369 let mut batch = state.db.inner.batch(); 351 370 batch.insert(&state.db.resync, &did_key, serialized_resync_state); 352 - batch.remove(&state.db.pending, pending_key); 371 + batch.remove(&state.db.pending, pending_key.clone()); 353 372 if let Some(state_bytes) = serialized_repo_state { 354 373 batch.insert(&state.db.repos, &did_key, state_bytes); 355 374 } ··· 359 378 .await 360 379 .into_diagnostic()??; 361 380 362 - state.db.update_count_async("resync", 1).await; 363 - state.db.update_count_async("pending", -1).await; 381 + let old_gauge = if let Some(k) = prev_kind { 382 + GaugeState::Resync(Some(k)) 383 + } else { 384 + GaugeState::Pending 385 + }; 386 + 387 + let new_gauge = GaugeState::Resync(Some(error_kind)); 388 + 389 + state 390 + .db 391 + .update_gauge_diff_async(&old_gauge, &new_gauge) 392 + .await; 364 393 365 - // update gauges 366 - if let Some(prev) = prev_kind { 367 - if prev != error_kind { 368 - match prev { 369 - crate::types::ResyncErrorKind::Ratelimited => { 
370 - state.db.update_count_async("error_ratelimited", -1).await 371 - } 372 - crate::types::ResyncErrorKind::Transport => { 373 - state.db.update_count_async("error_transport", -1).await 374 - } 375 - crate::types::ResyncErrorKind::Generic => { 376 - state.db.update_count_async("error_generic", -1).await 377 - } 378 - } 379 - match error_kind { 380 - crate::types::ResyncErrorKind::Ratelimited => { 381 - state.db.update_count_async("error_ratelimited", 1).await 382 - } 383 - crate::types::ResyncErrorKind::Transport => { 384 - state.db.update_count_async("error_transport", 1).await 385 - } 386 - crate::types::ResyncErrorKind::Generic => { 387 - state.db.update_count_async("error_generic", 1).await 388 - } 389 - } 390 - } 391 - // if same, do nothing (count already accurate) 392 - } else { 393 - // new error 394 - match error_kind { 395 - crate::types::ResyncErrorKind::Ratelimited => { 396 - state.db.update_count_async("error_ratelimited", 1).await 397 - } 398 - crate::types::ResyncErrorKind::Transport => { 399 - state.db.update_count_async("error_transport", 1).await 400 - } 401 - crate::types::ResyncErrorKind::Generic => { 402 - state.db.update_count_async("error_generic", 1).await 403 - } 404 - } 405 - } 406 394 Err(e) 407 395 } 408 396 }
+62
src/db/mod.rs
··· 35 35 pub counts_map: HashMap<SmolStr, u64>, 36 36 } 37 37 38 + macro_rules! update_gauge_diff_impl { 39 + ($self:ident, $old:ident, $new:ident, $update_method:ident $(, $await:tt)?) => {{ 40 + use crate::types::GaugeState; 41 + 42 + if $old == $new { 43 + return; 44 + } 45 + 46 + // pending 47 + match ($old, $new) { 48 + (GaugeState::Pending, GaugeState::Pending) => {} 49 + (GaugeState::Pending, _) => $self.$update_method("pending", -1) $(.$await)?, 50 + (_, GaugeState::Pending) => $self.$update_method("pending", 1) $(.$await)?, 51 + _ => {} 52 + } 53 + 54 + // resync 55 + let old_resync = $old.is_resync(); 56 + let new_resync = $new.is_resync(); 57 + match (old_resync, new_resync) { 58 + (true, false) => $self.$update_method("resync", -1) $(.$await)?, 59 + (false, true) => $self.$update_method("resync", 1) $(.$await)?, 60 + _ => {} 61 + } 62 + 63 + // error kinds 64 + if let GaugeState::Resync(Some(kind)) = $old { 65 + let key = match kind { 66 + crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited", 67 + crate::types::ResyncErrorKind::Transport => "error_transport", 68 + crate::types::ResyncErrorKind::Generic => "error_generic", 69 + }; 70 + $self.$update_method(key, -1) $(.$await)?; 71 + } 72 + 73 + if let GaugeState::Resync(Some(kind)) = $new { 74 + let key = match kind { 75 + crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited", 76 + crate::types::ResyncErrorKind::Transport => "error_transport", 77 + crate::types::ResyncErrorKind::Generic => "error_generic", 78 + }; 79 + $self.$update_method(key, 1) $(.$await)?; 80 + } 81 + }}; 82 + } 83 + 38 84 impl Db { 39 85 pub fn open(cfg: &crate::config::Config) -> Result<Self> { 40 86 let db = Database::builder(&cfg.database_path) ··· 189 235 .read_async(key, |_, v| *v) 190 236 .await 191 237 .unwrap_or(0) 238 + } 239 + 240 + pub fn update_gauge_diff( 241 + &self, 242 + old: &crate::types::GaugeState, 243 + new: &crate::types::GaugeState, 244 + ) { 245 + update_gauge_diff_impl!(self, old, 
new, update_count); 246 + } 247 + 248 + pub async fn update_gauge_diff_async( 249 + &self, 250 + old: &crate::types::GaugeState, 251 + new: &crate::types::GaugeState, 252 + ) { 253 + update_gauge_diff_impl!(self, old, new, update_count_async, await); 192 254 } 193 255 194 256 pub fn update_repo_state<F, T>(
+12 -4
src/ingest/worker.rs
··· 3 3 use crate::ops; 4 4 use crate::resolver::{NoSigningKeyError, ResolverError}; 5 5 use crate::state::AppState; 6 - use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, RepoState, RepoStatus}; 6 + use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus}; 7 7 use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; 8 8 9 9 use fjall::OwnedWriteBatch; ··· 361 361 )?; 362 362 batch.insert(&ctx.state.db.pending, keys::repo_key(did), &[]); 363 363 batch.commit().into_diagnostic()?; 364 - ctx.state.db.update_count("pending", 1); 364 + ctx.state 365 + .db 366 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 365 367 ctx.state.notify_backfill(); 366 368 return Ok(RepoProcessResult::Ok(repo_state)); 367 369 } ··· 507 509 )?; 508 510 batch.insert(&ctx.state.db.pending, keys::repo_key(did), &[]); 509 511 batch.commit().into_diagnostic()?; 510 - ctx.state.db.update_count("pending", 1); 512 + ctx.state.db.update_gauge_diff( 513 + &crate::types::GaugeState::Synced, 514 + &crate::types::GaugeState::Pending, 515 + ); 511 516 ctx.repo_cache 512 517 .insert(did.clone().into_static(), repo_state.clone().into_static()); 513 518 ctx.state.notify_backfill(); ··· 571 576 batch.commit().into_diagnostic()?; 572 577 573 578 ctx.state.db.update_count("repos", 1); 574 - ctx.state.db.update_count("pending", 1); 579 + ctx.state.db.update_gauge_diff( 580 + &crate::types::GaugeState::Synced, 581 + &crate::types::GaugeState::Pending, 582 + ); 575 583 576 584 ctx.state.notify_backfill(); 577 585
+14 -1
src/types.rs
··· 77 77 78 78 // from src/backfill/resync_state.rs 79 79 80 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 80 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 81 81 pub enum ResyncErrorKind { 82 82 Ratelimited, 83 83 Transport, ··· 185 185 #[serde(skip_serializing_if = "Option::is_none")] 186 186 pub cid: Option<IpldCid>, 187 187 } 188 + 189 + #[derive(Debug, PartialEq, Eq, Clone, Copy)] 190 + pub enum GaugeState { 191 + Synced, 192 + Pending, 193 + Resync(Option<ResyncErrorKind>), 194 + } 195 + 196 + impl GaugeState { 197 + pub fn is_resync(&self) -> bool { 198 + matches!(self, GaugeState::Resync(_)) 199 + } 200 + }