tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[backfill] refactor resync state with error kind and gauges
ptr.pet
3 weeks ago
bbdf905a
0e84e201
verified
This commit was signed with the committer's known signature.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+135
-45
4 changed files
expand all
collapse all
unified
split
src
api
repo.rs
backfill
mod.rs
ops.rs
types.rs
+61
-7
src/api/repo.rs
···
70
70
) -> Result<StatusCode, (StatusCode, String)> {
71
71
let db = &state.db;
72
72
let mut batch = db.inner.batch();
73
73
-
let mut removed = 0;
73
73
+
let mut removed_repos = 0;
74
74
+
let mut removed_pending = 0;
75
75
+
let mut removed_resync = 0;
74
76
75
77
for did_str in req.dids {
76
78
let did = Did::new_owned(did_str.as_str())
77
79
.map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
78
80
let did_key = keys::repo_key(&did);
79
79
-
if Db::contains_key(db.repos.clone(), &did_key)
81
81
+
82
82
+
if let Some(state_bytes) = Db::get(db.repos.clone(), &did_key)
80
83
.await
81
84
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
82
85
{
86
86
+
let repo_state = crate::db::deser_repo_state(&state_bytes)
87
87
+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
88
88
+
89
89
+
let was_pending = matches!(repo_state.status, crate::types::RepoStatus::Backfilling);
90
90
+
let _was_resync = matches!(
91
91
+
repo_state.status,
92
92
+
crate::types::RepoStatus::Error(_)
93
93
+
| crate::types::RepoStatus::Deactivated
94
94
+
| crate::types::RepoStatus::Takendown
95
95
+
| crate::types::RepoStatus::Suspended
96
96
+
);
97
97
+
83
98
batch.remove(&db.repos, &did_key);
84
84
-
batch.remove(&db.pending, &did_key);
85
85
-
batch.remove(&db.resync, &did_key);
86
86
-
removed -= 1;
99
99
+
100
100
+
if was_pending {
101
101
+
batch.remove(&db.pending, &did_key);
102
102
+
removed_pending -= 1;
103
103
+
}
104
104
+
if let Some(resync_bytes) = Db::get(db.resync.clone(), &did_key)
105
105
+
.await
106
106
+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
107
107
+
{
108
108
+
let resync_state: crate::types::ResyncState = rmp_serde::from_slice(&resync_bytes)
109
109
+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
110
110
+
111
111
+
if let crate::types::ResyncState::Error { kind, .. } = resync_state {
112
112
+
match kind {
113
113
+
crate::types::ResyncErrorKind::Ratelimited => {
114
114
+
state.db.update_count_async("error_ratelimited", -1).await
115
115
+
}
116
116
+
crate::types::ResyncErrorKind::Transport => {
117
117
+
state.db.update_count_async("error_transport", -1).await
118
118
+
}
119
119
+
crate::types::ResyncErrorKind::Generic => {
120
120
+
state.db.update_count_async("error_generic", -1).await
121
121
+
}
122
122
+
}
123
123
+
}
124
124
+
125
125
+
batch.remove(&db.resync, &did_key);
126
126
+
removed_resync -= 1;
127
127
+
}
128
128
+
129
129
+
removed_repos -= 1;
87
130
}
88
131
}
89
132
···
91
134
.await
92
135
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
93
136
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
94
94
-
state.db.update_count_async("repos", removed).await;
95
95
-
state.db.update_count_async("pending", removed).await;
137
137
+
138
138
+
if removed_repos != 0 {
139
139
+
state.db.update_count_async("repos", removed_repos).await;
140
140
+
}
141
141
+
if removed_pending != 0 {
142
142
+
state
143
143
+
.db
144
144
+
.update_count_async("pending", removed_pending)
145
145
+
.await;
146
146
+
}
147
147
+
if removed_resync != 0 {
148
148
+
state.db.update_count_async("resync", removed_resync).await;
149
149
+
}
96
150
97
151
Ok(StatusCode::OK)
98
152
}
+63
-34
src/backfill/mod.rs
···
270
270
Ok(())
271
271
}
272
272
Err(e) => {
273
273
-
let mut was_ratelimited = false;
274
273
match &e {
275
274
BackfillError::Ratelimited => {
276
276
-
was_ratelimited = true;
277
275
debug!("failed for {did}: too many requests");
278
276
}
279
277
BackfillError::Transport(reason) => {
···
284
282
}
285
283
}
286
284
285
285
+
let error_kind = match &e {
286
286
+
BackfillError::Ratelimited => crate::types::ResyncErrorKind::Ratelimited,
287
287
+
BackfillError::Transport(_) => crate::types::ResyncErrorKind::Transport,
288
288
+
BackfillError::Generic(_) => crate::types::ResyncErrorKind::Generic,
289
289
+
};
290
290
+
287
291
let did_key = keys::repo_key(&did);
288
292
289
293
// 1. get current retry count
290
290
-
let mut resync_state = Db::get(db.resync.clone(), &did_key)
291
291
-
.await
292
292
-
.and_then(|b| {
293
293
-
b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
294
294
-
.transpose()
295
295
-
})?
296
296
-
.and_then(|s| {
297
297
-
matches!(s, ResyncState::Gone { .. })
298
298
-
.then_some(None)
299
299
-
.unwrap_or(Some(s))
300
300
-
})
301
301
-
.unwrap_or_else(|| ResyncState::Error {
302
302
-
message: SmolStr::new_static(""),
303
303
-
retry_count: 0,
304
304
-
next_retry: 0,
305
305
-
});
294
294
+
let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
295
295
+
b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
296
296
+
.transpose()
297
297
+
})?;
298
298
+
299
299
+
let (mut retry_count, prev_kind) = match existing_state {
300
300
+
Some(ResyncState::Error {
301
301
+
kind, retry_count, ..
302
302
+
}) => (retry_count, Some(kind)),
303
303
+
Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
304
304
+
None => (0, None),
305
305
+
};
306
306
+
307
307
+
// Calculate new stats
308
308
+
retry_count += 1;
309
309
+
let next_retry = ResyncState::next_backoff(retry_count);
306
310
307
307
-
let ResyncState::Error {
308
308
-
message,
311
311
+
let resync_state = ResyncState::Error {
312
312
+
kind: error_kind.clone(),
309
313
retry_count,
310
314
next_retry,
311
311
-
} = &mut resync_state
312
312
-
else {
313
313
-
unreachable!("we handled the gone case above");
314
315
};
315
316
316
316
-
// 2. calculate backoff and update the other fields
317
317
-
*retry_count += was_ratelimited.then_some(1).unwrap_or(1);
318
318
-
*next_retry = ResyncState::next_backoff(*retry_count);
319
319
-
*message = e.to_smolstr();
320
320
-
321
317
let error_string = e.to_string();
322
318
323
319
tokio::task::spawn_blocking({
···
355
351
state.db.update_count_async("resync", 1).await;
356
352
state.db.update_count_async("pending", -1).await;
357
353
358
358
-
// add error stats
359
359
-
if was_ratelimited {
360
360
-
state.db.update_count_async("error_ratelimited", 1).await;
361
361
-
} else if let BackfillError::Transport(_) = &e {
362
362
-
state.db.update_count_async("error_transport", 1).await;
354
354
+
// update gauges
355
355
+
if let Some(prev) = prev_kind {
356
356
+
if prev != error_kind {
357
357
+
match prev {
358
358
+
crate::types::ResyncErrorKind::Ratelimited => {
359
359
+
state.db.update_count_async("error_ratelimited", -1).await
360
360
+
}
361
361
+
crate::types::ResyncErrorKind::Transport => {
362
362
+
state.db.update_count_async("error_transport", -1).await
363
363
+
}
364
364
+
crate::types::ResyncErrorKind::Generic => {
365
365
+
state.db.update_count_async("error_generic", -1).await
366
366
+
}
367
367
+
}
368
368
+
match error_kind {
369
369
+
crate::types::ResyncErrorKind::Ratelimited => {
370
370
+
state.db.update_count_async("error_ratelimited", 1).await
371
371
+
}
372
372
+
crate::types::ResyncErrorKind::Transport => {
373
373
+
state.db.update_count_async("error_transport", 1).await
374
374
+
}
375
375
+
crate::types::ResyncErrorKind::Generic => {
376
376
+
state.db.update_count_async("error_generic", 1).await
377
377
+
}
378
378
+
}
379
379
+
}
380
380
+
// if same, do nothing (count already accurate)
363
381
} else {
364
364
-
state.db.update_count_async("error_generic", 1).await;
382
382
+
// new error
383
383
+
match error_kind {
384
384
+
crate::types::ResyncErrorKind::Ratelimited => {
385
385
+
state.db.update_count_async("error_ratelimited", 1).await
386
386
+
}
387
387
+
crate::types::ResyncErrorKind::Transport => {
388
388
+
state.db.update_count_async("error_transport", 1).await
389
389
+
}
390
390
+
crate::types::ResyncErrorKind::Generic => {
391
391
+
state.db.update_count_async("error_generic", 1).await
392
392
+
}
393
393
+
}
365
394
}
366
395
Err(e)
367
396
}
+3
-3
src/ops.rs
···
131
131
batch.insert(&db.pending, &key, &[]);
132
132
batch.remove(&db.resync, &key);
133
133
}
134
134
-
RepoStatus::Error(msg) => {
134
134
+
RepoStatus::Error(_msg) => {
135
135
batch.remove(&db.pending, &key);
136
136
-
let resync_state = ResyncState::Error {
137
137
-
message: msg.clone(),
136
136
+
let resync_state = crate::types::ResyncState::Error {
137
137
+
kind: crate::types::ResyncErrorKind::Generic, // ops errors are usually generic logic errors? or transport?
138
138
retry_count: 0,
139
139
next_retry: chrono::Utc::now().timestamp(),
140
140
};
+8
-1
src/types.rs
···
77
77
78
78
// from src/backfill/resync_state.rs
79
79
80
80
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
81
81
+
pub enum ResyncErrorKind {
82
82
+
Ratelimited,
83
83
+
Transport,
84
84
+
Generic,
85
85
+
}
86
86
+
80
87
#[derive(Debug, Clone, Serialize, Deserialize)]
81
88
pub enum ResyncState {
82
89
Error {
83
83
-
message: SmolStr,
90
90
+
kind: ResyncErrorKind,
84
91
retry_count: u32,
85
92
next_retry: i64, // unix timestamp
86
93
},