this repo has no description
1use axum::{ 2 body::Body, 3 http::{Request, StatusCode}, 4 middleware::Next, 5 response::{IntoResponse, Response}, 6}; 7use metrics::{counter, gauge, histogram}; 8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; 9use std::sync::OnceLock; 10use std::time::Instant; 11 12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new(); 13 14pub fn init_metrics() -> PrometheusHandle { 15 let builder = PrometheusBuilder::new(); 16 let handle = builder 17 .install_recorder() 18 .expect("failed to install Prometheus recorder"); 19 20 PROMETHEUS_HANDLE.set(handle.clone()).ok(); 21 describe_metrics(); 22 23 handle 24} 25 26fn describe_metrics() { 27 metrics::describe_counter!( 28 "tranquil_pds_http_requests_total", 29 "Total number of HTTP requests" 30 ); 31 metrics::describe_histogram!( 32 "tranquil_pds_http_request_duration_seconds", 33 "HTTP request duration in seconds" 34 ); 35 metrics::describe_counter!( 36 "tranquil_pds_auth_cache_hits_total", 37 "Total number of authentication cache hits" 38 ); 39 metrics::describe_counter!( 40 "tranquil_pds_auth_cache_misses_total", 41 "Total number of authentication cache misses" 42 ); 43 metrics::describe_gauge!( 44 "tranquil_pds_firehose_subscribers", 45 "Number of active firehose WebSocket subscribers" 46 ); 47 metrics::describe_counter!( 48 "tranquil_pds_firehose_events_total", 49 "Total number of firehose events published" 50 ); 51 metrics::describe_counter!( 52 "tranquil_pds_block_operations_total", 53 "Total number of block store operations" 54 ); 55 metrics::describe_counter!( 56 "tranquil_pds_s3_operations_total", 57 "Total number of S3/blob storage operations" 58 ); 59 metrics::describe_gauge!( 60 "tranquil_pds_comms_queue_size", 61 "Current size of the comms queue" 62 ); 63 metrics::describe_counter!( 64 "tranquil_pds_rate_limit_rejections_total", 65 "Total number of rate limit rejections" 66 ); 67 metrics::describe_counter!( 68 "tranquil_pds_db_queries_total", 69 "Total number of database queries" 70 ); 71 metrics::describe_histogram!( 72 "tranquil_pds_db_query_duration_seconds", 73 "Database query duration in seconds" 74 ); 75} 76 77pub async fn metrics_handler() -> impl IntoResponse { 78 match PROMETHEUS_HANDLE.get() { 79 Some(handle) => { 80 let metrics = handle.render(); 81 ( 82 StatusCode::OK, 83 [("content-type", "text/plain; version=0.0.4")], 84 metrics, 85 ) 86 } 87 None => ( 88 StatusCode::INTERNAL_SERVER_ERROR, 89 [("content-type", "text/plain")], 90 "Metrics not initialized".to_string(), 91 ), 92 } 93} 94 95pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response { 96 let start = Instant::now(); 97 let method = request.method().to_string(); 98 let path = normalize_path(request.uri().path()); 99 100 let response = next.run(request).await; 101 102 let duration = start.elapsed().as_secs_f64(); 103 let status = response.status().as_u16().to_string(); 104 105 counter!( 106 "tranquil_pds_http_requests_total", 107 "method" => method.clone(), 108 "path" => path.clone(), 109 "status" => status.clone() 110 ) 111 .increment(1); 112 113 histogram!( 114 "tranquil_pds_http_request_duration_seconds", 115 "method" => method, 116 "path" => path 117 ) 118 .record(duration); 119 120 response 121} 122 123fn normalize_path(path: &str) -> String { 124 if path.starts_with("/xrpc/") 125 && let Some(method) = path.strip_prefix("/xrpc/") 126 { 127 if let Some(q) = method.find('?') { 128 return format!("/xrpc/{}", &method[..q]); 129 } 130 return path.to_string(); 131 } 132 133 if path.starts_with("/u/") && path.ends_with("/did.json") { 134 return "/u/{handle}/did.json".to_string(); 135 } 136 137 if path.starts_with("/oauth/") { 138 return path.to_string(); 139 } 140 141 path.to_string() 142} 143 144pub fn record_auth_cache_hit(cache_type: &str) { 145 counter!("tranquil_pds_auth_cache_hits_total", "cache_type" => cache_type.to_string()) 146 .increment(1); 147} 148 149pub fn record_auth_cache_miss(cache_type: &str) { 150 counter!("tranquil_pds_auth_cache_misses_total", "cache_type" => cache_type.to_string()) 151 .increment(1); 152} 153 154pub fn set_firehose_subscribers(count: usize) { 155 gauge!("tranquil_pds_firehose_subscribers").set(count as f64); 156} 157 158pub fn increment_firehose_subscribers() { 159 counter!("tranquil_pds_firehose_events_total").increment(1); 160} 161 162pub fn record_firehose_event() { 163 counter!("tranquil_pds_firehose_events_total").increment(1); 164} 165 166pub fn record_block_operation(op_type: &str) { 167 counter!("tranquil_pds_block_operations_total", "op_type" => op_type.to_string()).increment(1); 168} 169 170pub fn record_s3_operation(op_type: &str, status: &str) { 171 counter!( 172 "tranquil_pds_s3_operations_total", 173 "op_type" => op_type.to_string(), 174 "status" => status.to_string() 175 ) 176 .increment(1); 177} 178 179pub fn set_comms_queue_size(size: usize) { 180 gauge!("tranquil_pds_comms_queue_size").set(size as f64); 181} 182 183pub fn record_rate_limit_rejection(limiter: &str) { 184 counter!("tranquil_pds_rate_limit_rejections_total", "limiter" => limiter.to_string()) 185 .increment(1); 186} 187 188pub fn record_db_query(query_type: &str, duration_seconds: f64) { 189 counter!("tranquil_pds_db_queries_total", "query_type" => query_type.to_string()).increment(1); 190 histogram!( 191 "tranquil_pds_db_query_duration_seconds", 192 "query_type" => query_type.to_string() 193 ) 194 .record(duration_seconds); 195} 196 197#[cfg(test)] 198mod tests { 199 use super::*; 200 201 #[test] 202 fn test_normalize_path() { 203 assert_eq!( 204 normalize_path("/xrpc/com.atproto.repo.getRecord"), 205 "/xrpc/com.atproto.repo.getRecord" 206 ); 207 assert_eq!( 208 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"), 209 "/xrpc/com.atproto.repo.getRecord" 210 ); 211 assert_eq!( 212 normalize_path("/u/alice.example.com/did.json"), 213 "/u/{handle}/did.json" 214 ); 215 assert_eq!(normalize_path("/oauth/token"), "/oauth/token"); 216 assert_eq!(normalize_path("/health"), "/health"); 217 } 218}