this repo has no description
1use axum::{ 2 body::Body, 3 http::{Request, StatusCode}, 4 middleware::Next, 5 response::{IntoResponse, Response}, 6}; 7use metrics::{counter, gauge, histogram}; 8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; 9use std::sync::OnceLock; 10use std::time::Instant; 11static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new(); 12pub fn init_metrics() -> PrometheusHandle { 13 let builder = PrometheusBuilder::new(); 14 let handle = builder 15 .install_recorder() 16 .expect("failed to install Prometheus recorder"); 17 PROMETHEUS_HANDLE.set(handle.clone()).ok(); 18 describe_metrics(); 19 handle 20} 21fn describe_metrics() { 22 metrics::describe_counter!( 23 "bspds_http_requests_total", 24 "Total number of HTTP requests" 25 ); 26 metrics::describe_histogram!( 27 "bspds_http_request_duration_seconds", 28 "HTTP request duration in seconds" 29 ); 30 metrics::describe_counter!( 31 "bspds_auth_cache_hits_total", 32 "Total number of authentication cache hits" 33 ); 34 metrics::describe_counter!( 35 "bspds_auth_cache_misses_total", 36 "Total number of authentication cache misses" 37 ); 38 metrics::describe_gauge!( 39 "bspds_firehose_subscribers", 40 "Number of active firehose WebSocket subscribers" 41 ); 42 metrics::describe_counter!( 43 "bspds_firehose_events_total", 44 "Total number of firehose events published" 45 ); 46 metrics::describe_counter!( 47 "bspds_block_operations_total", 48 "Total number of block store operations" 49 ); 50 metrics::describe_counter!( 51 "bspds_s3_operations_total", 52 "Total number of S3/blob storage operations" 53 ); 54 metrics::describe_gauge!( 55 "bspds_notification_queue_size", 56 "Current size of the notification queue" 57 ); 58 metrics::describe_counter!( 59 "bspds_rate_limit_rejections_total", 60 "Total number of rate limit rejections" 61 ); 62 metrics::describe_counter!( 63 "bspds_db_queries_total", 64 "Total number of database queries" 65 ); 66 metrics::describe_histogram!( 67 "bspds_db_query_duration_seconds", 68 "Database query duration in seconds" 69 ); 70} 71pub async fn metrics_handler() -> impl IntoResponse { 72 match PROMETHEUS_HANDLE.get() { 73 Some(handle) => { 74 let metrics = handle.render(); 75 (StatusCode::OK, [("content-type", "text/plain; version=0.0.4")], metrics) 76 } 77 None => ( 78 StatusCode::INTERNAL_SERVER_ERROR, 79 [("content-type", "text/plain")], 80 "Metrics not initialized".to_string(), 81 ), 82 } 83} 84pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response { 85 let start = Instant::now(); 86 let method = request.method().to_string(); 87 let path = normalize_path(request.uri().path()); 88 let response = next.run(request).await; 89 let duration = start.elapsed().as_secs_f64(); 90 let status = response.status().as_u16().to_string(); 91 counter!( 92 "bspds_http_requests_total", 93 "method" => method.clone(), 94 "path" => path.clone(), 95 "status" => status.clone() 96 ) 97 .increment(1); 98 histogram!( 99 "bspds_http_request_duration_seconds", 100 "method" => method, 101 "path" => path 102 ) 103 .record(duration); 104 response 105} 106fn normalize_path(path: &str) -> String { 107 if path.starts_with("/xrpc/") { 108 if let Some(method) = path.strip_prefix("/xrpc/") { 109 if let Some(q) = method.find('?') { 110 return format!("/xrpc/{}", &method[..q]); 111 } 112 return path.to_string(); 113 } 114 } 115 if path.starts_with("/u/") && path.ends_with("/did.json") { 116 return "/u/{handle}/did.json".to_string(); 117 } 118 if path.starts_with("/oauth/") { 119 return path.to_string(); 120 } 121 path.to_string() 122} 123pub fn record_auth_cache_hit(cache_type: &str) { 124 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1); 125} 126pub fn record_auth_cache_miss(cache_type: &str) { 127 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1); 128} 129pub fn set_firehose_subscribers(count: usize) { 130 gauge!("bspds_firehose_subscribers").set(count as f64); 131} 132pub fn increment_firehose_subscribers() { 133 counter!("bspds_firehose_events_total").increment(1); 134} 135pub fn record_firehose_event() { 136 counter!("bspds_firehose_events_total").increment(1); 137} 138pub fn record_block_operation(op_type: &str) { 139 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1); 140} 141pub fn record_s3_operation(op_type: &str, status: &str) { 142 counter!( 143 "bspds_s3_operations_total", 144 "op_type" => op_type.to_string(), 145 "status" => status.to_string() 146 ) 147 .increment(1); 148} 149pub fn set_notification_queue_size(size: usize) { 150 gauge!("bspds_notification_queue_size").set(size as f64); 151} 152pub fn record_rate_limit_rejection(limiter: &str) { 153 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1); 154} 155pub fn record_db_query(query_type: &str, duration_seconds: f64) { 156 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1); 157 histogram!( 158 "bspds_db_query_duration_seconds", 159 "query_type" => query_type.to_string() 160 ) 161 .record(duration_seconds); 162} 163#[cfg(test)] 164mod tests { 165 use super::*; 166 #[test] 167 fn test_normalize_path() { 168 assert_eq!( 169 normalize_path("/xrpc/com.atproto.repo.getRecord"), 170 "/xrpc/com.atproto.repo.getRecord" 171 ); 172 assert_eq!( 173 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"), 174 "/xrpc/com.atproto.repo.getRecord" 175 ); 176 assert_eq!( 177 normalize_path("/u/alice.example.com/did.json"), 178 "/u/{handle}/did.json" 179 ); 180 assert_eq!(normalize_path("/oauth/token"), "/oauth/token"); 181 assert_eq!(normalize_path("/health"), "/health"); 182 } 183}