kind of like tap but different and in rust
at main 161 lines 5.1 kB view raw
1use std::sync::Arc; 2 3use axum::{ 4 Json, Router, 5 extract::State, 6 http::StatusCode, 7 routing::{get, patch}, 8}; 9use miette::IntoDiagnostic; 10use rand::Rng; 11use serde::{Deserialize, Serialize}; 12 13use crate::api::AppState; 14use crate::db::{self, keys, ser_repo_state}; 15use crate::filter::{DID_PREFIX, EXCLUDE_PREFIX, FilterConfig, FilterMode, SetUpdate}; 16use crate::types::{GaugeState, RepoState}; 17 18pub fn router() -> Router<Arc<AppState>> { 19 Router::new() 20 .route("/filter", get(handle_get_filter)) 21 .route("/filter", patch(handle_patch_filter)) 22} 23 24#[derive(Serialize)] 25pub struct FilterResponse { 26 pub mode: FilterMode, 27 pub dids: Vec<String>, 28 pub signals: Vec<String>, 29 pub collections: Vec<String>, 30 pub excludes: Vec<String>, 31} 32 33pub async fn handle_get_filter( 34 State(state): State<Arc<AppState>>, 35) -> Result<Json<FilterResponse>, (StatusCode, String)> { 36 let filter_ks = state.db.filter.clone(); 37 let resp = tokio::task::spawn_blocking(move || { 38 let hot = FilterConfig::load(&filter_ks).map_err(|e| e.to_string())?; 39 let dids = db::filter::read_set(&filter_ks, DID_PREFIX).map_err(|e| e.to_string())?; 40 let excludes = 41 db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?; 42 Ok::<_, String>(FilterResponse { 43 mode: hot.mode, 44 dids, 45 signals: hot.signals.iter().map(|s| s.to_string()).collect(), 46 collections: hot.collections.iter().map(|s| s.to_string()).collect(), 47 excludes, 48 }) 49 }) 50 .await 51 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 52 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 53 54 Ok(Json(resp)) 55} 56 57#[derive(Deserialize)] 58pub struct FilterPatch { 59 pub mode: Option<FilterMode>, 60 pub dids: Option<SetUpdate>, 61 pub signals: Option<SetUpdate>, 62 pub collections: Option<SetUpdate>, 63 pub excludes: Option<SetUpdate>, 64} 65 66pub async fn handle_patch_filter( 67 State(state): State<Arc<AppState>>, 68 Json(patch): Json<FilterPatch>, 69) -> Result<StatusCode, (StatusCode, String)> { 70 let db = &state.db; 71 72 let new_dids: Option<Vec<String>> = match &patch.dids { 73 Some(SetUpdate::Set(dids)) => Some(dids.clone()), 74 Some(SetUpdate::Patch(map)) => { 75 let added: Vec<String> = map 76 .iter() 77 .filter(|(_, add)| **add) 78 .map(|(d, _)| d.clone()) 79 .collect(); 80 (!added.is_empty()).then_some(added) 81 } 82 None => None, 83 }; 84 85 let filter_ks = db.filter.clone(); 86 let repos_ks = db.repos.clone(); 87 let pending_ks = db.pending.clone(); 88 let inner = db.inner.clone(); 89 90 let patch_mode = patch.mode; 91 let patch_dids = patch.dids; 92 let patch_signals = patch.signals; 93 let patch_collections = patch.collections; 94 let patch_excludes = patch.excludes; 95 96 let (new_repo_count, new_filter) = tokio::task::spawn_blocking(move || { 97 let mut batch = inner.batch(); 98 99 db::filter::apply_patch( 100 &mut batch, 101 &filter_ks, 102 patch_mode, 103 patch_dids, 104 patch_signals, 105 patch_collections, 106 patch_excludes, 107 ) 108 .map_err(|e| e.to_string())?; 109 110 let mut added = 0i64; 111 112 if let Some(dids) = new_dids { 113 for did_str in &dids { 114 let did = 115 jacquard::types::did::Did::new_owned(did_str).map_err(|e| e.to_string())?; 116 let did_key = keys::repo_key(&did); 117 let exists = repos_ks 118 .contains_key(&did_key) 119 .into_diagnostic() 120 .map_err(|e| e.to_string())?; 121 if !exists { 122 let repo_state = RepoState::backfilling(rand::rng().next_u64()); 123 let bytes = ser_repo_state(&repo_state).map_err(|e| e.to_string())?; 124 batch.insert(&repos_ks, &did_key, bytes); 125 batch.insert( 126 &pending_ks, 127 keys::pending_key(repo_state.index_id), 128 &did_key, 129 ); 130 added += 1; 131 } 132 } 133 } 134 135 batch 136 .commit() 137 .into_diagnostic() 138 .map_err(|e| e.to_string())?; 139 140 let new_filter = db::filter::load(&filter_ks).map_err(|e| e.to_string())?; 141 Ok::<_, String>((added, new_filter)) 142 }) 143 .await 144 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 145 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 146 147 state.filter.store(Arc::new(new_filter)); 148 149 if new_repo_count > 0 { 150 state.db.update_count_async("repos", new_repo_count).await; 151 for _ in 0..new_repo_count { 152 state 153 .db 154 .update_gauge_diff_async(&GaugeState::Synced, &GaugeState::Pending) 155 .await; 156 } 157 state.notify_backfill(); 158 } 159 160 Ok(StatusCode::OK) 161}