AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
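//! Admin HTTP endpoints for managing the set of tracked repos.
//!
//! `GET /repos` streams every known repo as NDJSON, `PUT /repos` starts (or
//! resumes) tracking the DIDs in the request body, and `DELETE /repos` stops
//! tracking them, optionally deleting their indexed data. Request bodies may
//! be either a JSON array of objects or NDJSON, one object per line (see
//! `parse_body` below).
//!
//! A minimal client sketch; the host/port, the example DIDs, and the
//! `reqwest`/`tokio` dependencies are assumptions for illustration, not part
//! of this crate:
//!
//! ```ignore
//! #[tokio::main]
//! async fn main() -> Result<(), reqwest::Error> {
//!     // Enqueue two repos for backfill; the body is NDJSON, one object per line.
//!     let body = "{\"did\":\"did:plc:example1\"}\n{\"did\":\"did:plc:example2\"}\n";
//!     let resp = reqwest::Client::new()
//!         .put("http://localhost:8080/repos")
//!         .header("content-type", "application/x-ndjson")
//!         .body(body)
//!         .send()
//!         .await?;
//!     println!("PUT /repos -> {}", resp.status());
//!     Ok(())
//! }
//! ```
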
use std::sync::Arc;

use axum::{
    Router,
    body::Body,
    extract::{Query, State},
    http::{StatusCode, header},
    response::{IntoResponse, Response},
    routing::{delete, get, put},
};
use jacquard::IntoStatic;
use miette::IntoDiagnostic;
use rand::RngCore;
use serde::{Deserialize, Serialize};

use crate::api::AppState;
use crate::db::{keys, ser_repo_state};
use crate::types::{GaugeState, RepoState};

pub fn router() -> Router<Arc<AppState>> {
    Router::new()
        .route("/repos", get(handle_get_repos))
        .route("/repos", put(handle_put_repos))
        .route("/repos", delete(handle_delete_repos))
}

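/// One item of a `PUT /repos` or `DELETE /repos` body: the repo's DID, plus an
/// optional per-item `deleteData` flag that overrides the `delete_data` query
/// parameter on deletes.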
#[derive(Deserialize, Debug)]
pub struct RepoRequest {
    pub did: String,
    #[serde(skip_serializing_if = "Option::is_none", rename = "deleteData")]
    pub delete_data: Option<bool>,
}

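/// A single NDJSON line of the `GET /repos` response: the repo's DID, its
/// current status, whether it is tracked, the last seen rev (if any), and the
/// timestamp of the last update.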
#[derive(Serialize, Debug)]
pub struct RepoResponse {
    pub did: String,
    pub status: String,
    pub tracked: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rev: Option<String>,
    pub last_updated_at: i64,
}

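/// Query parameters for `DELETE /repos`; `?delete_data=true` sets the default
/// for items that don't specify `deleteData` themselves.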
#[derive(Deserialize)]
pub struct DeleteParams {
    #[serde(default)]
    pub delete_data: bool,
}

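/// `GET /repos`: stream the full repo partition as NDJSON, one `RepoResponse`
/// object per line. Entries that fail to decode are silently skipped.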
pub async fn handle_get_repos(
    State(state): State<Arc<AppState>>,
) -> Result<Response, (StatusCode, String)> {
    let repos_ks = state.db.repos.clone();

    let stream = futures::stream::iter(repos_ks.prefix(&[]).filter_map(|item| {
        let (k, v) = item.into_inner().ok()?;
        let did_str = std::str::from_utf8(&k[2..]).ok()?;
        let repo_state = crate::db::deser_repo_state(&v).ok()?;

        let response = RepoResponse {
            did: did_str.to_string(),
            status: repo_state.status.to_string(),
            tracked: repo_state.tracked,
            rev: repo_state.rev.as_ref().map(|r| r.to_string()),
            last_updated_at: repo_state.last_updated_at,
        };

        let json = serde_json::to_string(&response).ok()?;
        Some(Ok::<_, std::io::Error>(format!("{json}\n")))
    }));

    let body = Body::from_stream(stream);

    Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response())
}

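/// `PUT /repos`: start tracking every DID in the body (invalid DIDs are
/// skipped). Repos that already exist but are untracked are re-tracked and
/// re-enqueued for backfill; unknown DIDs get a fresh backfilling state. The
/// whole update is committed as one batch, then counters and gauges are
/// adjusted and the backfill worker is notified.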
pub async fn handle_put_repos(
    State(state): State<Arc<AppState>>,
    req: axum::extract::Request,
) -> Result<StatusCode, (StatusCode, String)> {
    let items = parse_body(req).await?;

    let state_task = state.clone();
    let (new_repo_count, gauge_transitions) = tokio::task::spawn_blocking(move || {
        let db = &state_task.db;
        let mut batch = db.inner.batch();
        let mut added = 0i64;
        let mut gauge_transitions: Vec<(GaugeState, GaugeState)> = Vec::new();

        for item in items {
            let did = match jacquard::types::did::Did::new_owned(&item.did) {
                Ok(d) => d,
                Err(_) => continue,
            };
            let did_key = keys::repo_key(&did);

            let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) {
                crate::db::deser_repo_state(&bytes)
                    .ok()
                    .map(|s| s.into_static())
            } else {
                None
            };

            if let Some(mut repo_state) = existing_state {
                if !repo_state.tracked {
                    let resync_bytes_opt = db.resync.get(&did_key).ok().flatten();
                    let old_gauge =
                        crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref());

                    repo_state.tracked = true;
                    // re-enqueue into pending
                    if let Ok(bytes) = ser_repo_state(&repo_state) {
                        batch.insert(&db.repos, &did_key, bytes);
                    }
                    batch.insert(
                        &db.pending,
                        keys::pending_key(repo_state.index_id),
                        &did_key,
                    );
                    batch.remove(&db.resync, &did_key);
                    gauge_transitions.push((old_gauge, GaugeState::Pending));
                }
            } else {
                let repo_state = RepoState::backfilling(rand::rng().next_u64());
                if let Ok(bytes) = ser_repo_state(&repo_state) {
                    batch.insert(&db.repos, &did_key, bytes);
                }
                batch.insert(
                    &db.pending,
                    keys::pending_key(repo_state.index_id),
                    &did_key,
                );
                added += 1;
                // pseudo-transition from Synced so this just increments the pending gauge
                gauge_transitions.push((GaugeState::Synced, GaugeState::Pending));
            }
        }

        batch
            .commit()
            .into_diagnostic()
            .map_err(|e| e.to_string())?;
        Ok::<_, String>((added, gauge_transitions))
    })
    .await
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    if new_repo_count > 0 {
        state.db.update_count_async("repos", new_repo_count).await;
    }
    for (old, new) in gauge_transitions {
        state.db.update_gauge_diff_async(&old, &new).await;
    }

    // Always notify backfill if anything was added to pending!
    state.notify_backfill();

    Ok(StatusCode::OK)
}

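/// `DELETE /repos`: stop tracking the DIDs in the body. With `delete_data`
/// (per item, or via the query string) the repo's indexed data is removed as
/// well; otherwise the repo is just marked untracked and removed from the
/// pending and resync queues. Committed as a single batch, mirroring the PUT
/// path.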
pub async fn handle_delete_repos(
    State(state): State<Arc<AppState>>,
    Query(params): Query<DeleteParams>,
    req: axum::extract::Request,
) -> Result<StatusCode, (StatusCode, String)> {
    let items = parse_body(req).await?;

    let state_task = state.clone();
    let (deleted_count, gauge_decrements) = tokio::task::spawn_blocking(move || {
        let db = &state_task.db;
        let mut batch = db.inner.batch();
        let mut deleted_count = 0i64;
        let mut gauge_decrements = Vec::new();

        for item in items {
            let did = match jacquard::types::did::Did::new_owned(&item.did) {
                Ok(d) => d,
                Err(_) => continue,
            };

            let delete_data = item.delete_data.unwrap_or(params.delete_data);
            let did_key = keys::repo_key(&did);

            let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) {
                crate::db::deser_repo_state(&bytes)
                    .ok()
                    .map(|s| s.into_static())
            } else {
                None
            };

            if let Some(mut repo_state) = existing_state {
                let resync_bytes_opt = db.resync.get(&did_key).ok().flatten();
                let old_gauge =
                    crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref());

                if delete_data {
                    if crate::ops::delete_repo(&mut batch, db, &did, &repo_state).is_ok() {
                        deleted_count += 1;
                        if old_gauge != GaugeState::Synced {
                            gauge_decrements.push(old_gauge);
                        }
                    } else {
                        tracing::error!("failed to apply delete_repo to {}", did);
                    }
                } else if repo_state.tracked {
                    repo_state.tracked = false;
                    if let Ok(bytes) = ser_repo_state(&repo_state) {
                        batch.insert(&db.repos, &did_key, bytes);
                    }
                    batch.remove(&db.pending, keys::pending_key(repo_state.index_id));
                    batch.remove(&db.resync, &did_key);
                    if old_gauge != GaugeState::Synced {
                        gauge_decrements.push(old_gauge);
                    }
                }
            }
        }

        batch
            .commit()
            .into_diagnostic()
            .map_err(|e| e.to_string())?;

        Ok::<_, String>((deleted_count, gauge_decrements))
    })
    .await
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    if deleted_count > 0 {
        state.db.update_count_async("repos", -deleted_count).await;
    }
    for gauge in gauge_decrements {
        state
            .db
            .update_gauge_diff_async(&gauge, &GaugeState::Synced)
            .await;
    }

    Ok(StatusCode::OK)
}

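/// Read the whole request body and parse it as either a JSON array of
/// `RepoRequest`s or as NDJSON (one JSON object per non-empty line).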
async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> {
    let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
        .await
        .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;

    let text =
        std::str::from_utf8(&body_bytes).map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;

    let trimmed = text.trim();
    if trimmed.starts_with('[') {
        serde_json::from_str::<Vec<RepoRequest>>(trimmed).map_err(|e| {
            (
                StatusCode::BAD_REQUEST,
                format!("invalid JSON array: {}", e),
            )
        })
    } else {
        trimmed
            .lines()
            .filter(|l| !l.trim().is_empty())
            .map(|line| {
                serde_json::from_str::<RepoRequest>(line).map_err(|e| {
                    (
                        StatusCode::BAD_REQUEST,
                        format!("invalid NDJSON line: {}", e),
                    )
                })
            })
            .collect()
    }
}