AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 4fe44f8a28620861fb772d40d55fbeb65716e5df 275 lines 9.3 kB view raw
1use std::sync::Arc; 2 3use axum::{ 4 Router, 5 body::Body, 6 extract::{Query, State}, 7 http::{StatusCode, header}, 8 response::{IntoResponse, Response}, 9 routing::{delete, get, put}, 10}; 11use jacquard::IntoStatic; 12use miette::IntoDiagnostic; 13use rand::Rng; 14use serde::{Deserialize, Serialize}; 15 16use crate::api::AppState; 17use crate::db::{keys, ser_repo_state}; 18use crate::types::{GaugeState, RepoState}; 19 20pub fn router() -> Router<Arc<AppState>> { 21 Router::new() 22 .route("/repos", get(handle_get_repos)) 23 .route("/repos", put(handle_put_repos)) 24 .route("/repos", delete(handle_delete_repos)) 25} 26 27#[derive(Deserialize, Debug)] 28pub struct RepoRequest { 29 pub did: String, 30 #[serde(skip_serializing_if = "Option::is_none", rename = "deleteData")] 31 pub delete_data: Option<bool>, 32} 33 34#[derive(Serialize, Debug)] 35pub struct RepoResponse { 36 pub did: String, 37 pub status: String, 38 pub tracked: bool, 39 #[serde(skip_serializing_if = "Option::is_none")] 40 pub rev: Option<String>, 41 pub last_updated_at: i64, 42} 43 44#[derive(Deserialize)] 45pub struct DeleteParams { 46 #[serde(default)] 47 pub delete_data: bool, 48} 49 50pub async fn handle_get_repos( 51 State(state): State<Arc<AppState>>, 52) -> Result<Response, (StatusCode, String)> { 53 let repos_ks = state.db.repos.clone(); 54 55 let stream = futures::stream::iter(repos_ks.prefix(&[]).filter_map(|item| { 56 let (k, v) = item.into_inner().ok()?; 57 let did_str = std::str::from_utf8(&k[2..]).ok()?; 58 let repo_state = crate::db::deser_repo_state(&v).ok()?; 59 60 let response = RepoResponse { 61 did: did_str.to_string(), 62 status: repo_state.status.to_string(), 63 tracked: repo_state.tracked, 64 rev: repo_state.rev.as_ref().map(|r| r.to_string()), 65 last_updated_at: repo_state.last_updated_at, 66 }; 67 68 let json = serde_json::to_string(&response).ok()?; 69 Some(Ok::<_, std::io::Error>(format!("{json}\n"))) 70 })); 71 72 let body = Body::from_stream(stream); 73 74 
Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response()) 75} 76 77pub async fn handle_put_repos( 78 State(state): State<Arc<AppState>>, 79 req: axum::extract::Request, 80) -> Result<StatusCode, (StatusCode, String)> { 81 let items = parse_body(req).await?; 82 83 let state_task = state.clone(); 84 let (new_repo_count, gauge_transitions) = tokio::task::spawn_blocking(move || { 85 let db = &state_task.db; 86 let mut batch = db.inner.batch(); 87 let mut added = 0i64; 88 let mut gauge_transitions: Vec<(GaugeState, GaugeState)> = Vec::new(); 89 90 for item in items { 91 let did = match jacquard::types::did::Did::new_owned(&item.did) { 92 Ok(d) => d, 93 Err(_) => continue, 94 }; 95 let did_key = keys::repo_key(&did); 96 97 let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) { 98 crate::db::deser_repo_state(&bytes) 99 .ok() 100 .map(|s| s.into_static()) 101 } else { 102 None 103 }; 104 105 if let Some(mut repo_state) = existing_state { 106 if !repo_state.tracked { 107 let resync_bytes_opt = db.resync.get(&did_key).ok().flatten(); 108 let old_gauge = 109 crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()); 110 111 repo_state.tracked = true; 112 // re-enqueue into pending 113 if let Ok(bytes) = ser_repo_state(&repo_state) { 114 batch.insert(&db.repos, &did_key, bytes); 115 } 116 batch.insert( 117 &db.pending, 118 keys::pending_key(repo_state.index_id), 119 &did_key, 120 ); 121 batch.remove(&db.resync, &did_key); 122 gauge_transitions.push((old_gauge, GaugeState::Pending)); 123 } 124 } else { 125 let repo_state = RepoState::backfilling(rand::rng().next_u64()); 126 if let Ok(bytes) = ser_repo_state(&repo_state) { 127 batch.insert(&db.repos, &did_key, bytes); 128 } 129 batch.insert( 130 &db.pending, 131 keys::pending_key(repo_state.index_id), 132 &did_key, 133 ); 134 added += 1; 135 gauge_transitions.push((GaugeState::Synced, GaugeState::Pending)); // pseudo-transition to just inc pending 136 } 137 } 138 139 batch 140 
.commit() 141 .into_diagnostic() 142 .map_err(|e| e.to_string())?; 143 Ok::<_, String>((added, gauge_transitions)) 144 }) 145 .await 146 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 147 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 148 149 if new_repo_count > 0 { 150 state.db.update_count_async("repos", new_repo_count).await; 151 } 152 for (old, new) in gauge_transitions { 153 state.db.update_gauge_diff_async(&old, &new).await; 154 } 155 156 // Always notify backfill if anything was added to pending! 157 state.notify_backfill(); 158 159 Ok(StatusCode::OK) 160} 161 162pub async fn handle_delete_repos( 163 State(state): State<Arc<AppState>>, 164 Query(params): Query<DeleteParams>, 165 req: axum::extract::Request, 166) -> Result<StatusCode, (StatusCode, String)> { 167 let items = parse_body(req).await?; 168 169 let state_task = state.clone(); 170 let (deleted_count, gauge_decrements) = tokio::task::spawn_blocking(move || { 171 let db = &state_task.db; 172 let mut batch = db.inner.batch(); 173 let mut deleted_count = 0i64; 174 let mut gauge_decrements = Vec::new(); 175 176 for item in items { 177 let did = match jacquard::types::did::Did::new_owned(&item.did) { 178 Ok(d) => d, 179 Err(_) => continue, 180 }; 181 182 let delete_data = item.delete_data.unwrap_or(params.delete_data); 183 let did_key = keys::repo_key(&did); 184 185 let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) { 186 crate::db::deser_repo_state(&bytes) 187 .ok() 188 .map(|s| s.into_static()) 189 } else { 190 None 191 }; 192 193 if let Some(mut repo_state) = existing_state { 194 let resync_bytes_opt = db.resync.get(&did_key).ok().flatten(); 195 let old_gauge = 196 crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()); 197 198 if delete_data { 199 if crate::ops::delete_repo(&mut batch, db, &did, &repo_state).is_ok() { 200 deleted_count += 1; 201 if old_gauge != GaugeState::Synced { 202 gauge_decrements.push(old_gauge); 203 } 204 } else { 
205 tracing::error!("failed to apply delete_repo_batch to {}", did); 206 } 207 } else if repo_state.tracked { 208 repo_state.tracked = false; 209 if let Ok(bytes) = ser_repo_state(&repo_state) { 210 batch.insert(&db.repos, &did_key, bytes); 211 } 212 batch.remove(&db.pending, keys::pending_key(repo_state.index_id)); 213 batch.remove(&db.resync, &did_key); 214 if old_gauge != GaugeState::Synced { 215 gauge_decrements.push(old_gauge); 216 } 217 } 218 } 219 } 220 221 batch 222 .commit() 223 .into_diagnostic() 224 .map_err(|e| e.to_string())?; 225 226 Ok::<_, String>((deleted_count, gauge_decrements)) 227 }) 228 .await 229 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 230 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 231 232 if deleted_count > 0 { 233 state.db.update_count_async("repos", -deleted_count).await; 234 } 235 for gauge in gauge_decrements { 236 state 237 .db 238 .update_gauge_diff_async(&gauge, &GaugeState::Synced) 239 .await; 240 } 241 242 Ok(StatusCode::OK) 243} 244 245async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> { 246 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 247 .await 248 .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 249 250 let text = 251 std::str::from_utf8(&body_bytes).map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 252 253 let trimmed = text.trim(); 254 if trimmed.starts_with('[') { 255 serde_json::from_str::<Vec<RepoRequest>>(trimmed).map_err(|e| { 256 ( 257 StatusCode::BAD_REQUEST, 258 format!("invalid JSON array: {}", e), 259 ) 260 }) 261 } else { 262 trimmed 263 .lines() 264 .filter(|l| !l.trim().is_empty()) 265 .map(|line| { 266 serde_json::from_str::<RepoRequest>(line).map_err(|e| { 267 ( 268 StatusCode::BAD_REQUEST, 269 format!("invalid NDJSON line: {}", e), 270 ) 271 }) 272 }) 273 .collect() 274 } 275}