at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] refactor how repos api is implemented to make it more consistent

ptr.pet bbadc992 e42e3de8

verified
+248 -154
+107 -154
src/api/repos.rs
··· 8 8 response::{IntoResponse, Response}, 9 9 routing::{delete, get, put}, 10 10 }; 11 - use jacquard::IntoStatic; 11 + use jacquard::{IntoStatic, types::did::Did}; 12 12 use miette::IntoDiagnostic; 13 13 use rand::Rng; 14 14 use serde::{Deserialize, Serialize}; ··· 64 64 let items = tokio::task::spawn_blocking(move || { 65 65 let db = &state.db; 66 66 67 - let results = match partition.as_str() { 68 - "all" => { 69 - let start_bound = if let Some(cursor) = params.cursor { 70 - let did = jacquard::types::did::Did::new_owned(&cursor) 71 - .map_err(|_| (StatusCode::BAD_REQUEST, "invalid cursor DID".to_string()))?; 72 - let did_key = keys::repo_key(&did); 73 - std::ops::Bound::Excluded(did_key) 74 - } else { 75 - std::ops::Bound::Unbounded 76 - }; 67 + let to_response = |k: &[u8], v: &[u8]| -> Result<RepoResponse, (StatusCode, String)> { 68 + let repo_state = crate::db::deser_repo_state(v).map_err(internal)?; 69 + let did = crate::db::types::TrimmedDid::try_from(k) 70 + .map_err(internal)? 71 + .to_did(); 77 72 78 - let mut items = Vec::new(); 79 - for item in db 80 - .repos 81 - .range((start_bound, std::ops::Bound::Unbounded)) 82 - .take(limit) 83 - { 84 - let (k, v) = item 85 - .into_inner() 86 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 87 - let repo_state = crate::db::deser_repo_state(&v) 88 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 89 - let did = crate::db::types::TrimmedDid::try_from(k.as_ref()) 90 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
91 - .to_did(); 73 + Ok(RepoResponse { 74 + did: did.to_string(), 75 + status: repo_state.status.to_string(), 76 + tracked: repo_state.tracked, 77 + rev: repo_state.rev.as_ref().map(|r| r.to_string()), 78 + last_updated_at: repo_state.last_updated_at, 79 + }) 80 + }; 92 81 93 - items.push(RepoResponse { 94 - did: did.to_string(), 95 - status: repo_state.status.to_string(), 96 - tracked: repo_state.tracked, 97 - rev: repo_state.rev.as_ref().map(|r| r.to_string()), 98 - last_updated_at: repo_state.last_updated_at, 99 - }); 100 - } 101 - Ok::<_, (StatusCode, String)>(items) 102 - } 103 - "resync" => { 82 + let results = match partition.as_str() { 83 + "all" | "resync" => { 84 + let is_all = partition == "all"; 85 + let ks = if is_all { &db.repos } else { &db.resync }; 86 + 104 87 let start_bound = if let Some(cursor) = params.cursor { 105 - let did = jacquard::types::did::Did::new_owned(&cursor) 106 - .map_err(|_| (StatusCode::BAD_REQUEST, "invalid cursor DID".to_string()))?; 88 + let did = Did::new_owned(&cursor).map_err(bad_request)?; 107 89 let did_key = keys::repo_key(&did); 108 90 std::ops::Bound::Excluded(did_key) 109 91 } else { ··· 111 93 }; 112 94 113 95 let mut items = Vec::new(); 114 - for item in db 115 - .resync 96 + for item in ks 116 97 .range((start_bound, std::ops::Bound::Unbounded)) 117 98 .take(limit) 118 99 { 119 - let (k, _) = item 120 - .into_inner() 121 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 100 + let (k, v) = item.into_inner().map_err(internal)?; 122 101 123 - if let Ok(Some(v)) = db.repos.get(&k) { 124 - let repo_state = crate::db::deser_repo_state(&v) 125 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 126 - let did = crate::db::types::TrimmedDid::try_from(k.as_ref()) 127 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
128 - .to_did(); 102 + let repo_state_bytes = if is_all { 103 + v 104 + } else { 105 + db.repos.get(&k).map_err(internal)?.ok_or_else(|| { 106 + internal(format!("repository state missing for {}", partition)) 107 + })? 108 + }; 129 109 130 - items.push(RepoResponse { 131 - did: did.to_string(), 132 - status: repo_state.status.to_string(), 133 - tracked: repo_state.tracked, 134 - rev: repo_state.rev.as_ref().map(|r| r.to_string()), 135 - last_updated_at: repo_state.last_updated_at, 136 - }); 137 - } 110 + items.push(to_response(&k, &repo_state_bytes)?); 138 111 } 139 - Ok(items) 112 + Ok::<_, (StatusCode, String)>(items) 140 113 } 141 114 "pending" => { 142 115 let start_bound = if let Some(cursor) = params.cursor { 143 - let id = cursor 144 - .parse::<u64>() 145 - .map_err(|_| (StatusCode::BAD_REQUEST, "invalid cursor id".to_string()))?; 116 + let id = cursor.parse::<u64>().map_err(bad_request)?; 146 117 std::ops::Bound::Excluded(id.to_be_bytes().to_vec()) 147 118 } else { 148 119 std::ops::Bound::Unbounded ··· 154 125 .range((start_bound, std::ops::Bound::Unbounded)) 155 126 .take(limit) 156 127 { 157 - let (_, did_key) = item 158 - .into_inner() 159 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 128 + let (_, did_key) = item.into_inner().map_err(internal)?; 160 129 161 130 if let Ok(Some(v)) = db.repos.get(&did_key) { 162 - let repo_state = crate::db::deser_repo_state(&v) 163 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 164 - let did = crate::db::types::TrimmedDid::try_from(did_key.as_ref()) 165 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
166 - .to_did(); 167 - 168 - items.push(RepoResponse { 169 - did: did.to_string(), 170 - status: repo_state.status.to_string(), 171 - tracked: repo_state.tracked, 172 - rev: repo_state.rev.as_ref().map(|r| r.to_string()), 173 - last_updated_at: repo_state.last_updated_at, 174 - }); 131 + items.push(to_response(&did_key, &v)?); 175 132 } 176 133 } 177 134 Ok(items) ··· 182 139 Ok::<_, (StatusCode, String)>(results) 183 140 }) 184 141 .await 185 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))??; 142 + .map_err(internal)??; 186 143 187 144 use futures::StreamExt; 188 145 ··· 209 166 let mut batch = db.inner.batch(); 210 167 let mut added = 0i64; 211 168 let mut gauge_transitions: Vec<(GaugeState, GaugeState)> = Vec::new(); 169 + 170 + let mut rng = rand::rng(); 212 171 213 172 for item in items { 214 - let did = match jacquard::types::did::Did::new_owned(&item.did) { 215 - Ok(d) => d, 216 - Err(_) => continue, 217 - }; 173 + let did = Did::new(&item.did).map_err(bad_request)?; 218 174 let did_key = keys::repo_key(&did); 219 175 220 - let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) { 221 - crate::db::deser_repo_state(&bytes) 222 - .ok() 223 - .map(|s| s.into_static()) 224 - } else { 225 - None 226 - }; 176 + let repo_bytes = db.repos.get(&did_key).map_err(internal)?; 177 + let existing_state = repo_bytes 178 + .as_deref() 179 + .map(crate::db::deser_repo_state) 180 + .transpose() 181 + .map_err(internal)?; 227 182 228 183 if let Some(mut repo_state) = existing_state { 229 184 if !repo_state.tracked { 230 - let resync_bytes_opt = db.resync.get(&did_key).ok().flatten(); 185 + let resync_bytes = db.resync.get(&did_key).map_err(internal)?; 231 186 let old_gauge = 232 - crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()); 187 + crate::db::Db::repo_gauge_state(&repo_state, resync_bytes.as_deref()); 233 188 234 189 repo_state.tracked = true; 235 190 // re-enqueue into pending 236 - if let Ok(bytes) = 
ser_repo_state(&repo_state) { 237 - batch.insert(&db.repos, &did_key, bytes); 238 - } 191 + batch.insert( 192 + &db.repos, 193 + &did_key, 194 + ser_repo_state(&repo_state).map_err(internal)?, 195 + ); 239 196 batch.insert( 240 197 &db.pending, 241 198 keys::pending_key(repo_state.index_id), ··· 245 202 gauge_transitions.push((old_gauge, GaugeState::Pending)); 246 203 } 247 204 } else { 248 - let repo_state = RepoState::backfilling(rand::rng().next_u64()); 249 - if let Ok(bytes) = ser_repo_state(&repo_state) { 250 - batch.insert(&db.repos, &did_key, bytes); 251 - } 205 + let repo_state = RepoState::backfilling(rng.next_u64()); 206 + batch.insert( 207 + &db.repos, 208 + &did_key, 209 + ser_repo_state(&repo_state).map_err(internal)?, 210 + ); 252 211 batch.insert( 253 212 &db.pending, 254 213 keys::pending_key(repo_state.index_id), ··· 259 218 } 260 219 } 261 220 262 - batch 263 - .commit() 264 - .into_diagnostic() 265 - .map_err(|e| e.to_string())?; 266 - Ok::<_, String>((added, gauge_transitions)) 221 + batch.commit().into_diagnostic().map_err(internal)?; 222 + 223 + Ok::<_, (StatusCode, String)>((added, gauge_transitions)) 267 224 }) 268 225 .await 269 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
270 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 226 + .map_err(internal)??; 271 227 272 228 if new_repo_count > 0 { 273 229 state.db.update_count_async("repos", new_repo_count).await; ··· 297 253 let mut gauge_decrements = Vec::new(); 298 254 299 255 for item in items { 300 - let did = match jacquard::types::did::Did::new_owned(&item.did) { 301 - Ok(d) => d, 302 - Err(_) => continue, 303 - }; 304 - 256 + let did = Did::new(&item.did).map_err(bad_request)?; 305 257 let delete_data = item.delete_data.unwrap_or(params.delete_data); 306 258 let did_key = keys::repo_key(&did); 307 259 308 - let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) { 309 - crate::db::deser_repo_state(&bytes) 310 - .ok() 311 - .map(|s| s.into_static()) 312 - } else { 313 - None 314 - }; 260 + let repo_bytes = db.repos.get(&did_key).map_err(internal)?; 261 + let existing_state = repo_bytes 262 + .as_deref() 263 + .map(crate::db::deser_repo_state) 264 + .transpose() 265 + .map_err(internal)?; 315 266 316 - if let Some(mut repo_state) = existing_state { 317 - let resync_bytes_opt = db.resync.get(&did_key).ok().flatten(); 267 + if let Some(repo_state) = existing_state { 268 + let resync_bytes = db.resync.get(&did_key).map_err(internal)?; 318 269 let old_gauge = 319 - crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()); 270 + crate::db::Db::repo_gauge_state(&repo_state, resync_bytes.as_deref()); 320 271 321 272 if delete_data { 322 - if crate::ops::delete_repo(&mut batch, db, &did, &repo_state).is_ok() { 323 - deleted_count += 1; 324 - if old_gauge != GaugeState::Synced { 325 - gauge_decrements.push(old_gauge); 326 - } 327 - } else { 328 - tracing::error!("failed to apply delete_repo_batch to {}", did); 273 + crate::ops::delete_repo(&mut batch, db, &did, &repo_state).map_err(internal)?; 274 + deleted_count += 1; 275 + if old_gauge != GaugeState::Synced { 276 + gauge_decrements.push(old_gauge); 329 277 } 330 278 } else if repo_state.tracked { 279 + 
let mut repo_state = repo_state.into_static(); 331 280 repo_state.tracked = false; 332 - if let Ok(bytes) = ser_repo_state(&repo_state) { 333 - batch.insert(&db.repos, &did_key, bytes); 334 - } 281 + batch.insert( 282 + &db.repos, 283 + &did_key, 284 + ser_repo_state(&repo_state).map_err(internal)?, 285 + ); 335 286 batch.remove(&db.pending, keys::pending_key(repo_state.index_id)); 336 287 batch.remove(&db.resync, &did_key); 337 288 if old_gauge != GaugeState::Synced { ··· 341 292 } 342 293 } 343 294 344 - batch 345 - .commit() 346 - .into_diagnostic() 347 - .map_err(|e| e.to_string())?; 295 + batch.commit().into_diagnostic().map_err(internal)?; 348 296 349 - Ok::<_, String>((deleted_count, gauge_decrements)) 297 + Ok::<_, (StatusCode, String)>((deleted_count, gauge_decrements)) 350 298 }) 351 299 .await 352 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 353 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 300 + .map_err(internal)??; 354 301 355 302 if deleted_count > 0 { 356 303 state.db.update_count_async("repos", -deleted_count).await; ··· 366 313 } 367 314 368 315 async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> { 316 + let content_type = req 317 + .headers() 318 + .get(header::CONTENT_TYPE) 319 + .and_then(|h| h.to_str().ok()) 320 + .unwrap_or("") 321 + .to_string(); 322 + 369 323 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 370 324 .await 371 - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 372 - 373 - let text = 374 - std::str::from_utf8(&body_bytes).map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 325 + .map_err(bad_request)?; 375 326 327 + let text = std::str::from_utf8(&body_bytes).map_err(bad_request)?; 376 328 let trimmed = text.trim(); 377 - if trimmed.starts_with('[') { 378 - serde_json::from_str::<Vec<RepoRequest>>(trimmed).map_err(|e| { 379 - ( 380 - StatusCode::BAD_REQUEST, 381 - format!("invalid JSON array: {}", e), 382 - ) 383 - 
}) 329 + 330 + if content_type.contains("application/json") { 331 + serde_json::from_str::<Vec<RepoRequest>>(trimmed) 332 + .map_err(|e| bad_request(format!("invalid JSON array: {e}"))) 384 333 } else { 385 334 trimmed 386 335 .lines() 387 336 .filter(|l| !l.trim().is_empty()) 388 337 .map(|line| { 389 - serde_json::from_str::<RepoRequest>(line).map_err(|e| { 390 - ( 391 - StatusCode::BAD_REQUEST, 392 - format!("invalid NDJSON line: {}", e), 393 - ) 394 - }) 338 + serde_json::from_str::<RepoRequest>(line) 339 + .map_err(|e| bad_request(format!("invalid NDJSON line: {e}"))) 395 340 }) 396 341 .collect() 397 342 } 398 343 } 344 + 345 + fn bad_request<E: std::fmt::Display>(err: E) -> (StatusCode, String) { 346 + (StatusCode::BAD_REQUEST, err.to_string()) 347 + } 348 + 349 + fn internal<E: std::fmt::Display>(err: E) -> (StatusCode, String) { 350 + (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) 351 + }
+141
tests/repos_api_test.nu
··· 1 + #!/usr/bin/env nu 2 + 3 + def test-repos [url: string] { 4 + print "verifying /repos pagination and filtering..." 5 + 6 + # 1. Test limit 7 + print " testing limit=1..." 8 + let items = (http get $"($url)/repos?limit=1" | from json -o) 9 + let count = ($items | length) 10 + print $" count: ($count)" 11 + if $count != 1 { 12 + print " FAILED: expected 1 item" 13 + exit 1 14 + } 15 + 16 + # 2. Test partition=all 17 + print " testing partition=all..." 18 + let all_items = (http get $"($url)/repos?partition=all" | from json -o) 19 + print $" count: ($all_items | length)" 20 + 21 + # 3. Test cursor (if we have items) 22 + if ($all_items | length) > 1 { 23 + let first_did = ($all_items | get 0).did 24 + print $" testing cursor with did ($first_did)..." 25 + let cursor_items = (http get $"($url)/repos?cursor=($first_did)&limit=1" | from json -o) 26 + if ($cursor_items | length) > 0 { 27 + let next_did = ($cursor_items | get 0).did 28 + if $first_did == $next_did { 29 + print " FAILED: cursor did should be excluded" 30 + exit 1 31 + } 32 + print $" next did: ($next_did)" 33 + } 34 + } 35 + 36 + # 4. Test partition=pending 37 + print " testing partition=pending..." 38 + let pending_items = (http get $"($url)/repos?partition=pending" | from json -o) 39 + print $" pending count: ($pending_items | length)" 40 + 41 + # 5. Test partition=resync 42 + print " testing partition=resync..." 43 + let resync_items = (http get $"($url)/repos?partition=resync" | from json -o) 44 + print $" resync count: ($resync_items | length)" 45 + 46 + print "all /repos pagination and filtering tests passed!" 47 + } 48 + 49 + def test-errors [url: string] { 50 + print "verifying /repos error handling..." 51 + 52 + # 1. Invalid DID in PUT 53 + print " testing PUT /repos with invalid DID..." 
54 + let resp_put = (http put -f -e -t application/json $"($url)/repos" { did: "invalid" }) 55 + if $resp_put.status != 400 { 56 + print $" FAILED: expected 400, got ($resp_put.status)" 57 + exit 1 58 + } 59 + 60 + # 2. Invalid DID in DELETE 61 + print " testing DELETE /repos with invalid DID..." 62 + let resp_del = (http delete -f -e -t application/json $"($url)/repos" --data { did: "invalid" }) 63 + if $resp_del.status != 400 { 64 + print $" FAILED: expected 400, got ($resp_del.status)" 65 + exit 1 66 + } 67 + 68 + # 3. Invalid cursor in GET 69 + print " testing GET /repos with invalid cursor..." 70 + let resp_get_cursor = (http get -f -e $"($url)/repos?cursor=invalid") 71 + if $resp_get_cursor.status != 400 { 72 + print $" FAILED: expected 400, got ($resp_get_cursor.status)" 73 + exit 1 74 + } 75 + 76 + # 4. Invalid partition in GET 77 + print " testing GET /repos with invalid partition..." 78 + let resp_get_part = (http get -f -e $"($url)/repos?partition=invalid") 79 + if $resp_get_part.status != 400 { 80 + print $" FAILED: expected 400, got ($resp_get_part.status)" 81 + exit 1 82 + } 83 + 84 + print "all /repos error handling tests passed!" 85 + } 86 + 87 + def main [] { 88 + let port = 3001 89 + let url = $"http://localhost:($port)" 90 + let db_path = (mktemp -d -t hydrant_api_test.XXXXXX) 91 + 92 + print $"starting hydrant for API verification..." 
93 + let binary = (build-hydrant) 94 + let instance = (start-hydrant $binary $db_path $port) 95 + 96 + if (wait-for-api $url) { 97 + # add a few repos 98 + let dids = [ 99 + "did:plc:dfl62fgb7wtjj3fcbb72naae" 100 + "did:plc:q6gjnv26m4ay3m42ojvzx2m4" 101 + ] 102 + http put -t application/json $"($url)/repos" ($dids | each { |d| { did: $d } }) 103 + 104 + test-repos $url 105 + test-errors $url 106 + } 107 + 108 + kill $instance.pid 109 + } 110 + 111 + # Helper to build hydrant 112 + def build-hydrant [] { 113 + cargo build --release --quiet 114 + "./target/release/hydrant" 115 + } 116 + 117 + # Helper to start hydrant 118 + def start-hydrant [binary: string, db_path: string, port: int] { 119 + let log_file = $"($db_path)/hydrant.log" 120 + let pid = (with-env { 121 + HYDRANT_DATABASE_PATH: $db_path, 122 + HYDRANT_API_PORT: ($port | into string), 123 + HYDRANT_DEBUG_PORT: (($port + 1) | into string), 124 + HYDRANT_MODE: "filter", 125 + HYDRANT_LOG_LEVEL: "info" 126 + } { 127 + sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int 128 + }) 129 + { pid: $pid, log: $log_file } 130 + } 131 + 132 + # Helper to wait for api 133 + def wait-for-api [url: string] { 134 + for i in 1..20 { 135 + if (try { (http get $"($url)/health") == "OK" } catch { false }) { 136 + return true 137 + } 138 + sleep 500ms 139 + } 140 + false 141 + }