AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::types::TrimmedDid;
2use crate::db::{self, deser_repo_state};
3use crate::ops;
4use crate::state::AppState;
5use crate::types::{GaugeState, RepoStatus, ResyncState};
6use miette::{IntoDiagnostic, Result};
7use std::sync::Arc;
8use std::time::Duration;
9use tracing::{debug, error, info, warn};
10
11pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> {
12 debug!("scanning for deactivated/takendown repos to retry...");
13 let mut transitions = Vec::new();
14
15 let mut batch = state.db.inner.batch();
16
17 for guard in state.db.resync.iter() {
18 let (key, val) = guard.into_inner().into_diagnostic()?;
19 let did = match TrimmedDid::try_from(key.as_ref()) {
20 Ok(did) => did.to_did(),
21 Err(e) => {
22 error!(err = %e, "invalid did in db, skipping");
23 continue;
24 }
25 };
26
27 if let Ok(resync_state) = rmp_serde::from_slice::<ResyncState>(&val) {
28 if matches!(resync_state, ResyncState::Gone { .. }) {
29 debug!(did = %did, "queuing retry for gone repo");
30
31 let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? else {
32 warn!(did = %did, "repo state not found");
33 continue;
34 };
35
36 // update repo state back to backfilling
37 let repo_state = deser_repo_state(&state_bytes)?;
38 ops::update_repo_status(
39 &mut batch,
40 &state.db,
41 &did,
42 repo_state,
43 RepoStatus::Backfilling,
44 )?;
45
46 transitions.push((GaugeState::Resync(None), GaugeState::Pending));
47 }
48 }
49 }
50
51 if transitions.is_empty() {
52 return Ok(());
53 }
54
55 batch.commit().into_diagnostic()?;
56
57 for (old_gauge, new_gauge) in &transitions {
58 state.db.update_gauge_diff(old_gauge, new_gauge);
59 }
60
61 state.notify_backfill();
62
63 info!(count = transitions.len(), "queued gone backfills");
64 Ok(())
65}
66
67pub fn retry_worker(state: Arc<AppState>) {
68 let db = &state.db;
69 info!("retry worker started");
70 loop {
71 // sleep first (e.g., check every minute)
72 std::thread::sleep(Duration::from_secs(60));
73
74 let now = chrono::Utc::now().timestamp();
75 let mut transitions = Vec::new();
76
77 let mut batch = state.db.inner.batch();
78
79 for guard in db.resync.iter() {
80 let (key, value) = match guard.into_inner() {
81 Ok(t) => t,
82 Err(e) => {
83 error!(err = %e, "failed to get resync state");
84 db::check_poisoned(&e);
85 continue;
86 }
87 };
88 let did = match TrimmedDid::try_from(key.as_ref()) {
89 Ok(did) => did.to_did(),
90 Err(e) => {
91 error!(err = %e, "invalid did in db, skipping");
92 continue;
93 }
94 };
95
96 match rmp_serde::from_slice::<ResyncState>(&value) {
97 Ok(ResyncState::Error {
98 kind, next_retry, ..
99 }) => {
100 if next_retry <= now {
101 debug!(did = %did, "retrying backfill");
102
103 let state_bytes = match state.db.repos.get(&key).into_diagnostic() {
104 Ok(b) => b,
105 Err(err) => {
106 error!(did = %did, err = %err, "failed to get repo state");
107 continue;
108 }
109 };
110 let Some(state_bytes) = state_bytes else {
111 error!(did = %did, "repo state not found");
112 continue;
113 };
114
115 let repo_state = match deser_repo_state(&state_bytes) {
116 Ok(s) => s,
117 Err(e) => {
118 error!(did = %did, err = %e, "failed to deserialize repo state");
119 continue;
120 }
121 };
122 let res = ops::update_repo_status(
123 &mut batch,
124 &state.db,
125 &did,
126 repo_state,
127 RepoStatus::Backfilling,
128 );
129 if let Err(e) = res {
130 error!(did = %did, err = %e, "failed to update repo status");
131 continue;
132 }
133
134 transitions.push((GaugeState::Resync(Some(kind)), GaugeState::Pending));
135 }
136 }
137 Ok(_) => {
138 // not an error state, do nothing
139 }
140 Err(e) => {
141 error!(did = %did, err = %e, "failed to deserialize resync state");
142 continue;
143 }
144 }
145 }
146
147 if transitions.is_empty() {
148 continue;
149 }
150
151 if let Err(e) = batch.commit() {
152 error!(err = %e, "failed to commit batch");
153 db::check_poisoned(&e);
154 continue;
155 }
156
157 for (old_gauge, new_gauge) in &transitions {
158 state.db.update_gauge_diff(old_gauge, new_gauge);
159 }
160 state.notify_backfill();
161 info!(count = transitions.len(), "queued retries");
162 }
163}