use crate::db::types::TrimmedDid; use crate::db::{self, deser_repo_state}; use crate::ops; use crate::state::AppState; use crate::types::{GaugeState, RepoStatus, ResyncState}; use miette::{IntoDiagnostic, Result}; use std::sync::Arc; use std::time::Duration; use tracing::{debug, error, info}; pub fn queue_gone_backfills(state: &Arc) -> Result<()> { debug!("scanning for deactivated/takendown repos to retry..."); let mut transitions = Vec::new(); let mut batch = state.db.inner.batch(); for guard in state.db.resync.iter() { let (key, val) = guard.into_inner().into_diagnostic()?; let did = match TrimmedDid::try_from(key.as_ref()) { Ok(did) => did.to_did(), Err(e) => { error!("invalid did in db, skipping: {e}"); continue; } }; if let Ok(resync_state) = rmp_serde::from_slice::(&val) { if matches!(resync_state, ResyncState::Gone { .. }) { debug!("queuing retry for gone repo: {did}"); let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? else { error!("repo state not found for {did}"); continue; }; // update repo state back to backfilling let repo_state = deser_repo_state(&state_bytes)?; ops::update_repo_status( &mut batch, &state.db, &did, repo_state, RepoStatus::Backfilling, )?; transitions.push((GaugeState::Resync(None), GaugeState::Pending)); } } } if transitions.is_empty() { return Ok(()); } batch.commit().into_diagnostic()?; for (old_gauge, new_gauge) in &transitions { state.db.update_gauge_diff(old_gauge, new_gauge); } state.notify_backfill(); info!("queued {} gone backfills", transitions.len()); Ok(()) } pub fn retry_worker(state: Arc) { let db = &state.db; info!("retry worker started"); loop { // sleep first (e.g., check every minute) std::thread::sleep(Duration::from_secs(60)); let now = chrono::Utc::now().timestamp(); let mut transitions = Vec::new(); let mut batch = state.db.inner.batch(); for guard in db.resync.iter() { let (key, value) = match guard.into_inner() { Ok(t) => t, Err(e) => { error!("failed to get resync state: {e}"); db::check_poisoned(&e); continue; } }; let did = match TrimmedDid::try_from(key.as_ref()) { Ok(did) => did.to_did(), Err(e) => { error!("invalid did in db, skipping: {e}"); continue; } }; match rmp_serde::from_slice::(&value) { Ok(ResyncState::Error { kind, next_retry, .. }) => { if next_retry <= now { debug!("retrying backfill for {did}"); let state_bytes = match state.db.repos.get(&key).into_diagnostic() { Ok(b) => b, Err(err) => { error!("failed to get repo state for {did}: {err}"); continue; } }; let Some(state_bytes) = state_bytes else { error!("repo state not found for {did}"); continue; }; let repo_state = match deser_repo_state(&state_bytes) { Ok(s) => s, Err(e) => { error!("failed to deserialize repo state for {did}: {e}"); continue; } }; let res = ops::update_repo_status( &mut batch, &state.db, &did, repo_state, RepoStatus::Backfilling, ); if let Err(e) = res { error!("failed to update repo status for {did}: {e}"); continue; } transitions.push((GaugeState::Resync(Some(kind)), GaugeState::Pending)); } } Ok(_) => { // not an error state, do nothing } Err(e) => { error!("failed to deserialize resync state for {did}: {e}"); continue; } } } if transitions.is_empty() { continue; } if let Err(e) = batch.commit() { error!("failed to commit batch: {e}"); db::check_poisoned(&e); continue; } for (old_gauge, new_gauge) in &transitions { state.db.update_gauge_diff(old_gauge, new_gauge); } state.notify_backfill(); info!("queued {} retries", transitions.len()); } }