tangled
alpha
login
or
join now
ptr.pet
/
hydrant
24
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
24
fork
atom
overview
issues
6
pulls
pipelines
[api] event ack route, unused for now
ptr.pet
4 days ago
8d487760
98b08acb
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+48
-5
2 changed files
expand all
collapse all
unified
split
src
api
mod.rs
stream.rs
+5
-5
src/api/mod.rs
···
5
5
use tower_http::trace::TraceLayer;
6
6
7
7
mod debug;
8
8
-
pub mod filter;
9
9
-
pub mod repos;
10
10
-
pub mod stats;
8
8
+
mod filter;
9
9
+
mod repos;
10
10
+
mod stats;
11
11
mod stream;
12
12
-
pub mod xrpc;
12
12
+
mod xrpc;
13
13
14
14
pub async fn serve(state: Arc<AppState>, port: u16) -> miette::Result<()> {
15
15
let app = Router::new()
16
16
.route("/health", get(|| async { "OK" }))
17
17
.route("/stats", get(stats::get_stats))
18
18
-
.route("/stream", get(stream::handle_stream))
18
18
+
.merge(stream::router())
19
19
.merge(xrpc::router())
20
20
.merge(filter::router())
21
21
.merge(repos::router())
+43
src/api/stream.rs
···
1
1
use crate::api::AppState;
2
2
use crate::db::keys;
3
3
use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent};
4
4
+
use axum::Router;
5
5
+
use axum::http::StatusCode;
6
6
+
use axum::routing::get;
4
7
use axum::{
5
8
extract::{
6
9
Query, State,
···
14
17
use std::sync::Arc;
15
18
use tokio::sync::{broadcast, mpsc, oneshot};
16
19
use tracing::{error, info_span, warn};
20
20
+
21
21
+
pub fn router() -> Router<Arc<AppState>> {
22
22
+
Router::new().route("/", get(handle_stream))
23
23
+
// .route("/ack", post(handle_ack))
24
24
+
}
25
25
+
26
26
+
#[allow(dead_code)]
27
27
+
#[derive(Deserialize)]
28
28
+
pub struct AckBody {
29
29
+
pub ids: Vec<u64>,
30
30
+
}
31
31
+
32
32
+
#[allow(dead_code)]
33
33
+
pub async fn handle_ack(
34
34
+
State(state): State<Arc<AppState>>,
35
35
+
axum::Json(body): axum::Json<AckBody>,
36
36
+
) -> Result<StatusCode, (StatusCode, String)> {
37
37
+
if body.ids.is_empty() {
38
38
+
return Ok(StatusCode::OK);
39
39
+
}
40
40
+
41
41
+
let state = state.clone();
42
42
+
let ids = body.ids;
43
43
+
tokio::task::spawn_blocking(move || {
44
44
+
let mut batch = state.db.inner.batch();
45
45
+
for &id in &ids {
46
46
+
batch.remove(&state.db.events, keys::event_key(id));
47
47
+
}
48
48
+
batch
49
49
+
.commit()
50
50
+
.into_diagnostic()
51
51
+
.wrap_err("failed to delete events")
52
52
+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
53
53
+
Ok(StatusCode::OK)
54
54
+
})
55
55
+
.await
56
56
+
.into_diagnostic()
57
57
+
.wrap_err("panicked while deleting events")
58
58
+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
59
59
+
}
17
60
18
61
#[derive(Deserialize)]
19
62
pub struct StreamQuery {