AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::sync::Arc;
2
3use crate::api::AppState;
4use crate::db;
5use crate::db::filter::EXCLUDE_PREFIX;
6use crate::filter::{FilterMode, SetUpdate};
7use axum::{
8 Json, Router,
9 extract::State,
10 http::StatusCode,
11 routing::{get, patch},
12};
13use miette::IntoDiagnostic;
14use serde::{Deserialize, Serialize};
15
16pub fn router() -> Router<Arc<AppState>> {
17 Router::new()
18 .route("/filter", get(handle_get_filter))
19 .route("/filter", patch(handle_patch_filter))
20}
21
22#[derive(Serialize)]
23pub struct FilterResponse {
24 pub mode: FilterMode,
25 pub signals: Vec<String>,
26 pub collections: Vec<String>,
27 pub excludes: Vec<String>,
28}
29
30pub async fn handle_get_filter(
31 State(state): State<Arc<AppState>>,
32) -> Result<Json<FilterResponse>, (StatusCode, String)> {
33 let filter_ks = state.db.filter.clone();
34 let resp = tokio::task::spawn_blocking(move || {
35 let hot = db::filter::load(&filter_ks).map_err(|e| e.to_string())?;
36 let excludes =
37 db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?;
38 Ok::<_, String>(FilterResponse {
39 mode: hot.mode,
40 signals: hot.signals.iter().map(|s| s.to_string()).collect(),
41 collections: hot.collections.iter().map(|s| s.to_string()).collect(),
42 excludes,
43 })
44 })
45 .await
46 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
47 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
48
49 Ok(Json(resp))
50}
51
52#[derive(Deserialize)]
53pub struct FilterPatch {
54 pub mode: Option<FilterMode>,
55 pub signals: Option<SetUpdate>,
56 pub collections: Option<SetUpdate>,
57 pub excludes: Option<SetUpdate>,
58}
59
60pub async fn handle_patch_filter(
61 State(state): State<Arc<AppState>>,
62 Json(patch): Json<FilterPatch>,
63) -> Result<StatusCode, (StatusCode, String)> {
64 let db = &state.db;
65
66 let filter_ks = db.filter.clone();
67 let inner = db.inner.clone();
68
69 let patch_mode = patch.mode;
70 let patch_signals = patch.signals;
71 let patch_collections = patch.collections;
72 let patch_excludes = patch.excludes;
73
74 let new_filter = tokio::task::spawn_blocking(move || {
75 let mut batch = inner.batch();
76
77 db::filter::apply_patch(
78 &mut batch,
79 &filter_ks,
80 patch_mode,
81 patch_signals,
82 patch_collections,
83 patch_excludes,
84 )
85 .map_err(|e| e.to_string())?;
86
87 batch
88 .commit()
89 .into_diagnostic()
90 .map_err(|e| e.to_string())?;
91
92 let new_filter = db::filter::load(&filter_ks).map_err(|e| e.to_string())?;
93 Ok::<_, String>(new_filter)
94 })
95 .await
96 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
97 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
98
99 state.filter.store(Arc::new(new_filter));
100
101 Ok(StatusCode::OK)
102}