tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[all] more structured logging
ptr.pet
1 week ago
34afb38d
5ba9461d
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+141
-129
8 changed files
expand all
collapse all
unified
split
src
api
stream.rs
backfill
manager.rs
mod.rs
ingest
firehose.rs
stream.rs
worker.rs
main.rs
ops.rs
+9
-9
src/api/stream.rs
···
65
let (k, v) = match item.into_inner() {
66
Ok((k, v)) => (k, v),
67
Err(e) => {
68
-
error!("failed to read event from db: {e}");
69
break;
70
}
71
};
···
78
{
79
Ok(id) => id,
80
Err(e) => {
81
-
error!("failed to parse event id: {e}");
82
continue;
83
}
84
};
···
95
} = match rmp_serde::from_slice(&v) {
96
Ok(e) => e,
97
Err(e) => {
98
-
error!("failed to deserialize stored event: {e}");
99
continue;
100
}
101
};
···
134
let json_str = match serde_json::to_string(&marshallable) {
135
Ok(s) => s,
136
Err(e) => {
137
-
error!("failed to serialize ws event: {e}");
138
continue;
139
}
140
};
141
142
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
143
-
error!("failed to send ws message: {e}");
144
return;
145
}
146
···
172
let json_str = match serde_json::to_string(&evt) {
173
Ok(s) => s,
174
Err(e) => {
175
-
error!("failed to serialize ws event: {e}");
176
continue;
177
}
178
};
179
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
180
-
error!("failed to send ws message: {e}");
181
return;
182
}
183
}
···
194
195
while let Some(msg) = rx.recv().await {
196
if let Err(e) = socket.send(msg).await {
197
-
error!("failed to send ws message: {e}");
198
break;
199
}
200
}
201
202
let _ = cancel_tx.send(());
203
if let Err(e) = thread.join() {
204
-
error!("stream handler thread panicked: {e:?}");
205
}
206
}
···
65
let (k, v) = match item.into_inner() {
66
Ok((k, v)) => (k, v),
67
Err(e) => {
68
+
error!(err = %e, "failed to read event from db");
69
break;
70
}
71
};
···
78
{
79
Ok(id) => id,
80
Err(e) => {
81
+
error!(err = %e, "failed to parse event id");
82
continue;
83
}
84
};
···
95
} = match rmp_serde::from_slice(&v) {
96
Ok(e) => e,
97
Err(e) => {
98
+
error!(err = %e, "failed to deserialize stored event");
99
continue;
100
}
101
};
···
134
let json_str = match serde_json::to_string(&marshallable) {
135
Ok(s) => s,
136
Err(e) => {
137
+
error!(err = %e, "failed to serialize ws event");
138
continue;
139
}
140
};
141
142
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
143
+
error!(err = %e, "failed to send ws message");
144
return;
145
}
146
···
172
let json_str = match serde_json::to_string(&evt) {
173
Ok(s) => s,
174
Err(e) => {
175
+
error!(err = %e, "failed to serialize ws event");
176
continue;
177
}
178
};
179
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
180
+
error!(err = %e, "failed to send ws message");
181
return;
182
}
183
}
···
194
195
while let Some(msg) = rx.recv().await {
196
if let Err(e) = socket.send(msg).await {
197
+
error!(err = %e, "failed to send ws message");
198
break;
199
}
200
}
201
202
let _ = cancel_tx.send(());
203
if let Err(e) = thread.join() {
204
+
error!(err = ?e, "stream handler thread panicked");
205
}
206
}
+14
-14
src/backfill/manager.rs
···
19
let did = match TrimmedDid::try_from(key.as_ref()) {
20
Ok(did) => did.to_did(),
21
Err(e) => {
22
-
error!("invalid did in db, skipping: {e}");
23
continue;
24
}
25
};
26
27
if let Ok(resync_state) = rmp_serde::from_slice::<ResyncState>(&val) {
28
if matches!(resync_state, ResyncState::Gone { .. }) {
29
-
debug!("queuing retry for gone repo: {did}");
30
31
let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? else {
32
-
error!("repo state not found for {did}");
33
continue;
34
};
35
···
60
61
state.notify_backfill();
62
63
-
info!("queued {} gone backfills", transitions.len());
64
Ok(())
65
}
66
···
80
let (key, value) = match guard.into_inner() {
81
Ok(t) => t,
82
Err(e) => {
83
-
error!("failed to get resync state: {e}");
84
db::check_poisoned(&e);
85
continue;
86
}
···
88
let did = match TrimmedDid::try_from(key.as_ref()) {
89
Ok(did) => did.to_did(),
90
Err(e) => {
91
-
error!("invalid did in db, skipping: {e}");
92
continue;
93
}
94
};
···
98
kind, next_retry, ..
99
}) => {
100
if next_retry <= now {
101
-
debug!("retrying backfill for {did}");
102
103
let state_bytes = match state.db.repos.get(&key).into_diagnostic() {
104
Ok(b) => b,
105
Err(err) => {
106
-
error!("failed to get repo state for {did}: {err}");
107
continue;
108
}
109
};
110
let Some(state_bytes) = state_bytes else {
111
-
error!("repo state not found for {did}");
112
continue;
113
};
114
115
let repo_state = match deser_repo_state(&state_bytes) {
116
Ok(s) => s,
117
Err(e) => {
118
-
error!("failed to deserialize repo state for {did}: {e}");
119
continue;
120
}
121
};
···
127
RepoStatus::Backfilling,
128
);
129
if let Err(e) = res {
130
-
error!("failed to update repo status for {did}: {e}");
131
continue;
132
}
133
···
138
// not an error state, do nothing
139
}
140
Err(e) => {
141
-
error!("failed to deserialize resync state for {did}: {e}");
142
continue;
143
}
144
}
···
149
}
150
151
if let Err(e) = batch.commit() {
152
-
error!("failed to commit batch: {e}");
153
db::check_poisoned(&e);
154
continue;
155
}
···
158
state.db.update_gauge_diff(old_gauge, new_gauge);
159
}
160
state.notify_backfill();
161
-
info!("queued {} retries", transitions.len());
162
}
163
}
···
19
let did = match TrimmedDid::try_from(key.as_ref()) {
20
Ok(did) => did.to_did(),
21
Err(e) => {
22
+
error!(err = %e, "invalid did in db, skipping");
23
continue;
24
}
25
};
26
27
if let Ok(resync_state) = rmp_serde::from_slice::<ResyncState>(&val) {
28
if matches!(resync_state, ResyncState::Gone { .. }) {
29
+
debug!(did = %did, "queuing retry for gone repo");
30
31
let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? else {
32
+
error!(did = %did, "repo state not found");
33
continue;
34
};
35
···
60
61
state.notify_backfill();
62
63
+
info!(count = transitions.len(), "queued gone backfills");
64
Ok(())
65
}
66
···
80
let (key, value) = match guard.into_inner() {
81
Ok(t) => t,
82
Err(e) => {
83
+
error!(err = %e, "failed to get resync state");
84
db::check_poisoned(&e);
85
continue;
86
}
···
88
let did = match TrimmedDid::try_from(key.as_ref()) {
89
Ok(did) => did.to_did(),
90
Err(e) => {
91
+
error!(err = %e, "invalid did in db, skipping");
92
continue;
93
}
94
};
···
98
kind, next_retry, ..
99
}) => {
100
if next_retry <= now {
101
+
debug!(did = %did, "retrying backfill");
102
103
let state_bytes = match state.db.repos.get(&key).into_diagnostic() {
104
Ok(b) => b,
105
Err(err) => {
106
+
error!(did = %did, err = %err, "failed to get repo state");
107
continue;
108
}
109
};
110
let Some(state_bytes) = state_bytes else {
111
+
error!(did = %did, "repo state not found");
112
continue;
113
};
114
115
let repo_state = match deser_repo_state(&state_bytes) {
116
Ok(s) => s,
117
Err(e) => {
118
+
error!(did = %did, err = %e, "failed to deserialize repo state");
119
continue;
120
}
121
};
···
127
RepoStatus::Backfilling,
128
);
129
if let Err(e) = res {
130
+
error!(did = %did, err = %e, "failed to update repo status");
131
continue;
132
}
133
···
138
// not an error state, do nothing
139
}
140
Err(e) => {
141
+
error!(did = %did, err = %e, "failed to deserialize resync state");
142
continue;
143
}
144
}
···
149
}
150
151
if let Err(e) = batch.commit() {
152
+
error!(err = %e, "failed to commit batch");
153
db::check_poisoned(&e);
154
continue;
155
}
···
158
state.db.update_gauge_diff(old_gauge, new_gauge);
159
}
160
state.notify_backfill();
161
+
info!(count = transitions.len(), "queued retries");
162
}
163
}
+46
-35
src/backfill/mod.rs
···
89
let (key, value) = match guard.into_inner() {
90
Ok(kv) => kv,
91
Err(e) => {
92
-
error!("failed to read pending entry: {e}");
93
db::check_poisoned(&e);
94
continue;
95
}
···
98
let did = match TrimmedDid::try_from(value.as_ref()) {
99
Ok(d) => d.to_did(),
100
Err(e) => {
101
-
error!("invalid did in pending value: {e}");
102
continue;
103
}
104
};
···
129
let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await;
130
131
if let Err(e) = res {
132
-
error!("backfill process failed for {did}: {e}");
133
if let BackfillError::Generic(report) = &e {
134
db::check_poisoned_report(report);
135
}
···
204
205
// Notify completion to worker shard
206
if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) {
207
-
error!("failed to send BackfillFinished for {did}: {e}");
208
}
209
Ok(())
210
}
···
217
Err(e) => {
218
match &e {
219
BackfillError::Ratelimited => {
220
-
debug!("failed for {did}: too many requests");
221
}
222
BackfillError::Transport(reason) => {
223
-
error!("failed for {did}: transport error: {reason}");
224
}
225
BackfillError::Generic(e) => {
226
-
error!("failed for {did}: {e}");
227
}
228
}
229
···
359
did: &Did<'static>,
360
verify_signatures: bool,
361
) -> Result<Option<RepoState<'static>>, BackfillError> {
362
-
debug!("backfilling {}", did);
363
364
let db = &app_state.db;
365
let did_key = keys::repo_key(did);
···
375
let start = Instant::now();
376
let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?;
377
trace!(
378
-
"resolved {did} to pds {pds_url} handle {handle:?} in {:?}",
379
-
start.elapsed()
0
0
0
380
);
381
382
state.handle = handle.map(|h| h.into_static());
···
410
Ok(o) => o,
411
Err(XrpcError::Xrpc(e)) => {
412
if matches!(e, GetRepoError::RepoNotFound(_)) {
413
-
warn!("repo {did} not found, deleting");
414
let mut batch = db.inner.batch();
415
if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
416
-
tracing::error!("failed to wipe repo during backfill: {e}");
417
}
418
batch.commit().into_diagnostic()?;
419
return Ok(Some(previous_state)); // stop backfill
···
427
};
428
429
if let Some(status) = inactive_status {
430
-
warn!("repo {did} is {status:?}, stopping backfill");
431
432
emit_identity(&status);
433
···
459
emit_identity(&state.status);
460
461
trace!(
462
-
"fetched {} bytes for {did} in {:?}",
463
-
car_bytes.body.len(),
464
-
start.elapsed()
0
465
);
466
467
// 3. import repo
···
469
let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
470
.await
471
.into_diagnostic()?;
472
-
trace!("parsed car for {did} in {:?}", start.elapsed());
473
474
let start = Instant::now();
475
let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
476
trace!(
477
-
"stored {} blocks in memory for {did} in {:?}",
478
-
store.len(),
479
-
start.elapsed()
0
480
);
481
482
// 4. parse root commit to get mst root
···
488
489
let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
490
debug!(
491
-
"backfilling repo at revision {}, root cid {}",
492
-
root_commit.rev, root_commit.data
0
0
493
);
494
495
// 4.5. verify commit signature
···
498
root_commit
499
.verify(&pubkey)
500
.map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
501
-
trace!("signature verified for {did}");
502
}
503
504
// 5. walk mst
505
let start = Instant::now();
506
let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
507
let leaves = mst.leaves().await.into_diagnostic()?;
508
-
trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64());
509
510
// 6. insert records into db
511
let start = Instant::now();
···
554
let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty();
555
556
debug!(
557
-
"backfilling {did}: signal_seen initial={signal_seen}, mode={:?}, signals={:?}",
558
-
filter.mode, filter.signals
0
0
0
559
);
560
561
for (key, cid) in leaves {
···
571
}
572
573
if !signal_seen && filter.matches_signal(collection) {
574
-
debug!("backfill {did}: signal matched on {collection}");
575
signal_seen = true;
576
}
577
···
582
// check if this record already exists with same CID
583
let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) {
584
if existing_cid == cid_obj.as_str() {
585
-
trace!("skip {did}/{collection}/{rkey} ({cid})");
586
continue; // skip unchanged record
587
}
588
(DbAction::Update, false)
589
} else {
590
(DbAction::Create, true)
591
};
592
-
trace!("{action} {did}/{collection}/{rkey} ({cid})");
593
594
// key is did|collection|rkey
595
let db_key = keys::record_key(&did, collection, &rkey);
···
622
623
// remove any remaining existing records (they weren't in the new MST)
624
for ((collection, rkey), cid) in existing_cids {
625
-
trace!("remove {did}/{collection}/{rkey} ({cid})");
626
627
batch.remove(
628
&app_state.db.records,
···
647
}
648
649
if !signal_seen {
650
-
debug!("backfill {did}: no signal-matching records found, discarding repo");
651
return Ok::<_, miette::Report>(None);
652
}
653
···
694
return Ok(None);
695
};
696
697
-
trace!("did {count} ops for {did} in {:?}", start.elapsed());
698
699
// do the counts
700
if records_cnt_delta > 0 {
···
708
.await;
709
}
710
trace!(
711
-
"committed backfill batch for {did} in {:?}",
712
-
start.elapsed()
0
713
);
714
715
let _ = db.event_tx.send(BroadcastEvent::Persisted(
716
db.next_event_id.load(Ordering::SeqCst) - 1,
717
));
718
719
-
trace!("backfill complete for {did}");
720
Ok(Some(previous_state))
721
}
···
89
let (key, value) = match guard.into_inner() {
90
Ok(kv) => kv,
91
Err(e) => {
92
+
error!(err = %e, "failed to read pending entry");
93
db::check_poisoned(&e);
94
continue;
95
}
···
98
let did = match TrimmedDid::try_from(value.as_ref()) {
99
Ok(d) => d.to_did(),
100
Err(e) => {
101
+
error!(err = %e, "invalid did in pending value");
102
continue;
103
}
104
};
···
129
let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await;
130
131
if let Err(e) = res {
132
+
error!(did = %did, err = %e, "backfill process failed");
133
if let BackfillError::Generic(report) = &e {
134
db::check_poisoned_report(report);
135
}
···
204
205
// Notify completion to worker shard
206
if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) {
207
+
error!(did = %did, err = %e, "failed to send BackfillFinished");
208
}
209
Ok(())
210
}
···
217
Err(e) => {
218
match &e {
219
BackfillError::Ratelimited => {
220
+
debug!(did = %did, "too many requests");
221
}
222
BackfillError::Transport(reason) => {
223
+
error!(did = %did, %reason, "transport error");
224
}
225
BackfillError::Generic(e) => {
226
+
error!(did = %did, err = %e, "failed");
227
}
228
}
229
···
359
did: &Did<'static>,
360
verify_signatures: bool,
361
) -> Result<Option<RepoState<'static>>, BackfillError> {
362
+
debug!(did = %did, "backfilling");
363
364
let db = &app_state.db;
365
let did_key = keys::repo_key(did);
···
375
let start = Instant::now();
376
let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?;
377
trace!(
378
+
did = %did,
379
+
pds_url = %pds_url,
380
+
?handle,
381
+
elapsed = ?start.elapsed(),
382
+
"resolved to pds"
383
);
384
385
state.handle = handle.map(|h| h.into_static());
···
413
Ok(o) => o,
414
Err(XrpcError::Xrpc(e)) => {
415
if matches!(e, GetRepoError::RepoNotFound(_)) {
416
+
warn!(did = %did, "repo not found, deleting");
417
let mut batch = db.inner.batch();
418
if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
419
+
tracing::error!(err = %e, "failed to wipe repo during backfill");
420
}
421
batch.commit().into_diagnostic()?;
422
return Ok(Some(previous_state)); // stop backfill
···
430
};
431
432
if let Some(status) = inactive_status {
433
+
warn!(did = %did, ?status, "repo is inactive, stopping backfill");
434
435
emit_identity(&status);
436
···
462
emit_identity(&state.status);
463
464
trace!(
465
+
did = %did,
466
+
bytes = car_bytes.body.len(),
467
+
elapsed = ?start.elapsed(),
468
+
"fetched car bytes"
469
);
470
471
// 3. import repo
···
473
let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
474
.await
475
.into_diagnostic()?;
476
+
trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");
477
478
let start = Instant::now();
479
let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
480
trace!(
481
+
did = %did,
482
+
blocks = store.len(),
483
+
elapsed = ?start.elapsed(),
484
+
"stored blocks in memory"
485
);
486
487
// 4. parse root commit to get mst root
···
493
494
let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
495
debug!(
496
+
did = %did,
497
+
rev = %root_commit.rev,
498
+
cid = %root_commit.data,
499
+
"backfilling repo at revision"
500
);
501
502
// 4.5. verify commit signature
···
505
root_commit
506
.verify(&pubkey)
507
.map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
508
+
trace!(did = %did, "signature verified");
509
}
510
511
// 5. walk mst
512
let start = Instant::now();
513
let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
514
let leaves = mst.leaves().await.into_diagnostic()?;
515
+
trace!(did = %did, elapsed = ?start.elapsed(), "walked mst");
516
517
// 6. insert records into db
518
let start = Instant::now();
···
561
let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty();
562
563
debug!(
564
+
did = %did,
565
+
initial = signal_seen,
566
+
mode = ?filter.mode,
567
+
signals = ?filter.signals,
568
+
"backfilling signal check"
569
);
570
571
for (key, cid) in leaves {
···
581
}
582
583
if !signal_seen && filter.matches_signal(collection) {
584
+
debug!(did = %did, collection = %collection, "signal matched");
585
signal_seen = true;
586
}
587
···
592
// check if this record already exists with same CID
593
let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) {
594
if existing_cid == cid_obj.as_str() {
595
+
trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "skip unchanged record");
596
continue; // skip unchanged record
597
}
598
(DbAction::Update, false)
599
} else {
600
(DbAction::Create, true)
601
};
602
+
trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, ?action, "action record");
603
604
// key is did|collection|rkey
605
let db_key = keys::record_key(&did, collection, &rkey);
···
632
633
// remove any remaining existing records (they weren't in the new MST)
634
for ((collection, rkey), cid) in existing_cids {
635
+
trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "remove existing record");
636
637
batch.remove(
638
&app_state.db.records,
···
657
}
658
659
if !signal_seen {
660
+
debug!(did = %did, "no signal-matching records found, discarding repo");
661
return Ok::<_, miette::Report>(None);
662
}
663
···
704
return Ok(None);
705
};
706
707
+
trace!(did = %did, ops = count, elapsed = ?start.elapsed(), "did ops");
708
709
// do the counts
710
if records_cnt_delta > 0 {
···
718
.await;
719
}
720
trace!(
721
+
did = %did,
722
+
elapsed = ?start.elapsed(),
723
+
"committed backfill batch"
724
);
725
726
let _ = db.event_tx.send(BroadcastEvent::Persisted(
727
db.next_event_id.load(Ordering::SeqCst) - 1,
728
));
729
730
+
trace!(did = %did, "backfill complete");
731
Ok(Some(previous_state))
732
}
+12
-12
src/ingest/firehose.rs
···
46
db::get_firehose_cursor(&self.state.db).await?
47
};
48
match start_cursor {
49
-
Some(c) => info!("resuming from cursor: {c}"),
50
None => info!("no cursor found, live tailing"),
51
}
52
···
58
match FirehoseStream::connect(self.relay_host.clone(), start_cursor).await {
59
Ok(s) => s,
60
Err(e) => {
61
-
error!("failed to connect to firehose: {e}, retrying in 5s...");
62
tokio::time::sleep(Duration::from_secs(5)).await;
63
continue;
64
}
···
70
let bytes = match bytes_res {
71
Ok(b) => b,
72
Err(e) => {
73
-
error!("firehose stream error: {e}");
74
break;
75
}
76
};
77
match decode_frame(&bytes) {
78
Ok(msg) => self.handle_message(msg).await,
79
Err(e) => {
80
-
error!("firehose stream error: {e}");
81
break;
82
}
83
}
···
100
let process = self
101
.should_process(did)
102
.await
103
-
.inspect_err(|e| error!("failed to check if we should process {did}: {e}"))
104
.unwrap_or(false);
105
if !process {
106
-
trace!("skipping {did}: not in filter");
107
return;
108
}
109
-
trace!("forwarding message for {did} to ingest buffer");
110
111
if let Err(e) = self
112
.buffer_tx
113
.send(IngestMessage::Firehose(msg.into_static()))
114
{
115
-
error!("failed to send message to buffer processor: {e}");
116
}
117
}
118
···
139
rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
140
141
if repo_state.tracked {
142
-
trace!("{did} is a tracked repo, processing");
143
return Ok(true);
144
} else {
145
-
debug!("{did} is known but explicitly untracked, skipping");
146
return Ok(false);
147
}
148
}
149
150
if !filter.signals.is_empty() {
151
-
trace!("{did} is unknown — passing to worker for signal check");
152
Ok(true)
153
} else {
154
-
trace!("{did} is unknown and no signals configured, skipping");
155
Ok(false)
156
}
157
}
···
46
db::get_firehose_cursor(&self.state.db).await?
47
};
48
match start_cursor {
49
+
Some(c) => info!(cursor = %c, "resuming from cursor"),
50
None => info!("no cursor found, live tailing"),
51
}
52
···
58
match FirehoseStream::connect(self.relay_host.clone(), start_cursor).await {
59
Ok(s) => s,
60
Err(e) => {
61
+
error!(err = %e, "failed to connect to firehose, retrying in 5s");
62
tokio::time::sleep(Duration::from_secs(5)).await;
63
continue;
64
}
···
70
let bytes = match bytes_res {
71
Ok(b) => b,
72
Err(e) => {
73
+
error!(err = %e, "firehose stream error");
74
break;
75
}
76
};
77
match decode_frame(&bytes) {
78
Ok(msg) => self.handle_message(msg).await,
79
Err(e) => {
80
+
error!(err = %e, "firehose stream error");
81
break;
82
}
83
}
···
100
let process = self
101
.should_process(did)
102
.await
103
+
.inspect_err(|e| error!(did = %did, err = %e, "failed to check if we should process"))
104
.unwrap_or(false);
105
if !process {
106
+
trace!(did = %did, "skipping: not in filter");
107
return;
108
}
109
+
trace!(did = %did, "forwarding message to ingest buffer");
110
111
if let Err(e) = self
112
.buffer_tx
113
.send(IngestMessage::Firehose(msg.into_static()))
114
{
115
+
error!(err = %e, "failed to send message to buffer processor");
116
}
117
}
118
···
139
rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
140
141
if repo_state.tracked {
142
+
trace!(did = %did, "tracked repo, processing");
143
return Ok(true);
144
} else {
145
+
debug!(did = %did, "known but explicitly untracked, skipping");
146
return Ok(false);
147
}
148
}
149
150
if !filter.signals.is_empty() {
151
+
trace!(did = %did, "unknown — passing to worker for signal check");
152
Ok(true)
153
} else {
154
+
trace!(did = %did, "unknown and no signals configured, skipping");
155
Ok(false)
156
}
157
}
+1
-1
src/ingest/stream.rs
···
83
}
84
Ok(Message::Close(_)) => return None,
85
Ok(x) => {
86
-
trace!("relay sent unexpected message: {x:?}");
87
continue;
88
}
89
}
···
83
}
84
Ok(Message::Close(_)) => return None,
85
Ok(x) => {
86
+
trace!(msg = ?x, "relay sent unexpected message");
87
continue;
88
}
89
}
+41
-41
src/ingest/worker.rs
···
132
let shard_idx = (hash as usize) % self.num_shards;
133
134
if let Err(e) = shards[shard_idx].send(msg) {
135
-
error!("failed to send message to shard {shard_idx}: {e}");
136
// break if send fails; receiver likely closed
137
break;
138
}
···
154
handle: tokio::runtime::Handle,
155
) {
156
let _guard = handle.enter();
157
-
debug!("shard {id} started");
158
159
let mut broadcast_events = Vec::new();
160
···
177
178
match msg {
179
IngestMessage::BackfillFinished(did) => {
180
-
debug!("backfill finished for {did}, verifying state and draining buffer");
181
182
// load repo state to transition status and draining buffer
183
let repo_key = keys::repo_key(&did);
···
201
);
202
if let Err(e) = res {
203
// this can only fail if serde retry fails which would be really weird
204
-
error!("failed to transition {did} to synced: {e}");
205
}
206
}
207
// we don't have to handle this since drain_resync_buffer doesn't delete
···
210
RepoProcessResult::Deleted => {}
211
},
212
Err(e) => {
213
-
error!("failed to drain resync buffer for {did}: {e}")
214
}
215
};
216
}
217
-
Err(e) => error!("failed to deser repo state for {did}: {e}"),
218
}
219
}
220
}
···
232
Ok(RepoProcessResult::Deleted) => {}
233
Ok(RepoProcessResult::Syncing(Some(commit))) => {
234
if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) {
235
-
error!("failed to persist commit to resync_buffer for {did}: {e}");
236
}
237
}
238
Ok(RepoProcessResult::Syncing(None)) => {}
···
240
if let IngestError::Generic(e) = &e {
241
db::check_poisoned_report(e);
242
}
243
-
error!("error processing message for {did}: {e}");
244
if Self::check_if_retriable_failure(&e) {
245
if let SubscribeReposMessage::Commit(commit) = &msg {
246
if let Err(e) =
247
ops::persist_to_resync_buffer(&state.db, did, commit)
248
{
249
error!(
250
-
"failed to persist commit to resync_buffer for {did}: {e}"
0
251
);
252
}
253
}
···
262
}
263
264
if let Err(e) = batch.commit() {
265
-
error!("failed to commit batch in shard {id}: {e}");
266
}
267
268
if added_blocks > 0 {
···
305
306
match msg {
307
SubscribeReposMessage::Commit(commit) => {
308
-
trace!("processing buffered commit for {did}");
309
310
return Self::process_commit(ctx, did, repo_state, commit);
311
}
312
SubscribeReposMessage::Sync(sync) => {
313
-
debug!("processing buffered sync for {did}");
314
315
match ops::verify_sync_event(
316
sync.blocks.as_ref(),
···
319
Ok((root, rev)) => {
320
if let Some(current_data) = &repo_state.data {
321
if current_data == &root.to_ipld().expect("valid cid") {
322
-
debug!("skipping noop sync for {did}");
323
return Ok(RepoProcessResult::Ok(repo_state));
324
}
325
}
326
327
if let Some(current_rev) = &repo_state.rev {
328
if rev.as_str() <= current_rev.to_tid().as_str() {
329
-
debug!("skipping replayed sync for {did}");
330
return Ok(RepoProcessResult::Ok(repo_state));
331
}
332
}
333
334
-
warn!("sync event for {did}: triggering backfill");
335
let mut batch = ctx.state.db.inner.batch();
336
repo_state = ops::update_repo_status(
337
&mut batch,
···
348
return Ok(RepoProcessResult::Ok(repo_state));
349
}
350
Err(e) => {
351
-
error!("failed to process sync event for {did}: {e}");
352
}
353
}
354
}
355
SubscribeReposMessage::Identity(identity) => {
356
-
debug!("processing buffered identity for {did}");
357
let handle = identity
358
.handle
359
.as_ref()
···
367
.push(ops::make_identity_event(&ctx.state.db, evt));
368
}
369
SubscribeReposMessage::Account(account) => {
370
-
debug!("processing buffered account for {did}");
371
let evt = AccountEvt {
372
did: did.clone().into_static(),
373
active: account.active,
···
378
use crate::ingest::stream::AccountStatus;
379
match &account.status {
380
Some(AccountStatus::Deleted) => {
381
-
debug!("account {did} deleted, wiping data");
382
crate::ops::delete_repo(ctx.batch, &ctx.state.db, did, &repo_state)?;
383
return Ok(RepoProcessResult::Deleted);
384
}
···
399
}
400
AccountStatus::Other(s) => {
401
warn!(
402
-
"unknown account status for {did}, will put in error state: {s}"
0
403
);
404
RepoStatus::Error(s.to_smolstr())
405
}
406
},
407
None => {
408
-
warn!("account {did} inactive but no status provided");
409
RepoStatus::Error("unknown".into())
410
}
411
};
412
413
if repo_state.status == target_status {
414
-
debug!("account status unchanged for {did}: {target_status:?}");
415
return Ok(RepoProcessResult::Ok(repo_state));
416
}
417
···
437
.push(ops::make_account_event(&ctx.state.db, evt));
438
}
439
_ => {
440
-
warn!("unknown message type in buffer for {did}");
441
}
442
}
443
···
453
// check for replayed events (already seen revision)
454
if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
455
debug!(
456
-
"skipping replayed event for {}: {} <= {}",
457
-
did,
458
-
commit.rev,
459
-
repo_state
460
-
.rev
461
-
.as_ref()
462
-
.map(|r| r.to_tid())
463
-
.expect("we checked in if")
464
);
465
return Ok(RepoProcessResult::Ok(repo_state));
466
}
···
474
.wrap_err("invalid cid from relay")?
475
{
476
warn!(
477
-
"gap detected for {}: repo {} != commit prev {}. triggering backfill",
478
-
did, repo, prev_commit.0
0
0
479
);
480
481
let mut batch = ctx.state.db.inner.batch();
···
540
.map(|(col, _)| {
541
let m = filter.matches_signal(col);
542
debug!(
543
-
"signal check for {did}: op path={} col={col} signals={:?} -> {m}",
544
-
op.path, filter.signals
545
);
546
m
547
})
548
.unwrap_or(false)
549
});
550
if !touches_signal {
551
-
debug!("dropping {did}: commit has no signal-matching ops");
552
return Ok(RepoProcessResult::Syncing(None));
553
}
554
-
debug!("{did}: commit touches a signal, queuing backfill");
555
}
556
557
-
debug!("discovered new account {did} from firehose, queueing backfill");
558
559
let repo_state = RepoState::backfilling_untracked(rand::rng().next_u64());
560
let mut batch = ctx.state.db.inner.batch();
···
583
let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();
584
585
if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling {
586
-
debug!("ignoring active status for {did} as it is explicitly untracked");
587
return Ok(RepoProcessResult::Syncing(None));
588
}
589
···
600
}
601
RepoStatus::Backfilling | RepoStatus::Error(_) => {
602
debug!(
603
-
"ignoring active status for {did} as it is {:?}",
604
-
repo_state.status
605
);
606
Ok(RepoProcessResult::Syncing(None))
607
}
···
132
let shard_idx = (hash as usize) % self.num_shards;
133
134
if let Err(e) = shards[shard_idx].send(msg) {
135
+
error!(shard = shard_idx, err = %e, "failed to send message to shard");
136
// break if send fails; receiver likely closed
137
break;
138
}
···
154
handle: tokio::runtime::Handle,
155
) {
156
let _guard = handle.enter();
157
+
debug!(shard = id, "shard started");
158
159
let mut broadcast_events = Vec::new();
160
···
177
178
match msg {
179
IngestMessage::BackfillFinished(did) => {
180
+
debug!(did = %did, "backfill finished, verifying state and draining buffer");
181
182
// load repo state to transition status and draining buffer
183
let repo_key = keys::repo_key(&did);
···
201
);
202
if let Err(e) = res {
203
// this can only fail if serde retry fails which would be really weird
204
+
error!(did = %did, err = %e, "failed to transition to synced");
205
}
206
}
207
// we don't have to handle this since drain_resync_buffer doesn't delete
···
210
RepoProcessResult::Deleted => {}
211
},
212
Err(e) => {
213
+
error!(did = %did, err = %e, "failed to drain resync buffer")
214
}
215
};
216
}
217
+
Err(e) => error!(did = %did, err = %e, "failed to deser repo state"),
218
}
219
}
220
}
···
232
Ok(RepoProcessResult::Deleted) => {}
233
Ok(RepoProcessResult::Syncing(Some(commit))) => {
234
if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) {
235
+
error!(did = %did, err = %e, "failed to persist commit to resync_buffer");
236
}
237
}
238
Ok(RepoProcessResult::Syncing(None)) => {}
···
240
if let IngestError::Generic(e) = &e {
241
db::check_poisoned_report(e);
242
}
243
+
error!(did = %did, err = %e, "error processing message");
244
if Self::check_if_retriable_failure(&e) {
245
if let SubscribeReposMessage::Commit(commit) = &msg {
246
if let Err(e) =
247
ops::persist_to_resync_buffer(&state.db, did, commit)
248
{
249
error!(
250
+
did = %did, err = %e,
251
+
"failed to persist commit to resync_buffer"
252
);
253
}
254
}
···
263
}
264
265
if let Err(e) = batch.commit() {
266
+
error!(shard = id, err = %e, "failed to commit batch");
267
}
268
269
if added_blocks > 0 {
···
306
307
match msg {
308
SubscribeReposMessage::Commit(commit) => {
309
+
trace!(did = %did, "processing buffered commit");
310
311
return Self::process_commit(ctx, did, repo_state, commit);
312
}
313
SubscribeReposMessage::Sync(sync) => {
314
+
debug!(did = %did, "processing buffered sync");
315
316
match ops::verify_sync_event(
317
sync.blocks.as_ref(),
···
320
Ok((root, rev)) => {
321
if let Some(current_data) = &repo_state.data {
322
if current_data == &root.to_ipld().expect("valid cid") {
323
+
debug!(did = %did, "skipping noop sync");
324
return Ok(RepoProcessResult::Ok(repo_state));
325
}
326
}
327
328
if let Some(current_rev) = &repo_state.rev {
329
if rev.as_str() <= current_rev.to_tid().as_str() {
330
+
debug!(did = %did, "skipping replayed sync");
331
return Ok(RepoProcessResult::Ok(repo_state));
332
}
333
}
334
335
+
warn!(did = %did, "sync event, triggering backfill");
336
let mut batch = ctx.state.db.inner.batch();
337
repo_state = ops::update_repo_status(
338
&mut batch,
···
349
return Ok(RepoProcessResult::Ok(repo_state));
350
}
351
Err(e) => {
352
+
error!(did = %did, err = %e, "failed to process sync event");
353
}
354
}
355
}
356
SubscribeReposMessage::Identity(identity) => {
357
+
debug!(did = %did, "processing buffered identity");
358
let handle = identity
359
.handle
360
.as_ref()
···
368
.push(ops::make_identity_event(&ctx.state.db, evt));
369
}
370
SubscribeReposMessage::Account(account) => {
371
+
debug!(did = %did, "processing buffered account");
372
let evt = AccountEvt {
373
did: did.clone().into_static(),
374
active: account.active,
···
379
use crate::ingest::stream::AccountStatus;
380
match &account.status {
381
Some(AccountStatus::Deleted) => {
382
+
debug!(did = %did, "account deleted, wiping data");
383
crate::ops::delete_repo(ctx.batch, &ctx.state.db, did, &repo_state)?;
384
return Ok(RepoProcessResult::Deleted);
385
}
···
400
}
401
AccountStatus::Other(s) => {
402
warn!(
403
+
did = %did, status = %s,
404
+
"unknown account status, will put in error state"
405
);
406
RepoStatus::Error(s.to_smolstr())
407
}
408
},
409
None => {
410
+
warn!(did = %did, "account inactive but no status provided");
411
RepoStatus::Error("unknown".into())
412
}
413
};
414
415
if repo_state.status == target_status {
416
+
debug!(did = %did, ?target_status, "account status unchanged");
417
return Ok(RepoProcessResult::Ok(repo_state));
418
}
419
···
439
.push(ops::make_account_event(&ctx.state.db, evt));
440
}
441
_ => {
442
+
warn!(did = %did, "unknown message type in buffer");
443
}
444
}
445
···
455
// check for replayed events (already seen revision)
456
if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
457
debug!(
458
+
did = %did,
459
+
commit_rev = %commit.rev,
460
+
state_rev = %repo_state.rev.as_ref().map(|r| r.to_tid()).expect("we checked in if"),
461
+
"skipping replayed event"
0
0
0
0
462
);
463
return Ok(RepoProcessResult::Ok(repo_state));
464
}
···
472
.wrap_err("invalid cid from relay")?
473
{
474
warn!(
475
+
did = %did,
476
+
repo = %repo,
477
+
prev_commit = %prev_commit.0,
478
+
"gap detected, triggering backfill"
479
);
480
481
let mut batch = ctx.state.db.inner.batch();
···
540
.map(|(col, _)| {
541
let m = filter.matches_signal(col);
542
debug!(
543
+
did = %did, path = %op.path, col = %col, signals = ?filter.signals, matched = m,
544
+
"signal check"
545
);
546
m
547
})
548
.unwrap_or(false)
549
});
550
if !touches_signal {
551
+
debug!(did = %did, "dropping commit, no signal-matching ops");
552
return Ok(RepoProcessResult::Syncing(None));
553
}
554
+
debug!(did = %did, "commit touches a signal, queuing backfill");
555
}
556
557
+
debug!(did = %did, "discovered new account from firehose, queueing backfill");
558
559
let repo_state = RepoState::backfilling_untracked(rand::rng().next_u64());
560
let mut batch = ctx.state.db.inner.batch();
···
583
let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();
584
585
if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling {
586
+
debug!(did = %did, "ignoring active status as it is explicitly untracked");
587
return Ok(RepoProcessResult::Syncing(None));
588
}
589
···
600
}
601
RepoStatus::Backfilling | RepoStatus::Error(_) => {
602
debug!(
603
+
did = %did, status = ?repo_state.status,
604
+
"ignoring active status"
605
);
606
Ok(RepoProcessResult::Syncing(None))
607
}
+6
-6
src/main.rs
···
101
.await
102
.into_diagnostic()?
103
{
104
-
error!("failed to queue gone backfills: {e}");
105
db::check_poisoned_report(&e);
106
}
107
···
155
// persist firehose cursor
156
let seq = state.cur_firehose.load(Ordering::SeqCst);
157
if let Err(e) = set_firehose_cursor(&state.db, seq) {
158
-
error!("failed to save cursor: {e}");
159
db::check_poisoned_report(&e);
160
}
161
162
// persist counts
163
// TODO: make this more durable
164
if let Err(e) = db::persist_counts(&state.db) {
165
-
error!("failed to persist counts: {e}");
166
db::check_poisoned_report(&e);
167
}
168
169
// persist journal
170
if let Err(e) = state.db.persist() {
171
-
error!("db persist failed: {e}");
172
db::check_poisoned_report(&e);
173
}
174
}
···
197
crawler_resume_pending,
198
);
199
if let Err(e) = crawler.run().await {
200
-
error!("crawler error: {e}");
201
db::check_poisoned_report(&e);
202
}
203
});
···
264
265
let res = futures::future::select_all(tasks);
266
if let (Err(e), _, _) = res.await {
267
-
error!("critical worker died: {e}");
268
db::check_poisoned_report(&e);
269
}
270
···
101
.await
102
.into_diagnostic()?
103
{
104
+
error!(err = %e, "failed to queue gone backfills");
105
db::check_poisoned_report(&e);
106
}
107
···
155
// persist firehose cursor
156
let seq = state.cur_firehose.load(Ordering::SeqCst);
157
if let Err(e) = set_firehose_cursor(&state.db, seq) {
158
+
error!(err = %e, "failed to save cursor");
159
db::check_poisoned_report(&e);
160
}
161
162
// persist counts
163
// TODO: make this more durable
164
if let Err(e) = db::persist_counts(&state.db) {
165
+
error!(err = %e, "failed to persist counts");
166
db::check_poisoned_report(&e);
167
}
168
169
// persist journal
170
if let Err(e) = state.db.persist() {
171
+
error!(err = %e, "db persist failed");
172
db::check_poisoned_report(&e);
173
}
174
}
···
197
crawler_resume_pending,
198
);
199
if let Err(e) = crawler.run().await {
200
+
error!(err = %e, "crawler error");
201
db::check_poisoned_report(&e);
202
}
203
});
···
264
265
let res = futures::future::select_all(tasks);
266
if let (Err(e), _, _) = res.await {
267
+
error!(err = %e, "critical worker died");
268
db::check_poisoned_report(&e);
269
}
270
+12
-11
src/ops.rs
···
1
use fjall::OwnedWriteBatch;
2
-
use jacquard_common::CowStr;
3
-
use jacquard_common::IntoStatic;
4
use jacquard_common::types::cid::Cid;
5
use jacquard_common::types::crypto::PublicKey;
6
use jacquard_common::types::did::Did;
0
0
7
use jacquard_repo::car::reader::parse_car_bytes;
8
use miette::{Context, IntoDiagnostic, Result};
9
-
use rand::{Rng, rng};
10
use std::collections::HashMap;
11
use std::sync::atomic::Ordering;
12
use std::time::Instant;
13
use tracing::{debug, trace};
14
15
use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
16
-
use crate::db::{self, Db, keys, ser_repo_state};
17
use crate::filter::FilterConfig;
18
use crate::ingest::stream::Commit;
19
use crate::types::{
···
26
let value = rmp_serde::to_vec(commit).into_diagnostic()?;
27
db.resync_buffer.insert(key, value).into_diagnostic()?;
28
debug!(
29
-
"buffered commit seq {} for {did} to resync_buffer",
30
-
commit.seq
0
31
);
32
Ok(())
33
}
···
69
did: &Did,
70
repo_state: &RepoState,
71
) -> Result<()> {
72
-
debug!("deleting repo {did}");
73
74
let repo_key = keys::repo_key(did);
75
let pending_key = keys::pending_key(repo_state.index_id);
···
122
mut repo_state: RepoState<'s>,
123
new_status: RepoStatus,
124
) -> Result<RepoState<'s>> {
125
-
debug!("updating repo status for {did} to {new_status:?}");
126
127
let repo_key = keys::repo_key(did);
128
let pending_key = keys::pending_key(repo_state.index_id);
···
226
filter: &FilterConfig,
227
) -> Result<ApplyCommitResults<'s>> {
228
let did = &commit.repo;
229
-
debug!("applying commit {} for {did}", &commit.commit);
230
231
// 1. parse CAR blocks and store them in CAS
232
let start = Instant::now();
···
236
.into_diagnostic()
237
})?;
238
239
-
trace!("parsed car for {did} in {:?}", start.elapsed());
240
241
let root_bytes = parsed
242
.blocks
···
249
repo_commit
250
.verify(key)
251
.map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
252
-
trace!("signature verified for {did}");
253
}
254
255
repo_state.rev = Some((&commit.rev).into());
···
1
use fjall::OwnedWriteBatch;
0
0
2
use jacquard_common::types::cid::Cid;
3
use jacquard_common::types::crypto::PublicKey;
4
use jacquard_common::types::did::Did;
5
+
use jacquard_common::CowStr;
6
+
use jacquard_common::IntoStatic;
7
use jacquard_repo::car::reader::parse_car_bytes;
8
use miette::{Context, IntoDiagnostic, Result};
9
+
use rand::{rng, Rng};
10
use std::collections::HashMap;
11
use std::sync::atomic::Ordering;
12
use std::time::Instant;
13
use tracing::{debug, trace};
14
15
use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
16
+
use crate::db::{self, keys, ser_repo_state, Db};
17
use crate::filter::FilterConfig;
18
use crate::ingest::stream::Commit;
19
use crate::types::{
···
26
let value = rmp_serde::to_vec(commit).into_diagnostic()?;
27
db.resync_buffer.insert(key, value).into_diagnostic()?;
28
debug!(
29
+
did = %did,
30
+
seq = commit.seq,
31
+
"buffered commit to resync_buffer"
32
);
33
Ok(())
34
}
···
70
did: &Did,
71
repo_state: &RepoState,
72
) -> Result<()> {
73
+
debug!(did = %did, "deleting repo");
74
75
let repo_key = keys::repo_key(did);
76
let pending_key = keys::pending_key(repo_state.index_id);
···
123
mut repo_state: RepoState<'s>,
124
new_status: RepoStatus,
125
) -> Result<RepoState<'s>> {
126
+
debug!(did = %did, status = ?new_status, "updating repo status");
127
128
let repo_key = keys::repo_key(did);
129
let pending_key = keys::pending_key(repo_state.index_id);
···
227
filter: &FilterConfig,
228
) -> Result<ApplyCommitResults<'s>> {
229
let did = &commit.repo;
230
+
debug!(did = %did, commit = %commit.commit, "applying commit");
231
232
// 1. parse CAR blocks and store them in CAS
233
let start = Instant::now();
···
237
.into_diagnostic()
238
})?;
239
240
+
trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");
241
242
let root_bytes = parsed
243
.blocks
···
250
repo_commit
251
.verify(key)
252
.map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
253
+
trace!(did = %did, "signature verified");
254
}
255
256
repo_state.rev = Some((&commit.rev).into());