tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[backfill] enhance handle resolution and error classification
ptr.pet
1 month ago
83a7028b
a7ec3380
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+117
-40
2 changed files
expand all
collapse all
unified
split
src
backfill
mod.rs
resolver.rs
+106
-35
src/backfill/mod.rs
···
1
1
use crate::db::{keys, Db};
2
2
use crate::ops;
3
3
use crate::state::{AppState, BackfillRx};
4
4
-
use crate::types::{ErrorState, RepoState, RepoStatus, StoredEvent};
4
4
+
use crate::types::{BroadcastEvent, ErrorState, IdentityEvt, RepoState, RepoStatus, StoredEvent};
5
5
use futures::TryFutureExt;
6
6
-
use jacquard::api::com_atproto::sync::get_repo::GetRepo;
6
6
+
use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError};
7
7
use jacquard::api::com_atproto::sync::subscribe_repos::Commit;
8
8
use jacquard::prelude::*;
9
9
use jacquard::types::did::Did;
10
10
+
use jacquard_common::xrpc::XrpcError;
10
11
use jacquard_repo::mst::Mst;
11
12
use jacquard_repo::MemoryBlockStore;
12
13
use miette::{IntoDiagnostic, Result};
···
17
18
use std::sync::Arc;
18
19
use std::time::{Duration, Instant};
19
20
use tokio::sync::Semaphore;
20
20
-
use tracing::{debug, error, info, trace};
21
21
+
use tracing::{debug, error, info, trace, warn};
21
22
22
23
pub mod manager;
23
24
···
100
101
if is_error {
101
102
batch.remove(&db.errors, did_key);
102
103
}
103
103
-
104
104
tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
105
105
.await
106
106
.into_diagnostic()??;
···
119
119
futures::future::join_all(pending_fut.into_iter().chain(error_fut))
120
120
});
121
121
122
122
+
let state = state.clone();
122
123
tokio::task::spawn_blocking(move || {
123
124
state
124
125
.db
···
152
153
next_retry,
153
154
};
154
155
155
155
-
let mut batch = db.inner.batch();
156
156
+
let state = state.clone();
157
157
+
let did_key = did_key.to_vec();
156
158
157
157
-
// 3. save to errors
158
158
-
let bytes = rmp_serde::to_vec(&err_state).into_diagnostic()?;
159
159
-
batch.insert(&db.errors, did_key, bytes);
159
159
+
tokio::task::spawn_blocking(move || {
160
160
+
// 3. we will save to errors
161
161
+
let serialized_error_state = rmp_serde::to_vec(&err_state).into_diagnostic()?;
160
162
161
161
-
// 4. update main repo state
162
162
-
if let Some(state_bytes) = Db::get(db.repos.clone(), did_key).await? {
163
163
-
let mut state: RepoState =
164
164
-
rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
165
165
-
state.status = RepoStatus::Error(e.to_string().into());
166
166
-
let state_bytes = rmp_serde::to_vec(&state).into_diagnostic()?;
167
167
-
batch.insert(&db.repos, did_key, state_bytes);
168
168
-
}
163
163
+
// 4. and update the main repo state
164
164
+
let serialized_repo_state = if let Some(state_bytes) =
165
165
+
state.db.repos.get(&did_key).into_diagnostic()?
166
166
+
{
167
167
+
let mut state: RepoState =
168
168
+
rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
169
169
+
state.status = RepoStatus::Error(e.to_string().into());
170
170
+
Some(rmp_serde::to_vec(&state).into_diagnostic()?)
171
171
+
} else {
172
172
+
None
173
173
+
};
174
174
+
175
175
+
let mut batch = state.db.inner.batch();
176
176
+
177
177
+
batch.insert(&state.db.errors, &did_key, serialized_error_state);
178
178
+
179
179
+
if let Some(state_bytes) = serialized_repo_state {
180
180
+
batch.insert(&state.db.repos, &did_key, state_bytes);
181
181
+
}
169
182
170
170
-
// 5. remove from pending (it's now in errors)
171
171
-
batch.remove(&db.pending, did_key);
183
183
+
// 5. remove from pending (it's now in errors)
184
184
+
batch.remove(&state.db.pending, &did_key);
172
185
173
173
-
tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
174
174
-
.await
175
175
-
.into_diagnostic()??;
186
186
+
batch.commit().into_diagnostic()
187
187
+
})
188
188
+
.await
189
189
+
.into_diagnostic()??;
176
190
177
191
Ok(())
178
192
}
···
197
211
198
212
// 1. resolve pds
199
213
let start = Instant::now();
200
200
-
let pds_url = app_state.resolver.resolve_pds(did).await?;
214
214
+
let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?;
201
215
trace!(
202
202
-
"resolved {} to pds {} in {:?}",
203
203
-
did,
204
204
-
pds_url,
216
216
+
"resolved {did} to pds {pds_url} handle {handle:?} in {:?}",
205
217
start.elapsed()
206
218
);
207
219
220
220
+
if let Some(h) = handle {
221
221
+
state.handle = Some(h.to_smolstr());
222
222
+
}
223
223
+
224
224
+
let emit_identity = |status: &RepoStatus| {
225
225
+
let evt = IdentityEvt {
226
226
+
did: did.as_str().into(),
227
227
+
handle: state.handle.clone().unwrap_or_default(),
228
228
+
is_active: !matches!(
229
229
+
status,
230
230
+
RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
231
231
+
),
232
232
+
status: match status {
233
233
+
RepoStatus::Deactivated => "deactivated",
234
234
+
RepoStatus::Takendown => "takendown",
235
235
+
RepoStatus::Suspended => "suspended",
236
236
+
_ => "active",
237
237
+
}
238
238
+
.into(),
239
239
+
};
240
240
+
ops::emit_identity_event(db, evt);
241
241
+
};
242
242
+
208
243
// 2. fetch repo (car)
209
244
let start = Instant::now();
210
245
let req = GetRepo::new().did(did.clone()).build();
211
211
-
let car_bytes = http
212
212
-
.xrpc(pds_url)
213
213
-
.send(&req)
214
214
-
.await
215
215
-
.into_diagnostic()?
216
216
-
.into_output()
217
217
-
.into_diagnostic()?;
246
246
+
let resp = http.xrpc(pds_url).send(&req).await.into_diagnostic()?;
247
247
+
248
248
+
let car_bytes = match resp.into_output() {
249
249
+
Ok(o) => o,
250
250
+
Err(XrpcError::Xrpc(e)) => {
251
251
+
if matches!(e, GetRepoError::RepoNotFound(_)) {
252
252
+
warn!("repo {did} not found, deleting");
253
253
+
ops::delete_repo(db, did)?;
254
254
+
return Ok(previous_state); // stop backfill
255
255
+
}
256
256
+
257
257
+
let inactive_status = match e {
258
258
+
GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
259
259
+
GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
260
260
+
GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
261
261
+
_ => None,
262
262
+
};
263
263
+
264
264
+
if let Some(status) = inactive_status {
265
265
+
warn!("repo {did} is {status:?}, stopping backfill");
266
266
+
267
267
+
emit_identity(&status);
268
268
+
app_state
269
269
+
.db
270
270
+
.update_repo_state_async(did, move |state, _| {
271
271
+
state.status = status;
272
272
+
Ok((true, ()))
273
273
+
})
274
274
+
.await?;
275
275
+
276
276
+
// return success so wrapper stops retrying
277
277
+
return Ok(previous_state);
278
278
+
}
279
279
+
280
280
+
return Err(e).into_diagnostic();
281
281
+
}
282
282
+
Err(e) => return Err(e).into_diagnostic(),
283
283
+
};
284
284
+
285
285
+
// emit identity event so any consumers know
286
286
+
emit_identity(&state.status);
287
287
+
218
288
trace!(
219
289
"fetched {} bytes for {} in {:?}",
220
290
car_bytes.body.len(),
···
332
402
// 6. update status to synced (inside batch)
333
403
state.status = RepoStatus::Synced;
334
404
state.rev = loop_rev.as_str().into();
405
405
+
state.data = commit.data.to_smolstr();
335
406
state.last_updated_at = chrono::Utc::now().timestamp();
336
407
337
408
let did_key = keys::repo_key(&loop_did);
···
386
457
start.elapsed()
387
458
);
388
459
389
389
-
let _ = db
390
390
-
.event_tx
391
391
-
.send(db.next_event_id.load(Ordering::SeqCst) - 1);
460
460
+
let _ = db.event_tx.send(BroadcastEvent::Persisted(
461
461
+
db.next_event_id.load(Ordering::SeqCst) - 1,
462
462
+
));
392
463
393
464
debug!("marked {did} as synced, draining buffer...");
394
465
+11
-5
src/resolver.rs
···
1
1
+
use std::ops::Not;
2
2
+
3
3
+
use jacquard::types::string::Handle;
1
4
use jacquard::IntoStatic;
2
5
use jacquard_common::types::ident::AtIdentifier;
3
6
use jacquard_common::types::string::Did;
···
31
34
}
32
35
}
33
36
34
34
-
pub async fn resolve_pds(&self, did: &Did<'_>) -> Result<Url> {
37
37
+
pub async fn resolve_identity_info(&self, did: &Did<'_>) -> Result<(Url, Option<Handle<'_>>)> {
35
38
let doc_resp = self.inner.resolve_did_doc(did).await.into_diagnostic()?;
36
39
let doc = doc_resp.parse().into_diagnostic()?;
37
40
38
38
-
if let Some(url) = doc.pds_endpoint() {
39
39
-
return Ok(url);
40
40
-
}
41
41
+
let pds = doc
42
42
+
.pds_endpoint()
43
43
+
.ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?;
44
44
+
45
45
+
let mut handles = doc.handles();
46
46
+
let handle = handles.is_empty().not().then(|| handles.remove(0));
41
47
42
42
-
Err(miette::miette!("no PDS service found in DID Doc for {did}"))
48
48
+
Ok((pds, handle))
43
49
}
44
50
}