this repo has no description
1use axum::{ 2 body::Body, 3 http::{Request, StatusCode}, 4 middleware::Next, 5 response::{IntoResponse, Response}, 6}; 7use metrics::{counter, gauge, histogram}; 8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; 9use std::sync::OnceLock; 10use std::time::Instant; 11 12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new(); 13 14pub fn init_metrics() -> PrometheusHandle { 15 let builder = PrometheusBuilder::new(); 16 let handle = builder 17 .install_recorder() 18 .expect("failed to install Prometheus recorder"); 19 20 PROMETHEUS_HANDLE.set(handle.clone()).ok(); 21 describe_metrics(); 22 23 handle 24} 25 26fn describe_metrics() { 27 metrics::describe_counter!("bspds_http_requests_total", "Total number of HTTP requests"); 28 metrics::describe_histogram!( 29 "bspds_http_request_duration_seconds", 30 "HTTP request duration in seconds" 31 ); 32 metrics::describe_counter!( 33 "bspds_auth_cache_hits_total", 34 "Total number of authentication cache hits" 35 ); 36 metrics::describe_counter!( 37 "bspds_auth_cache_misses_total", 38 "Total number of authentication cache misses" 39 ); 40 metrics::describe_gauge!( 41 "bspds_firehose_subscribers", 42 "Number of active firehose WebSocket subscribers" 43 ); 44 metrics::describe_counter!( 45 "bspds_firehose_events_total", 46 "Total number of firehose events published" 47 ); 48 metrics::describe_counter!( 49 "bspds_block_operations_total", 50 "Total number of block store operations" 51 ); 52 metrics::describe_counter!( 53 "bspds_s3_operations_total", 54 "Total number of S3/blob storage operations" 55 ); 56 metrics::describe_gauge!( 57 "bspds_notification_queue_size", 58 "Current size of the notification queue" 59 ); 60 metrics::describe_counter!( 61 "bspds_rate_limit_rejections_total", 62 "Total number of rate limit rejections" 63 ); 64 metrics::describe_counter!("bspds_db_queries_total", "Total number of database queries"); 65 metrics::describe_histogram!( 66 "bspds_db_query_duration_seconds", 67 "Database query duration in seconds" 68 ); 69} 70 71pub async fn metrics_handler() -> impl IntoResponse { 72 match PROMETHEUS_HANDLE.get() { 73 Some(handle) => { 74 let metrics = handle.render(); 75 ( 76 StatusCode::OK, 77 [("content-type", "text/plain; version=0.0.4")], 78 metrics, 79 ) 80 } 81 None => ( 82 StatusCode::INTERNAL_SERVER_ERROR, 83 [("content-type", "text/plain")], 84 "Metrics not initialized".to_string(), 85 ), 86 } 87} 88 89pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response { 90 let start = Instant::now(); 91 let method = request.method().to_string(); 92 let path = normalize_path(request.uri().path()); 93 94 let response = next.run(request).await; 95 96 let duration = start.elapsed().as_secs_f64(); 97 let status = response.status().as_u16().to_string(); 98 99 counter!( 100 "bspds_http_requests_total", 101 "method" => method.clone(), 102 "path" => path.clone(), 103 "status" => status.clone() 104 ) 105 .increment(1); 106 107 histogram!( 108 "bspds_http_request_duration_seconds", 109 "method" => method, 110 "path" => path 111 ) 112 .record(duration); 113 114 response 115} 116 117fn normalize_path(path: &str) -> String { 118 if path.starts_with("/xrpc/") 119 && let Some(method) = path.strip_prefix("/xrpc/") { 120 if let Some(q) = method.find('?') { 121 return format!("/xrpc/{}", &method[..q]); 122 } 123 return path.to_string(); 124 } 125 126 if path.starts_with("/u/") && path.ends_with("/did.json") { 127 return "/u/{handle}/did.json".to_string(); 128 } 129 130 if path.starts_with("/oauth/") { 131 return path.to_string(); 132 } 133 134 path.to_string() 135} 136 137pub fn record_auth_cache_hit(cache_type: &str) { 138 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1); 139} 140 141pub fn record_auth_cache_miss(cache_type: &str) { 142 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1); 143} 144 145pub fn set_firehose_subscribers(count: usize) { 146 gauge!("bspds_firehose_subscribers").set(count as f64); 147} 148 149pub fn increment_firehose_subscribers() { 150 counter!("bspds_firehose_events_total").increment(1); 151} 152 153pub fn record_firehose_event() { 154 counter!("bspds_firehose_events_total").increment(1); 155} 156 157pub fn record_block_operation(op_type: &str) { 158 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1); 159} 160 161pub fn record_s3_operation(op_type: &str, status: &str) { 162 counter!( 163 "bspds_s3_operations_total", 164 "op_type" => op_type.to_string(), 165 "status" => status.to_string() 166 ) 167 .increment(1); 168} 169 170pub fn set_notification_queue_size(size: usize) { 171 gauge!("bspds_notification_queue_size").set(size as f64); 172} 173 174pub fn record_rate_limit_rejection(limiter: &str) { 175 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1); 176} 177 178pub fn record_db_query(query_type: &str, duration_seconds: f64) { 179 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1); 180 histogram!( 181 "bspds_db_query_duration_seconds", 182 "query_type" => query_type.to_string() 183 ) 184 .record(duration_seconds); 185} 186 187#[cfg(test)] 188mod tests { 189 use super::*; 190 191 #[test] 192 fn test_normalize_path() { 193 assert_eq!( 194 normalize_path("/xrpc/com.atproto.repo.getRecord"), 195 "/xrpc/com.atproto.repo.getRecord" 196 ); 197 assert_eq!( 198 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"), 199 "/xrpc/com.atproto.repo.getRecord" 200 ); 201 assert_eq!( 202 normalize_path("/u/alice.example.com/did.json"), 203 "/u/{handle}/did.json" 204 ); 205 assert_eq!(normalize_path("/oauth/token"), "/oauth/token"); 206 assert_eq!(normalize_path("/health"), "/health"); 207 } 208}