// Forked from ptr.pet/hydrant.
// Kind of like tap, but different and in Rust.
1use std::sync::Arc;
2
3use axum::{
4 Json, Router,
5 extract::State,
6 http::StatusCode,
7 routing::{get, patch},
8};
9use miette::IntoDiagnostic;
10use rand::Rng;
11use serde::{Deserialize, Serialize};
12
13use crate::api::AppState;
14use crate::db::{self, keys, ser_repo_state};
15use crate::filter::{DID_PREFIX, EXCLUDE_PREFIX, FilterConfig, FilterMode, SetUpdate};
16use crate::types::{GaugeState, RepoState};
17
18pub fn router() -> Router<Arc<AppState>> {
19 Router::new()
20 .route("/filter", get(handle_get_filter))
21 .route("/filter", patch(handle_patch_filter))
22}
23
24#[derive(Serialize)]
25pub struct FilterResponse {
26 pub mode: FilterMode,
27 pub dids: Vec<String>,
28 pub signals: Vec<String>,
29 pub collections: Vec<String>,
30 pub excludes: Vec<String>,
31}
32
33pub async fn handle_get_filter(
34 State(state): State<Arc<AppState>>,
35) -> Result<Json<FilterResponse>, (StatusCode, String)> {
36 let filter_ks = state.db.filter.clone();
37 let resp = tokio::task::spawn_blocking(move || {
38 let hot = FilterConfig::load(&filter_ks).map_err(|e| e.to_string())?;
39 let dids = db::filter::read_set(&filter_ks, DID_PREFIX).map_err(|e| e.to_string())?;
40 let excludes =
41 db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?;
42 Ok::<_, String>(FilterResponse {
43 mode: hot.mode,
44 dids,
45 signals: hot.signals.iter().map(|s| s.to_string()).collect(),
46 collections: hot.collections.iter().map(|s| s.to_string()).collect(),
47 excludes,
48 })
49 })
50 .await
51 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
52 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
53
54 Ok(Json(resp))
55}
56
57#[derive(Deserialize)]
58pub struct FilterPatch {
59 pub mode: Option<FilterMode>,
60 pub dids: Option<SetUpdate>,
61 pub signals: Option<SetUpdate>,
62 pub collections: Option<SetUpdate>,
63 pub excludes: Option<SetUpdate>,
64}
65
66pub async fn handle_patch_filter(
67 State(state): State<Arc<AppState>>,
68 Json(patch): Json<FilterPatch>,
69) -> Result<StatusCode, (StatusCode, String)> {
70 let db = &state.db;
71
72 let new_dids: Option<Vec<String>> = match &patch.dids {
73 Some(SetUpdate::Set(dids)) => Some(dids.clone()),
74 Some(SetUpdate::Patch(map)) => {
75 let added: Vec<String> = map
76 .iter()
77 .filter(|(_, add)| **add)
78 .map(|(d, _)| d.clone())
79 .collect();
80 (!added.is_empty()).then_some(added)
81 }
82 None => None,
83 };
84
85 let filter_ks = db.filter.clone();
86 let repos_ks = db.repos.clone();
87 let pending_ks = db.pending.clone();
88 let inner = db.inner.clone();
89
90 let patch_mode = patch.mode;
91 let patch_dids = patch.dids;
92 let patch_signals = patch.signals;
93 let patch_collections = patch.collections;
94 let patch_excludes = patch.excludes;
95
96 let (new_repo_count, new_filter) = tokio::task::spawn_blocking(move || {
97 let mut batch = inner.batch();
98
99 db::filter::apply_patch(
100 &mut batch,
101 &filter_ks,
102 patch_mode,
103 patch_dids,
104 patch_signals,
105 patch_collections,
106 patch_excludes,
107 )
108 .map_err(|e| e.to_string())?;
109
110 let mut added = 0i64;
111
112 if let Some(dids) = new_dids {
113 for did_str in &dids {
114 let did =
115 jacquard::types::did::Did::new_owned(did_str).map_err(|e| e.to_string())?;
116 let did_key = keys::repo_key(&did);
117 let exists = repos_ks
118 .contains_key(&did_key)
119 .into_diagnostic()
120 .map_err(|e| e.to_string())?;
121 if !exists {
122 let repo_state = RepoState::backfilling(rand::rng().next_u64());
123 let bytes = ser_repo_state(&repo_state).map_err(|e| e.to_string())?;
124 batch.insert(&repos_ks, &did_key, bytes);
125 batch.insert(
126 &pending_ks,
127 keys::pending_key(repo_state.index_id),
128 &did_key,
129 );
130 added += 1;
131 }
132 }
133 }
134
135 batch
136 .commit()
137 .into_diagnostic()
138 .map_err(|e| e.to_string())?;
139
140 let new_filter = db::filter::load(&filter_ks).map_err(|e| e.to_string())?;
141 Ok::<_, String>((added, new_filter))
142 })
143 .await
144 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
145 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
146
147 state.filter.store(Arc::new(new_filter));
148
149 if new_repo_count > 0 {
150 state.db.update_count_async("repos", new_repo_count).await;
151 for _ in 0..new_repo_count {
152 state
153 .db
154 .update_gauge_diff_async(&GaugeState::Synced, &GaugeState::Pending)
155 .await;
156 }
157 state.notify_backfill();
158 }
159
160 Ok(StatusCode::OK)
161}