tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[backfill] flush buffers after backfilling a repo successfully
ptr.pet
1 month ago
8db23453
f9abfdbb
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+31
-34
3 changed files
expand all
collapse all
unified
split
src
backfill
manager.rs
mod.rs
ingest
mod.rs
+7
-11
src/backfill/manager.rs
···
6
use miette::{IntoDiagnostic, Result};
7
use std::sync::Arc;
8
use std::time::Duration;
9
-
use tracing::{debug, error, info, warn};
10
11
pub async fn queue_pending_backfills(state: &AppState) -> Result<()> {
12
info!("scanning for pending backfills...");
···
33
34
debug!("queuing did {did}");
35
if let Err(e) = state.backfill_tx.send(did) {
36
-
warn!("failed to queue pending backfill for did:{did_str}: {e}");
37
} else {
38
count += 1;
39
}
···
64
})
65
.await
66
.into_diagnostic()
67
-
.unwrap_or_else(|e| {
68
-
warn!("failed to scan errors: {e}");
69
-
Db::check_poisoned_report(&e);
70
-
Ok(Vec::new())
71
-
})
72
.unwrap_or_else(|e| {
73
-
warn!("failed to scan errors: {e}");
74
Db::check_poisoned_report(&e);
75
Vec::new()
76
});
···
83
};
84
if let Ok(err_state) = rmp_serde::from_slice::<ErrorState>(&value) {
85
if err_state.next_retry <= now {
86
-
debug!("retrying backfill for {did}");
87
88
// move back to pending
89
if let Err(e) = Db::insert(db.pending.clone(), key, Vec::new()).await {
90
-
warn!("failed to move {did} to pending: {e}");
91
Db::check_poisoned_report(&e);
92
continue;
93
}
94
95
// queue
96
if let Err(e) = state.backfill_tx.send(did.to_owned()) {
97
-
warn!("failed to queue retry for {did}: {e}");
98
} else {
99
count += 1;
100
}
···
6
use miette::{IntoDiagnostic, Result};
7
use std::sync::Arc;
8
use std::time::Duration;
9
+
use tracing::{debug, error, info};
10
11
pub async fn queue_pending_backfills(state: &AppState) -> Result<()> {
12
info!("scanning for pending backfills...");
···
33
34
debug!("queuing did {did}");
35
if let Err(e) = state.backfill_tx.send(did) {
36
+
error!("failed to queue pending backfill for did:{did_str}: {e}");
37
} else {
38
count += 1;
39
}
···
64
})
65
.await
66
.into_diagnostic()
67
+
.flatten()
0
0
0
0
68
.unwrap_or_else(|e| {
69
+
error!("failed to scan errors: {e}");
70
Db::check_poisoned_report(&e);
71
Vec::new()
72
});
···
79
};
80
if let Ok(err_state) = rmp_serde::from_slice::<ErrorState>(&value) {
81
if err_state.next_retry <= now {
82
+
info!("retrying backfill for {did}");
83
84
// move back to pending
85
if let Err(e) = Db::insert(db.pending.clone(), key, Vec::new()).await {
86
+
error!("failed to move {did} to pending: {e}");
87
Db::check_poisoned_report(&e);
88
continue;
89
}
90
91
// queue
92
if let Err(e) = state.backfill_tx.send(did.to_owned()) {
93
+
error!("failed to queue retry for {did}: {e}");
94
} else {
95
count += 1;
96
}
+21
-15
src/backfill/mod.rs
···
106
.into_diagnostic()??;
107
108
tokio::spawn({
109
-
let state = state.clone();
110
-
async move {
111
-
if is_pending {
112
-
let _ = state
113
-
.db
114
-
.increment_count(keys::count_keyspace_key("pending"), -1)
115
-
.await;
116
-
}
117
-
if is_error {
118
-
let _ = state
119
-
.db
120
-
.increment_count(keys::count_keyspace_key("errors"), -1)
121
-
.await;
122
-
}
123
-
}
124
});
0
0
0
0
0
0
0
0
0
0
125
126
Ok(())
127
}
···
106
.into_diagnostic()??;
107
108
tokio::spawn({
109
+
let pending_fut = is_pending.then(|| {
110
+
state
111
+
.db
112
+
.increment_count(keys::count_keyspace_key("pending"), -1)
113
+
});
114
+
let error_fut = is_error.then(|| {
115
+
state
116
+
.db
117
+
.increment_count(keys::count_keyspace_key("errors"), -1)
118
+
});
119
+
futures::future::join_all(pending_fut.into_iter().chain(error_fut))
0
0
0
0
120
});
121
+
122
+
tokio::task::spawn_blocking(move || {
123
+
state
124
+
.db
125
+
.inner
126
+
.persist(fjall::PersistMode::Buffer)
127
+
.into_diagnostic()
128
+
})
129
+
.await
130
+
.into_diagnostic()??;
131
132
Ok(())
133
}
+3
-8
src/ingest/mod.rs
···
11
use std::sync::atomic::Ordering;
12
use std::sync::Arc;
13
use tokio::sync::mpsc;
14
-
use tracing::{debug, error, info};
15
use url::Url;
16
17
pub struct Ingestor {
···
125
// 3. process loop
126
while let Some(msg_res) = messages.next().await {
127
match msg_res {
128
-
Ok(msg) => {
129
-
if let Err(e) = self.handle_message(msg).await {
130
-
error!("failed to handle firehose message: {e}");
131
-
}
132
-
}
133
Err(e) => {
134
error!("firehose stream error: {e}");
135
break;
···
142
}
143
}
144
145
-
async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) -> Result<()> {
146
match msg {
147
SubscribeReposMessage::Commit(commit) => {
148
self.state.cur_firehose.store(commit.seq, Ordering::SeqCst);
···
156
}
157
_ => {} // ignore identity/account/etc for now
158
}
159
-
Ok(())
160
}
161
162
async fn process_commit(
···
11
use std::sync::atomic::Ordering;
12
use std::sync::Arc;
13
use tokio::sync::mpsc;
14
+
use tracing::{debug, error, info, warn};
15
use url::Url;
16
17
pub struct Ingestor {
···
125
// 3. process loop
126
while let Some(msg_res) = messages.next().await {
127
match msg_res {
128
+
Ok(msg) => self.handle_message(msg).await,
0
0
0
0
129
Err(e) => {
130
error!("firehose stream error: {e}");
131
break;
···
138
}
139
}
140
141
+
async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) {
142
match msg {
143
SubscribeReposMessage::Commit(commit) => {
144
self.state.cur_firehose.store(commit.seq, Ordering::SeqCst);
···
152
}
153
_ => {} // ignore identity/account/etc for now
154
}
0
155
}
156
157
async fn process_commit(