AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at e558361eb571e7e019a4a7967bb4ae7e666f3f25 102 lines 2.9 kB view raw
1use std::sync::Arc; 2 3use crate::api::AppState; 4use crate::db; 5use crate::db::filter::EXCLUDE_PREFIX; 6use crate::filter::{FilterMode, SetUpdate}; 7use axum::{ 8 Json, Router, 9 extract::State, 10 http::StatusCode, 11 routing::{get, patch}, 12}; 13use miette::IntoDiagnostic; 14use serde::{Deserialize, Serialize}; 15 16pub fn router() -> Router<Arc<AppState>> { 17 Router::new() 18 .route("/filter", get(handle_get_filter)) 19 .route("/filter", patch(handle_patch_filter)) 20} 21 22#[derive(Serialize)] 23pub struct FilterResponse { 24 pub mode: FilterMode, 25 pub signals: Vec<String>, 26 pub collections: Vec<String>, 27 pub excludes: Vec<String>, 28} 29 30pub async fn handle_get_filter( 31 State(state): State<Arc<AppState>>, 32) -> Result<Json<FilterResponse>, (StatusCode, String)> { 33 let filter_ks = state.db.filter.clone(); 34 let resp = tokio::task::spawn_blocking(move || { 35 let hot = db::filter::load(&filter_ks).map_err(|e| e.to_string())?; 36 let excludes = 37 db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?; 38 Ok::<_, String>(FilterResponse { 39 mode: hot.mode, 40 signals: hot.signals.iter().map(|s| s.to_string()).collect(), 41 collections: hot.collections.iter().map(|s| s.to_string()).collect(), 42 excludes, 43 }) 44 }) 45 .await 46 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
47 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 48 49 Ok(Json(resp)) 50} 51 52#[derive(Deserialize)] 53pub struct FilterPatch { 54 pub mode: Option<FilterMode>, 55 pub signals: Option<SetUpdate>, 56 pub collections: Option<SetUpdate>, 57 pub excludes: Option<SetUpdate>, 58} 59 60pub async fn handle_patch_filter( 61 State(state): State<Arc<AppState>>, 62 Json(patch): Json<FilterPatch>, 63) -> Result<StatusCode, (StatusCode, String)> { 64 let db = &state.db; 65 66 let filter_ks = db.filter.clone(); 67 let inner = db.inner.clone(); 68 69 let patch_mode = patch.mode; 70 let patch_signals = patch.signals; 71 let patch_collections = patch.collections; 72 let patch_excludes = patch.excludes; 73 74 let new_filter = tokio::task::spawn_blocking(move || { 75 let mut batch = inner.batch(); 76 77 db::filter::apply_patch( 78 &mut batch, 79 &filter_ks, 80 patch_mode, 81 patch_signals, 82 patch_collections, 83 patch_excludes, 84 ) 85 .map_err(|e| e.to_string())?; 86 87 batch 88 .commit() 89 .into_diagnostic() 90 .map_err(|e| e.to_string())?; 91 92 let new_filter = db::filter::load(&filter_ks).map_err(|e| e.to_string())?; 93 Ok::<_, String>(new_filter) 94 }) 95 .await 96 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 97 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 98 99 state.filter.store(Arc::new(new_filter)); 100 101 Ok(StatusCode::OK) 102}