this repo has no description
1use axum::{ 2 body::Body, 3 http::{Request, StatusCode}, 4 middleware::Next, 5 response::{IntoResponse, Response}, 6}; 7use metrics::{counter, gauge, histogram}; 8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; 9use std::sync::OnceLock; 10use std::time::Instant; 11 12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new(); 13 14pub fn init_metrics() -> PrometheusHandle { 15 let builder = PrometheusBuilder::new(); 16 let handle = builder 17 .install_recorder() 18 .expect("failed to install Prometheus recorder"); 19 20 PROMETHEUS_HANDLE.set(handle.clone()).ok(); 21 describe_metrics(); 22 23 handle 24} 25 26fn describe_metrics() { 27 metrics::describe_counter!( 28 "bspds_http_requests_total", 29 "Total number of HTTP requests" 30 ); 31 metrics::describe_histogram!( 32 "bspds_http_request_duration_seconds", 33 "HTTP request duration in seconds" 34 ); 35 metrics::describe_counter!( 36 "bspds_auth_cache_hits_total", 37 "Total number of authentication cache hits" 38 ); 39 metrics::describe_counter!( 40 "bspds_auth_cache_misses_total", 41 "Total number of authentication cache misses" 42 ); 43 metrics::describe_gauge!( 44 "bspds_firehose_subscribers", 45 "Number of active firehose WebSocket subscribers" 46 ); 47 metrics::describe_counter!( 48 "bspds_firehose_events_total", 49 "Total number of firehose events published" 50 ); 51 metrics::describe_counter!( 52 "bspds_block_operations_total", 53 "Total number of block store operations" 54 ); 55 metrics::describe_counter!( 56 "bspds_s3_operations_total", 57 "Total number of S3/blob storage operations" 58 ); 59 metrics::describe_gauge!( 60 "bspds_notification_queue_size", 61 "Current size of the notification queue" 62 ); 63 metrics::describe_counter!( 64 "bspds_rate_limit_rejections_total", 65 "Total number of rate limit rejections" 66 ); 67 metrics::describe_counter!( 68 "bspds_db_queries_total", 69 "Total number of database queries" 70 ); 71 metrics::describe_histogram!( 72 "bspds_db_query_duration_seconds", 73 "Database query duration in seconds" 74 ); 75} 76 77pub async fn metrics_handler() -> impl IntoResponse { 78 match PROMETHEUS_HANDLE.get() { 79 Some(handle) => { 80 let metrics = handle.render(); 81 (StatusCode::OK, [("content-type", "text/plain; version=0.0.4")], metrics) 82 } 83 None => ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 [("content-type", "text/plain")], 86 "Metrics not initialized".to_string(), 87 ), 88 } 89} 90 91pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response { 92 let start = Instant::now(); 93 let method = request.method().to_string(); 94 let path = normalize_path(request.uri().path()); 95 96 let response = next.run(request).await; 97 98 let duration = start.elapsed().as_secs_f64(); 99 let status = response.status().as_u16().to_string(); 100 101 counter!( 102 "bspds_http_requests_total", 103 "method" => method.clone(), 104 "path" => path.clone(), 105 "status" => status.clone() 106 ) 107 .increment(1); 108 109 histogram!( 110 "bspds_http_request_duration_seconds", 111 "method" => method, 112 "path" => path 113 ) 114 .record(duration); 115 116 response 117} 118 119fn normalize_path(path: &str) -> String { 120 if path.starts_with("/xrpc/") { 121 if let Some(method) = path.strip_prefix("/xrpc/") { 122 if let Some(q) = method.find('?') { 123 return format!("/xrpc/{}", &method[..q]); 124 } 125 return path.to_string(); 126 } 127 } 128 129 if path.starts_with("/u/") && path.ends_with("/did.json") { 130 return "/u/{handle}/did.json".to_string(); 131 } 132 133 if path.starts_with("/oauth/") { 134 return path.to_string(); 135 } 136 137 path.to_string() 138} 139 140pub fn record_auth_cache_hit(cache_type: &str) { 141 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1); 142} 143 144pub fn record_auth_cache_miss(cache_type: &str) { 145 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1); 146} 147 148pub fn set_firehose_subscribers(count: usize) { 149 gauge!("bspds_firehose_subscribers").set(count as f64); 150} 151 152pub fn increment_firehose_subscribers() { 153 counter!("bspds_firehose_events_total").increment(1); 154} 155 156pub fn record_firehose_event() { 157 counter!("bspds_firehose_events_total").increment(1); 158} 159 160pub fn record_block_operation(op_type: &str) { 161 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1); 162} 163 164pub fn record_s3_operation(op_type: &str, status: &str) { 165 counter!( 166 "bspds_s3_operations_total", 167 "op_type" => op_type.to_string(), 168 "status" => status.to_string() 169 ) 170 .increment(1); 171} 172 173pub fn set_notification_queue_size(size: usize) { 174 gauge!("bspds_notification_queue_size").set(size as f64); 175} 176 177pub fn record_rate_limit_rejection(limiter: &str) { 178 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1); 179} 180 181pub fn record_db_query(query_type: &str, duration_seconds: f64) { 182 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1); 183 histogram!( 184 "bspds_db_query_duration_seconds", 185 "query_type" => query_type.to_string() 186 ) 187 .record(duration_seconds); 188} 189 190#[cfg(test)] 191mod tests { 192 use super::*; 193 194 #[test] 195 fn test_normalize_path() { 196 assert_eq!( 197 normalize_path("/xrpc/com.atproto.repo.getRecord"), 198 "/xrpc/com.atproto.repo.getRecord" 199 ); 200 assert_eq!( 201 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"), 202 "/xrpc/com.atproto.repo.getRecord" 203 ); 204 assert_eq!( 205 normalize_path("/u/alice.example.com/did.json"), 206 "/u/{handle}/did.json" 207 ); 208 assert_eq!(normalize_path("/oauth/token"), "/oauth/token"); 209 assert_eq!(normalize_path("/health"), "/health"); 210 } 211}