tangled
alpha
login
or
join now
ptr.pet
/
hydrant
18
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
18
fork
atom
overview
issues
5
pulls
pipelines
[ingest] make firehose connection and processing resilient
ptr.pet
1 month ago
cd268330
98c34176
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+148
-113
1 changed file
expand all
collapse all
unified
split
src
ingest
mod.rs
+148
-113
src/ingest/mod.rs
···
75
pub async fn run(mut self) -> Result<()> {
76
let base_url = Url::parse(&self.relay_host).into_diagnostic()?;
77
78
-
// 1. load cursor
79
-
let cursor_key = b"firehose_cursor";
80
-
let start_cursor = if let Ok(Some(bytes)) =
81
-
Db::get(self.state.db.cursors.clone(), cursor_key.to_vec()).await
82
-
{
83
-
let s = String::from_utf8_lossy(&bytes);
84
-
debug!("resuming from cursor: {}", s);
85
-
Some(s.parse::<i64>().unwrap_or(0))
86
-
} else {
87
-
info!("no cursor found, live tailing");
88
-
None
89
-
};
0
0
0
0
0
0
90
91
-
if let Some(c) = start_cursor {
92
-
self.state.cur_firehose.store(c, Ordering::SeqCst);
93
-
}
94
95
-
// 2. connect
96
-
let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
97
-
let params = if let Some(c) = start_cursor {
98
-
SubscribeRepos::new().cursor(c).build()
99
-
} else {
100
-
SubscribeRepos::new().build()
101
-
};
0
0
0
0
0
0
0
0
0
102
103
-
let stream = client.subscribe(&params).await.into_diagnostic()?;
104
-
let (_sink, mut messages) = stream.into_stream();
105
106
-
info!("firehose connected");
107
108
-
// 3. process loop
109
-
while let Some(msg_res) = messages.next().await {
110
-
match msg_res {
111
-
Ok(msg) => {
112
-
self.handle_message(msg).await?;
113
-
}
114
-
Err(e) => {
115
-
error!("firehose stream error: {}", e);
116
-
break;
0
0
0
117
}
118
}
119
-
}
120
121
-
Ok(())
0
0
122
}
123
124
async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) -> Result<()> {
125
-
let db = self.state.db.clone();
126
match msg {
127
SubscribeReposMessage::Commit(commit) => {
128
self.state.cur_firehose.store(commit.seq, Ordering::SeqCst);
129
130
-
let did = &commit.repo;
0
0
0
0
0
0
0
0
0
131
132
-
let mut should_process = self.full_network;
133
-
let did_key = keys::repo_key(&did);
0
0
0
0
134
135
-
if !should_process {
136
-
if Db::contains_key(db.repos.clone(), did_key).await? {
137
-
should_process = true;
138
-
}
139
-
}
140
141
-
if !should_process {
142
-
return Ok(());
143
-
}
0
0
144
145
-
// check repo state
146
-
let state_bytes = Db::get(db.repos.clone(), did_key).await?;
0
0
0
0
147
148
-
let repo_state = if let Some(bytes) = state_bytes {
149
-
rmp_serde::from_slice::<RepoState>(&bytes).ok()
150
-
} else {
151
-
None
152
-
};
153
154
-
let status = repo_state
155
-
.as_ref()
156
-
.map(|s| s.status.clone())
157
-
.unwrap_or(RepoStatus::New);
158
159
-
match status {
160
-
RepoStatus::New => {
161
-
info!("new repo detected: {}", did);
162
-
// 1. save state as backfilling
163
-
let mut new_state = RepoState::new(commit.repo.clone().into_static());
164
-
new_state.status = RepoStatus::Backfilling;
165
-
let bytes = rmp_serde::to_vec(&new_state).into_diagnostic()?;
166
167
-
let mut batch = db.inner.batch();
168
-
batch.insert(&db.repos, did_key, bytes);
169
-
batch.insert(&db.pending, did_key, Vec::new());
170
171
-
tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
172
-
.await
173
-
.into_diagnostic()??;
174
175
-
// 2. queue for backfill
176
-
if let Err(e) = self.state.backfill_tx.send(did.clone().into_static()) {
177
-
error!("failed to queue backfill for {}: {}", did, e);
178
-
}
179
180
-
// 3. buffer this event
181
-
self.buffer_event(&commit).await?;
182
-
}
183
-
RepoStatus::Backfilling => {
184
-
debug!("buffering event for backfilling repo: {}", did);
185
-
self.buffer_event(&commit).await?;
0
0
0
0
0
0
0
0
0
0
186
}
187
-
RepoStatus::Synced => {
188
-
// check revision
189
-
if let Some(state) = repo_state {
190
-
if !state.rev.is_empty() && commit.rev.as_str() <= state.rev.as_str() {
191
-
debug!(
192
-
"skipping replayed event for {}: {} <= {}",
193
-
did,
194
-
commit.rev,
195
-
state.rev
196
-
);
197
-
return Ok(());
198
-
}
199
-
}
200
201
-
// apply immediately
202
-
let db = db.clone();
203
-
let commit = commit.clone().into_static();
204
-
let did = did.clone().into_static();
0
205
206
-
tokio::task::spawn_blocking(move || {
207
-
if let Err(e) = ops::apply_commit(&db, &commit, true) {
208
-
error!("failed to apply live commit for {}: {}", did, e);
209
-
} else {
210
-
debug!("synced event for {}, {} ops", did, commit.ops.len());
211
-
}
212
-
})
213
-
.await
214
-
.into_diagnostic()?;
215
-
}
216
-
RepoStatus::Error(_) => {
217
-
// maybe retry? for now ignore.
218
-
}
219
}
220
}
221
-
_ => {} // ignore identity/account/etc for now
0
0
222
}
223
Ok(())
224
}
···
75
pub async fn run(mut self) -> Result<()> {
76
let base_url = Url::parse(&self.relay_host).into_diagnostic()?;
77
78
+
loop {
79
+
// 1. load cursor
80
+
let current_cursor = self.state.cur_firehose.load(Ordering::SeqCst);
81
+
let start_cursor = if current_cursor > 0 {
82
+
Some(current_cursor)
83
+
} else {
84
+
let cursor_key = b"firehose_cursor";
85
+
if let Ok(Some(bytes)) =
86
+
Db::get(self.state.db.cursors.clone(), cursor_key.to_vec()).await
87
+
{
88
+
let s = String::from_utf8_lossy(&bytes);
89
+
debug!("resuming from cursor: {}", s);
90
+
s.parse::<i64>().ok()
91
+
} else {
92
+
info!("no cursor found, live tailing");
93
+
None
94
+
}
95
+
};
96
97
+
if let Some(c) = start_cursor {
98
+
self.state.cur_firehose.store(c, Ordering::SeqCst);
99
+
}
100
101
+
// 2. connect
102
+
let client = TungsteniteSubscriptionClient::from_base_uri(base_url.clone());
103
+
let params = if let Some(c) = start_cursor {
104
+
SubscribeRepos::new().cursor(c).build()
105
+
} else {
106
+
SubscribeRepos::new().build()
107
+
};
108
+
109
+
let stream = match client.subscribe(&params).await {
110
+
Ok(s) => s,
111
+
Err(e) => {
112
+
error!("failed to connect to firehose: {e}, retrying in 5s...");
113
+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
114
+
continue;
115
+
}
116
+
};
117
118
+
let (_sink, mut messages) = stream.into_stream();
0
119
120
+
info!("firehose connected");
121
122
+
// 3. process loop
123
+
while let Some(msg_res) = messages.next().await {
124
+
match msg_res {
125
+
Ok(msg) => {
126
+
if let Err(e) = self.handle_message(msg).await {
127
+
error!("failed to handle firehose message: {e}");
128
+
}
129
+
}
130
+
Err(e) => {
131
+
error!("firehose stream error: {e}");
132
+
break;
133
+
}
134
}
135
}
0
136
137
+
error!("firehose disconnected, reconnecting in 5s...");
138
+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
139
+
}
140
}
141
142
async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) -> Result<()> {
0
143
match msg {
144
SubscribeReposMessage::Commit(commit) => {
145
self.state.cur_firehose.store(commit.seq, Ordering::SeqCst);
146
147
+
if let Err(e) = self.process_commit(&commit).await {
148
+
error!("failed to process commit {}: {e}", commit.seq);
149
+
// buffer for later inspection/retry
150
+
let _ = self.buffer_event(&commit).await;
151
+
}
152
+
}
153
+
_ => {} // ignore identity/account/etc for now
154
+
}
155
+
Ok(())
156
+
}
157
158
+
async fn process_commit(
159
+
&mut self,
160
+
commit: &jacquard::api::com_atproto::sync::subscribe_repos::Commit<'_>,
161
+
) -> Result<()> {
162
+
let db = self.state.db.clone();
163
+
let did = &commit.repo;
164
165
+
let mut should_process = self.full_network;
166
+
let did_key = keys::repo_key(&did);
0
0
0
167
168
+
if !should_process {
169
+
if Db::contains_key(db.repos.clone(), did_key).await? {
170
+
should_process = true;
171
+
}
172
+
}
173
174
+
if !should_process {
175
+
return Ok(());
176
+
}
177
+
178
+
// check repo state
179
+
let state_bytes = Db::get(db.repos.clone(), did_key).await?;
180
181
+
let repo_state = if let Some(bytes) = state_bytes {
182
+
rmp_serde::from_slice::<RepoState>(&bytes).ok()
183
+
} else {
184
+
None
185
+
};
186
187
+
let status = repo_state
188
+
.as_ref()
189
+
.map(|s| s.status.clone())
190
+
.unwrap_or(RepoStatus::New);
191
192
+
match status {
193
+
RepoStatus::New => {
194
+
info!("new repo detected: {}", did);
195
+
// 1. save state as backfilling
196
+
let mut new_state = RepoState::new(commit.repo.clone().into_static());
197
+
new_state.status = RepoStatus::Backfilling;
198
+
let bytes = rmp_serde::to_vec(&new_state).into_diagnostic()?;
199
200
+
let mut batch = db.inner.batch();
201
+
batch.insert(&db.repos, did_key, bytes);
202
+
batch.insert(&db.pending, did_key, Vec::new());
203
204
+
tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
205
+
.await
206
+
.into_diagnostic()??;
207
208
+
// 2. queue for backfill
209
+
if let Err(e) = self.state.backfill_tx.send(did.clone().into_static()) {
210
+
error!("failed to queue backfill for {}: {}", did, e);
211
+
}
212
213
+
// 3. buffer this event
214
+
self.buffer_event(commit).await?;
215
+
}
216
+
RepoStatus::Backfilling => {
217
+
debug!("buffering event for backfilling repo: {}", did);
218
+
self.buffer_event(commit).await?;
219
+
}
220
+
RepoStatus::Synced => {
221
+
// check revision
222
+
if let Some(state) = repo_state {
223
+
if !state.rev.is_empty() && commit.rev.as_str() <= state.rev.as_str() {
224
+
debug!(
225
+
"skipping replayed event for {}: {} <= {}",
226
+
did, commit.rev, state.rev
227
+
);
228
+
return Ok(());
229
}
230
+
}
231
+
232
+
// apply immediately
233
+
let db = db.clone();
234
+
let commit_static = commit.clone().into_static();
235
+
let did_static = did.clone().into_static();
0
0
0
0
0
0
0
236
237
+
let res = tokio::task::spawn_blocking(move || {
238
+
ops::apply_commit(&db, &commit_static, true)
239
+
})
240
+
.await
241
+
.into_diagnostic()?;
242
243
+
if let Err(e) = res {
244
+
error!("failed to apply live commit for {}: {}", did_static, e);
245
+
self.buffer_event(commit).await?;
246
+
} else {
247
+
debug!(
248
+
"synced event for {}, {} ops",
249
+
did_static,
250
+
commit.ops.len()
251
+
);
0
0
0
0
252
}
253
}
254
+
RepoStatus::Error(_) => {
255
+
// maybe retry? for now ignore.
256
+
}
257
}
258
Ok(())
259
}