tangled
alpha
login
or
join now
ptr.pet
/
hydrant
22
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
22
fork
atom
overview
issues
6
pulls
pipelines
[api] cancel stream handler thread properly
ptr.pet
2 weeks ago
784213e0
bbadc992
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+25
-3
1 changed file
expand all
collapse all
unified
split
src
api
stream.rs
+25
-3
src/api/stream.rs
···
31
32
async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
33
let (tx, mut rx) = mpsc::channel(500);
0
34
35
-
std::thread::Builder::new()
0
0
36
.name(format!(
37
"stream-handler-{}",
38
std::time::SystemTime::UNIX_EPOCH
···
41
.as_secs()
42
))
43
.spawn(move || {
0
44
let db = &state.db;
45
let mut event_rx = db.event_tx.subscribe();
46
let ks = db.events.clone();
···
51
max_id.saturating_sub(1)
52
}
53
};
0
54
55
loop {
56
// 1. catch up from DB
···
146
}
147
148
// 2. wait for live events
149
-
match event_rx.blocking_recv() {
0
0
0
0
0
0
0
0
0
0
0
150
Ok(BroadcastEvent::Persisted(_)) => {
151
// just wake up and run catch-up loop again
152
}
···
176
.expect("failed to spawn stream handler thread");
177
178
while let Some(msg) = rx.recv().await {
179
-
if socket.send(msg).await.is_err() {
0
180
break;
181
}
0
0
0
0
0
182
}
183
}
···
31
32
async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
33
let (tx, mut rx) = mpsc::channel(500);
34
+
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
35
36
+
let runtime = tokio::runtime::Handle::current();
37
+
38
+
let thread = std::thread::Builder::new()
39
.name(format!(
40
"stream-handler-{}",
41
std::time::SystemTime::UNIX_EPOCH
···
44
.as_secs()
45
))
46
.spawn(move || {
47
+
let mut cancel_rx = cancel_rx;
48
let db = &state.db;
49
let mut event_rx = db.event_tx.subscribe();
50
let ks = db.events.clone();
···
55
max_id.saturating_sub(1)
56
}
57
};
58
+
let _runtime_guard = runtime.enter();
59
60
loop {
61
// 1. catch up from DB
···
151
}
152
153
// 2. wait for live events
154
+
let next_event = runtime.block_on(async {
155
+
tokio::select! {
156
+
res = event_rx.recv() => Some(res),
157
+
_ = &mut cancel_rx => None,
158
+
}
159
+
});
160
+
161
+
let Some(next_event) = next_event else {
162
+
break;
163
+
};
164
+
165
+
match next_event {
166
Ok(BroadcastEvent::Persisted(_)) => {
167
// just wake up and run catch-up loop again
168
}
···
192
.expect("failed to spawn stream handler thread");
193
194
while let Some(msg) = rx.recv().await {
195
+
if let Err(e) = socket.send(msg).await {
196
+
error!("failed to send ws message: {e}");
197
break;
198
}
199
+
}
200
+
201
+
let _ = cancel_tx.send(());
202
+
if let Err(e) = thread.join() {
203
+
error!("stream handler thread panicked: {e:?}");
204
}
205
}