at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] put debug api in a different router that we bind on a different port, disabled by default

ptr.pet 02165cb8 ded23b02

verified
+58 -20
+5 -7
src/api/debug.rs
··· 1 1 use crate::api::AppState; 2 2 use crate::db::keys; 3 3 use axum::{ 4 - extract::{ConnectInfo, Query, State}, 4 + extract::{Query, State}, 5 5 http::StatusCode, 6 6 Json, 7 7 }; 8 8 use jacquard::types::ident::AtIdentifier; 9 9 use serde::{Deserialize, Serialize}; 10 - use std::net::SocketAddr; 11 10 use std::sync::Arc; 12 11 13 12 #[derive(Deserialize)] ··· 21 20 pub count: usize, 22 21 } 23 22 23 + pub fn router() -> axum::Router<Arc<AppState>> { 24 + axum::Router::new().route("/debug/count", axum::routing::get(handle_debug_count)) 25 + } 26 + 24 27 pub async fn handle_debug_count( 25 28 State(state): State<Arc<AppState>>, 26 - ConnectInfo(addr): ConnectInfo<SocketAddr>, 27 29 Query(req): Query<DebugCountRequest>, 28 30 ) -> Result<Json<DebugCountResponse>, StatusCode> { 29 - if !addr.ip().is_loopback() { 30 - return Err(StatusCode::FORBIDDEN); 31 - } 32 - 33 31 let did = state 34 32 .resolver 35 33 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?)
+24 -1
src/api/mod.rs
··· 15 15 .route("/health", get(|| async { "OK" })) 16 16 .route("/stats", get(stats::get_stats)) 17 17 .route("/stream", get(stream::handle_stream)) 18 - .route("/debug/count", get(debug::handle_debug_count)) 19 18 .merge(xrpc::router()) 20 19 .merge(repo::router()) 21 20 .with_state(state) ··· 37 36 38 37 Ok(()) 39 38 } 39 + 40 + pub async fn serve_debug(state: Arc<AppState>, port: u16) -> miette::Result<()> { 41 + let app = debug::router() 42 + .with_state(state) 43 + .layer(TraceLayer::new_for_http()); 44 + 45 + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{port}")) 46 + .await 47 + .map_err(|e| miette::miette!("failed to bind debug server to port {port}: {e}"))?; 48 + 49 + tracing::info!( 50 + "Debug server listening on {}", 51 + listener.local_addr().unwrap() 52 + ); 53 + 54 + axum::serve( 55 + listener, 56 + app.into_make_service_with_connect_info::<SocketAddr>(), 57 + ) 58 + .await 59 + .map_err(|e| miette::miette!("debug server error: {e}"))?; 60 + 61 + Ok(()) 62 + }
+13
src/config.rs
··· 18 18 pub cache_size: u64, 19 19 pub backfill_concurrency_limit: usize, 20 20 pub disable_lz4_compression: bool, 21 + pub debug_port: u16, 22 + pub enable_debug: bool, 21 23 } 22 24 23 25 impl Config { ··· 72 74 .map(|v| v == "true") 73 75 .unwrap_or(false); 74 76 77 + let debug_port = env::var("HYDRANT_DEBUG_PORT") 78 + .ok() 79 + .and_then(|s| s.parse().ok()) 80 + .unwrap_or(3001); 81 + 82 + let enable_debug = env::var("HYDRANT_ENABLE_DEBUG") 83 + .map(|v| v == "true") 84 + .unwrap_or(false); 85 + 75 86 Ok(Self { 76 87 database_path, 77 88 relay_host, ··· 84 95 cache_size, 85 96 backfill_concurrency_limit, 86 97 disable_lz4_compression, 98 + debug_port, 99 + enable_debug, 87 100 }) 88 101 } 89 102 }
+10 -9
src/main.rs
··· 40 40 let (state, backfill_rx, buffer_rx) = AppState::new(&cfg)?; 41 41 let state = Arc::new(state); 42 42 43 - tokio::spawn({ 44 - let port = cfg.api_port; 45 - let state = state.clone(); 46 - async move { 47 - if let Err(e) = api::serve(state, port).await { 48 - error!("API server failed: {e}"); 49 - } 50 - } 51 - }); 43 + tokio::spawn( 44 + api::serve(state.clone(), cfg.api_port).inspect_err(|e| error!("API server failed: {e}")), 45 + ); 46 + 47 + if cfg.enable_debug { 48 + tokio::spawn( 49 + api::serve_debug(state.clone(), cfg.debug_port) 50 + .inspect_err(|e| error!("debug server failed: {e}")), 51 + ); 52 + } 52 53 53 54 tokio::spawn({ 54 55 let state = state.clone();
+2
tests/common.nu
··· 15 15 HYDRANT_DATABASE_PATH: ($db_path), 16 16 HYDRANT_FULL_NETWORK: "false", 17 17 HYDRANT_API_PORT: ($port | into string), 18 + HYDRANT_ENABLE_DEBUG: "true", 19 + HYDRANT_DEBUG_PORT: ($port + 1 | into string), 18 20 HYDRANT_LOG_LEVEL: "debug" 19 21 } { 20 22 sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int
+4 -3
tests/repo_sync_integrity.nu
··· 55 55 } 56 56 57 57 # verify countRecords API against debug endpoint 58 - def check-count [hydrant_url: string, did: string] { 58 + def check-count [hydrant_url: string, debug_url: string, did: string] { 59 59 print "verifying countRecords API..." 60 60 let collections = [ 61 61 "app.bsky.feed.post" ··· 77 77 78 78 # 2. get actual scan count from debug endpoint 79 79 let debug_count = try { 80 - (http get $"($hydrant_url)/debug/count?did=($did)&collection=($coll)").count 80 + (http get $"($debug_url)/debug/count?did=($did)&collection=($coll)").count 81 81 } catch { 82 82 print $" error calling debug count for ($coll)" 83 83 return false ··· 98 98 let pds = "https://zwsp.xyz" 99 99 let port = 3001 100 100 let url = $"http://localhost:($port)" 101 + let debug_url = $"http://127.0.0.1:($port + 1)" 101 102 let db_path = (mktemp -d -t hydrant_test.XXXXXX) 102 103 103 104 print $"testing backfill integrity for ($did)..." ··· 116 117 if (wait-for-backfill $url) { 117 118 # Run both consistency checks 118 119 let integrity_passed = (check-consistency $url $pds $did) 119 - let count_passed = (check-count $url $did) 120 + let count_passed = (check-count $url $debug_url $did) 120 121 121 122 if $integrity_passed and $count_passed { 122 123 print "all integrity checks passed!"