at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] fix resync count underflow race conditions

ptr.pet ddda6d79 6889adbe

verified
+18 -14
+18 -14
src/backfill/manager.rs
··· 2 2 use crate::db::{self, deser_repo_state}; 3 3 use crate::ops; 4 4 use crate::state::AppState; 5 - use crate::types::{RepoStatus, ResyncState}; 5 + use crate::types::{GaugeState, RepoStatus, ResyncState}; 6 6 use miette::{IntoDiagnostic, Result}; 7 7 use std::sync::Arc; 8 8 use std::time::Duration; ··· 10 10 11 11 pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> { 12 12 debug!("scanning for deactivated/takendown repos to retry..."); 13 - let mut count = 0; 13 + let mut transitions = Vec::new(); 14 14 15 15 let mut batch = state.db.inner.batch(); 16 16 ··· 43 43 RepoStatus::Backfilling, 44 44 )?; 45 45 46 - count += 1; 46 + transitions.push((GaugeState::Resync(None), GaugeState::Pending)); 47 47 } 48 48 } 49 49 } 50 50 51 - if count == 0 { 51 + if transitions.is_empty() { 52 52 return Ok(()); 53 53 } 54 54 55 55 batch.commit().into_diagnostic()?; 56 56 57 - state.db.update_count("resync", -count); 58 - state.db.update_count("pending", count); 57 + for (old_gauge, new_gauge) in &transitions { 58 + state.db.update_gauge_diff(old_gauge, new_gauge); 59 + } 59 60 60 61 state.notify_backfill(); 61 62 62 - info!("queued {count} gone backfills"); 63 + info!("queued {} gone backfills", transitions.len()); 63 64 Ok(()) 64 65 } 65 66 ··· 71 72 std::thread::sleep(Duration::from_secs(60)); 72 73 73 74 let now = chrono::Utc::now().timestamp(); 74 - let mut count = 0; 75 + let mut transitions = Vec::new(); 75 76 76 77 let mut batch = state.db.inner.batch(); 77 78 ··· 93 94 }; 94 95 95 96 match rmp_serde::from_slice::<ResyncState>(&value) { 96 - Ok(ResyncState::Error { next_retry, .. }) => { 97 + Ok(ResyncState::Error { 98 + kind, next_retry, .. 99 + }) => { 97 100 if next_retry <= now { 98 101 debug!("retrying backfill for {did}"); 99 102 ··· 128 131 continue; 129 132 } 130 133 131 - count += 1; 134 + transitions.push((GaugeState::Resync(Some(kind)), GaugeState::Pending)); 132 135 } 133 136 } 134 137 Ok(_) => { ··· 141 144 } 142 145 } 143 146 144 - if count == 0 { 147 + if transitions.is_empty() { 145 148 continue; 146 149 } 147 150 ··· 151 154 continue; 152 155 } 153 156 154 - state.db.update_count("resync", -count); 155 - state.db.update_count("pending", count); 157 + for (old_gauge, new_gauge) in &transitions { 158 + state.db.update_gauge_diff(old_gauge, new_gauge); 159 + } 156 160 state.notify_backfill(); 157 - info!("queued {count} retries"); 161 + info!("queued {} retries", transitions.len()); 158 162 } 159 163 }