tangled
alpha
login
or
join now
ptr.pet
/
hydrant
24
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
24
fork
atom
overview
issues
6
pulls
pipelines
[api] unnest stream body
ptr.pet
5 days ago
98b08acb
6201caf0
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+170
-158
1 changed file
expand all
collapse all
unified
split
src
api
stream.rs
+170
-158
src/api/stream.rs
···
12
12
use miette::{Context, IntoDiagnostic};
13
13
use serde::Deserialize;
14
14
use std::sync::Arc;
15
15
-
use tokio::sync::{broadcast, mpsc};
15
15
+
use tokio::sync::{broadcast, mpsc, oneshot};
16
16
use tracing::{error, info_span, warn};
17
17
18
18
#[derive(Deserialize)]
···
30
30
31
31
async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
32
32
let (tx, mut rx) = mpsc::channel(500);
33
33
-
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
33
33
+
let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
34
34
35
35
let runtime = tokio::runtime::Handle::current();
36
36
let id = std::time::SystemTime::UNIX_EPOCH
···
39
39
.as_secs();
40
40
41
41
let thread = std::thread::Builder::new()
42
42
-
.name(format!(
43
43
-
"stream-handler-{id}",
44
44
-
45
45
-
))
42
42
+
.name(format!("stream-handler-{id}"))
46
43
.spawn(move || {
47
47
-
let mut cancel_rx = cancel_rx;
48
48
-
let db = &state.db;
49
49
-
let mut event_rx = db.event_tx.subscribe();
50
50
-
let ks = db.events.clone();
51
51
-
let mut current_id = match query.cursor {
52
52
-
Some(cursor) => cursor.saturating_sub(1),
53
53
-
None => {
54
54
-
let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst);
55
55
-
max_id.saturating_sub(1)
56
56
-
}
57
57
-
};
58
44
let _runtime_guard = runtime.enter();
59
59
-
let span = info_span!("stream", id);
60
60
-
let _entered_span = span.enter();
61
61
-
62
62
-
loop {
63
63
-
// 1. catch up from DB
64
64
-
loop {
65
65
-
let mut found = false;
66
66
-
for item in ks.range(keys::event_key(current_id + 1)..) {
67
67
-
let (k, v) = match item.into_inner() {
68
68
-
Ok((k, v)) => (k, v),
69
69
-
Err(e) => {
70
70
-
error!(err = %e, "failed to read event from db");
71
71
-
break;
72
72
-
}
73
73
-
};
74
74
-
let id = match k
75
75
-
.as_ref()
76
76
-
.try_into()
77
77
-
.into_diagnostic()
78
78
-
.wrap_err("expected event id to be 8 bytes")
79
79
-
.map(u64::from_be_bytes)
80
80
-
{
81
81
-
Ok(id) => id,
82
82
-
Err(e) => {
83
83
-
error!(err = %e, "failed to parse event id");
84
84
-
continue;
85
85
-
}
86
86
-
};
87
87
-
current_id = id;
45
45
+
stream(state, cancel_rx, tx, query, id);
46
46
+
})
47
47
+
.expect("failed to spawn stream handler thread");
88
48
89
89
-
let StoredEvent {
90
90
-
live,
91
91
-
did,
92
92
-
rev,
93
93
-
collection,
94
94
-
rkey,
95
95
-
action,
96
96
-
cid,
97
97
-
} = match rmp_serde::from_slice(&v) {
98
98
-
Ok(e) => e,
99
99
-
Err(e) => {
100
100
-
error!(err = %e, "failed to deserialize stored event");
101
101
-
continue;
102
102
-
}
103
103
-
};
104
104
-
105
105
-
let _entered = info_span!("record", cid = ?cid.map(|c| c.to_string())).entered();
106
106
-
107
107
-
let marshallable = {
108
108
-
let record_val;
109
109
-
let block_bytes = cid
110
110
-
.map(|cid| db.blocks.get(&cid.to_bytes()))
111
111
-
.transpose()
112
112
-
.map(Option::flatten);
113
113
-
match block_bytes {
114
114
-
Ok(Some(block_bytes)) => match serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block_bytes) {
115
115
-
Ok(val) => record_val = val,
116
116
-
Err(e) => {
117
117
-
error!(err = %e, "cant parse block, must be corrupted?");
118
118
-
return;
119
119
-
}
120
120
-
}
121
121
-
Ok(None) => {
122
122
-
warn!("block not found, possibly repo deleted but events not evicted yet?");
123
123
-
continue;
124
124
-
}
125
125
-
Err(e) => {
126
126
-
error!(err = %e, "can't get block");
127
127
-
crate::db::check_poisoned(&e);
128
128
-
return;
129
129
-
}
130
130
-
}
49
49
+
while let Some(msg) = rx.recv().await {
50
50
+
if let Err(e) = socket.send(msg).await {
51
51
+
error!(err = %e, "failed to send ws message");
52
52
+
break;
53
53
+
}
54
54
+
}
131
55
132
132
-
MarshallableEvt {
133
133
-
id,
134
134
-
event_type: "record".into(),
135
135
-
record: Some(RecordEvt {
136
136
-
live,
137
137
-
did: did.to_did(),
138
138
-
rev: CowStr::Owned(rev.to_tid().into()),
139
139
-
collection,
140
140
-
rkey: CowStr::Owned(rkey.to_smolstr().into()),
141
141
-
action: CowStr::Borrowed(action.as_str()),
142
142
-
record: Some(record_val),
143
143
-
cid: cid
144
144
-
.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()),
145
145
-
}),
146
146
-
identity: None,
147
147
-
account: None,
148
148
-
}
149
149
-
};
56
56
+
let _ = cancel_tx.send(());
57
57
+
if let Err(e) = thread.join() {
58
58
+
error!(err = ?e, "stream handler thread panicked");
59
59
+
}
60
60
+
}
150
61
151
151
-
let json_str = match serde_json::to_string(&marshallable) {
152
152
-
Ok(s) => s,
153
153
-
Err(e) => {
154
154
-
error!(err = %e, "failed to serialize ws event");
155
155
-
continue;
156
156
-
}
157
157
-
};
62
62
+
fn stream(
63
63
+
state: Arc<AppState>,
64
64
+
mut cancel: oneshot::Receiver<()>,
65
65
+
tx: mpsc::Sender<Message>,
66
66
+
query: StreamQuery,
67
67
+
id: u64,
68
68
+
) {
69
69
+
let db = &state.db;
70
70
+
let mut event_rx = db.event_tx.subscribe();
71
71
+
let ks = db.events.clone();
72
72
+
let mut current_id = match query.cursor {
73
73
+
Some(cursor) => cursor.saturating_sub(1),
74
74
+
None => {
75
75
+
let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst);
76
76
+
max_id.saturating_sub(1)
77
77
+
}
78
78
+
};
79
79
+
let runtime = tokio::runtime::Handle::current();
158
80
159
159
-
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
160
160
-
error!(err = %e, "failed to send ws message");
161
161
-
return;
162
162
-
}
81
81
+
let span = info_span!("stream", id);
82
82
+
let _entered_span = span.enter();
163
83
164
164
-
found = true;
165
165
-
}
166
166
-
if !found {
84
84
+
loop {
85
85
+
// 1. catch up from DB
86
86
+
loop {
87
87
+
let mut found = false;
88
88
+
for item in ks.range(keys::event_key(current_id + 1)..) {
89
89
+
let (k, v) = match item.into_inner() {
90
90
+
Ok((k, v)) => (k, v),
91
91
+
Err(e) => {
92
92
+
error!(err = %e, "failed to read event from db");
167
93
break;
168
94
}
169
169
-
}
170
170
-
171
171
-
// 2. wait for live events
172
172
-
let next_event = runtime.block_on(async {
173
173
-
tokio::select! {
174
174
-
res = event_rx.recv() => Some(res),
175
175
-
_ = &mut cancel_rx => None,
95
95
+
};
96
96
+
let id = match k
97
97
+
.as_ref()
98
98
+
.try_into()
99
99
+
.into_diagnostic()
100
100
+
.wrap_err("expected event id to be 8 bytes")
101
101
+
.map(u64::from_be_bytes)
102
102
+
{
103
103
+
Ok(id) => id,
104
104
+
Err(e) => {
105
105
+
error!(err = %e, "failed to parse event id");
106
106
+
continue;
176
107
}
177
177
-
});
178
178
-
179
179
-
let Some(next_event) = next_event else {
180
180
-
break;
181
108
};
109
109
+
current_id = id;
182
110
183
183
-
match next_event {
184
184
-
Ok(BroadcastEvent::Persisted(_)) => {
185
185
-
// just wake up and run catch-up loop again
111
111
+
let StoredEvent {
112
112
+
live,
113
113
+
did,
114
114
+
rev,
115
115
+
collection,
116
116
+
rkey,
117
117
+
action,
118
118
+
cid,
119
119
+
} = match rmp_serde::from_slice(&v) {
120
120
+
Ok(e) => e,
121
121
+
Err(e) => {
122
122
+
error!(err = %e, "failed to deserialize stored event");
123
123
+
continue;
186
124
}
187
187
-
Ok(BroadcastEvent::Ephemeral(evt)) => {
188
188
-
// send ephemeral event directly
189
189
-
let json_str = match serde_json::to_string(&evt) {
190
190
-
Ok(s) => s,
191
191
-
Err(e) => {
192
192
-
error!(err = %e, "failed to serialize ws event");
193
193
-
continue;
125
125
+
};
126
126
+
127
127
+
let _entered = info_span!("record", cid = ?cid.map(|c| c.to_string())).entered();
128
128
+
129
129
+
let marshallable = {
130
130
+
let record_val;
131
131
+
let block_bytes = cid
132
132
+
.map(|cid| db.blocks.get(&cid.to_bytes()))
133
133
+
.transpose()
134
134
+
.map(Option::flatten);
135
135
+
match block_bytes {
136
136
+
Ok(Some(block_bytes)) => {
137
137
+
match serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block_bytes)
138
138
+
{
139
139
+
Ok(val) => record_val = val,
140
140
+
Err(e) => {
141
141
+
error!(err = %e, "cant parse block, must be corrupted?");
142
142
+
return;
143
143
+
}
194
144
}
195
195
-
};
196
196
-
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
197
197
-
error!(err = %e, "failed to send ws message");
145
145
+
}
146
146
+
Ok(None) => {
147
147
+
warn!(
148
148
+
"block not found, possibly repo deleted but events not evicted yet?"
149
149
+
);
150
150
+
continue;
151
151
+
}
152
152
+
Err(e) => {
153
153
+
error!(err = %e, "can't get block");
154
154
+
crate::db::check_poisoned(&e);
198
155
return;
199
156
}
200
157
}
201
201
-
Err(broadcast::error::RecvError::Lagged(_)) => {
202
202
-
// continue to catch up
158
158
+
159
159
+
MarshallableEvt {
160
160
+
id,
161
161
+
event_type: "record".into(),
162
162
+
record: Some(RecordEvt {
163
163
+
live,
164
164
+
did: did.to_did(),
165
165
+
rev: CowStr::Owned(rev.to_tid().into()),
166
166
+
collection,
167
167
+
rkey: CowStr::Owned(rkey.to_smolstr().into()),
168
168
+
action: CowStr::Borrowed(action.as_str()),
169
169
+
record: Some(record_val),
170
170
+
cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()),
171
171
+
}),
172
172
+
identity: None,
173
173
+
account: None,
203
174
}
204
204
-
Err(broadcast::error::RecvError::Closed) => {
205
205
-
break;
175
175
+
};
176
176
+
177
177
+
let json_str = match serde_json::to_string(&marshallable) {
178
178
+
Ok(s) => s,
179
179
+
Err(e) => {
180
180
+
error!(err = %e, "failed to serialize ws event");
181
181
+
continue;
206
182
}
183
183
+
};
184
184
+
185
185
+
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
186
186
+
error!(err = %e, "failed to send ws message");
187
187
+
return;
207
188
}
189
189
+
190
190
+
found = true;
208
191
}
209
209
-
})
210
210
-
.expect("failed to spawn stream handler thread");
192
192
+
if !found {
193
193
+
break;
194
194
+
}
195
195
+
}
211
196
212
212
-
while let Some(msg) = rx.recv().await {
213
213
-
if let Err(e) = socket.send(msg).await {
214
214
-
error!(err = %e, "failed to send ws message");
197
197
+
// 2. wait for live events
198
198
+
let next_event = runtime.block_on(async {
199
199
+
tokio::select! {
200
200
+
res = event_rx.recv() => Some(res),
201
201
+
_ = &mut cancel => None,
202
202
+
}
203
203
+
});
204
204
+
205
205
+
let Some(next_event) = next_event else {
215
206
break;
207
207
+
};
208
208
+
209
209
+
match next_event {
210
210
+
Ok(BroadcastEvent::Persisted(_)) => {
211
211
+
// just wake up and run catch-up loop again
212
212
+
}
213
213
+
Ok(BroadcastEvent::Ephemeral(evt)) => {
214
214
+
// send ephemeral event directly
215
215
+
let json_str = match serde_json::to_string(&evt) {
216
216
+
Ok(s) => s,
217
217
+
Err(e) => {
218
218
+
error!(err = %e, "failed to serialize ws event");
219
219
+
continue;
220
220
+
}
221
221
+
};
222
222
+
if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
223
223
+
error!(err = %e, "failed to send ws message");
224
224
+
return;
225
225
+
}
226
226
+
}
227
227
+
Err(broadcast::error::RecvError::Lagged(_)) => {
228
228
+
// continue to catch up
229
229
+
}
230
230
+
Err(broadcast::error::RecvError::Closed) => {
231
231
+
break;
232
232
+
}
216
233
}
217
217
-
}
218
218
-
219
219
-
let _ = cancel_tx.send(());
220
220
-
if let Err(e) = thread.join() {
221
221
-
error!(err = ?e, "stream handler thread panicked");
222
234
}
223
235
}