this repo has no description
1use axum::{ 2 body::Body, 3 http::{Request, StatusCode}, 4 middleware::Next, 5 response::{IntoResponse, Response}, 6}; 7use metrics::{counter, gauge, histogram}; 8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; 9use std::sync::OnceLock; 10use std::time::Instant; 11 12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new(); 13 14pub fn init_metrics() -> PrometheusHandle { 15 let builder = PrometheusBuilder::new(); 16 let handle = builder 17 .install_recorder() 18 .expect("failed to install Prometheus recorder"); 19 20 PROMETHEUS_HANDLE.set(handle.clone()).ok(); 21 22 describe_metrics(); 23 24 handle 25} 26 27fn describe_metrics() { 28 metrics::describe_counter!( 29 "bspds_http_requests_total", 30 "Total number of HTTP requests" 31 ); 32 metrics::describe_histogram!( 33 "bspds_http_request_duration_seconds", 34 "HTTP request duration in seconds" 35 ); 36 metrics::describe_counter!( 37 "bspds_auth_cache_hits_total", 38 "Total number of authentication cache hits" 39 ); 40 metrics::describe_counter!( 41 "bspds_auth_cache_misses_total", 42 "Total number of authentication cache misses" 43 ); 44 metrics::describe_gauge!( 45 "bspds_firehose_subscribers", 46 "Number of active firehose WebSocket subscribers" 47 ); 48 metrics::describe_counter!( 49 "bspds_firehose_events_total", 50 "Total number of firehose events published" 51 ); 52 metrics::describe_counter!( 53 "bspds_block_operations_total", 54 "Total number of block store operations" 55 ); 56 metrics::describe_counter!( 57 "bspds_s3_operations_total", 58 "Total number of S3/blob storage operations" 59 ); 60 metrics::describe_gauge!( 61 "bspds_notification_queue_size", 62 "Current size of the notification queue" 63 ); 64 metrics::describe_counter!( 65 "bspds_rate_limit_rejections_total", 66 "Total number of rate limit rejections" 67 ); 68 metrics::describe_counter!( 69 "bspds_db_queries_total", 70 "Total number of database queries" 71 ); 72 metrics::describe_histogram!( 73 "bspds_db_query_duration_seconds", 74 "Database query duration in seconds" 75 ); 76} 77 78pub async fn metrics_handler() -> impl IntoResponse { 79 match PROMETHEUS_HANDLE.get() { 80 Some(handle) => { 81 let metrics = handle.render(); 82 (StatusCode::OK, [("content-type", "text/plain; version=0.0.4")], metrics) 83 } 84 None => ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 [("content-type", "text/plain")], 87 "Metrics not initialized".to_string(), 88 ), 89 } 90} 91 92pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response { 93 let start = Instant::now(); 94 let method = request.method().to_string(); 95 let path = normalize_path(request.uri().path()); 96 97 let response = next.run(request).await; 98 99 let duration = start.elapsed().as_secs_f64(); 100 let status = response.status().as_u16().to_string(); 101 102 counter!( 103 "bspds_http_requests_total", 104 "method" => method.clone(), 105 "path" => path.clone(), 106 "status" => status.clone() 107 ) 108 .increment(1); 109 110 histogram!( 111 "bspds_http_request_duration_seconds", 112 "method" => method, 113 "path" => path 114 ) 115 .record(duration); 116 117 response 118} 119 120fn normalize_path(path: &str) -> String { 121 if path.starts_with("/xrpc/") { 122 if let Some(method) = path.strip_prefix("/xrpc/") { 123 if let Some(q) = method.find('?') { 124 return format!("/xrpc/{}", &method[..q]); 125 } 126 return path.to_string(); 127 } 128 } 129 130 if path.starts_with("/u/") && path.ends_with("/did.json") { 131 return "/u/{handle}/did.json".to_string(); 132 } 133 134 if path.starts_with("/oauth/") { 135 return path.to_string(); 136 } 137 138 path.to_string() 139} 140 141pub fn record_auth_cache_hit(cache_type: &str) { 142 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1); 143} 144 145pub fn record_auth_cache_miss(cache_type: &str) { 146 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1); 147} 148 149pub fn set_firehose_subscribers(count: usize) { 150 gauge!("bspds_firehose_subscribers").set(count as f64); 151} 152 153pub fn increment_firehose_subscribers() { 154 counter!("bspds_firehose_events_total").increment(1); 155} 156 157pub fn record_firehose_event() { 158 counter!("bspds_firehose_events_total").increment(1); 159} 160 161pub fn record_block_operation(op_type: &str) { 162 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1); 163} 164 165pub fn record_s3_operation(op_type: &str, status: &str) { 166 counter!( 167 "bspds_s3_operations_total", 168 "op_type" => op_type.to_string(), 169 "status" => status.to_string() 170 ) 171 .increment(1); 172} 173 174pub fn set_notification_queue_size(size: usize) { 175 gauge!("bspds_notification_queue_size").set(size as f64); 176} 177 178pub fn record_rate_limit_rejection(limiter: &str) { 179 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1); 180} 181 182pub fn record_db_query(query_type: &str, duration_seconds: f64) { 183 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1); 184 histogram!( 185 "bspds_db_query_duration_seconds", 186 "query_type" => query_type.to_string() 187 ) 188 .record(duration_seconds); 189} 190 191#[cfg(test)] 192mod tests { 193 use super::*; 194 195 #[test] 196 fn test_normalize_path() { 197 assert_eq!( 198 normalize_path("/xrpc/com.atproto.repo.getRecord"), 199 "/xrpc/com.atproto.repo.getRecord" 200 ); 201 assert_eq!( 202 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"), 203 "/xrpc/com.atproto.repo.getRecord" 204 ); 205 assert_eq!( 206 normalize_path("/u/alice.example.com/did.json"), 207 "/u/{handle}/did.json" 208 ); 209 assert_eq!(normalize_path("/oauth/token"), "/oauth/token"); 210 assert_eq!(normalize_path("/health"), "/health"); 211 } 212}