this repo has no description
1use axum::{
2 body::Body,
3 http::{Request, StatusCode},
4 middleware::Next,
5 response::{IntoResponse, Response},
6};
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use std::sync::OnceLock;
10use std::time::Instant;
11static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
12pub fn init_metrics() -> PrometheusHandle {
13 let builder = PrometheusBuilder::new();
14 let handle = builder
15 .install_recorder()
16 .expect("failed to install Prometheus recorder");
17 PROMETHEUS_HANDLE.set(handle.clone()).ok();
18 describe_metrics();
19 handle
20}
21fn describe_metrics() {
22 metrics::describe_counter!(
23 "bspds_http_requests_total",
24 "Total number of HTTP requests"
25 );
26 metrics::describe_histogram!(
27 "bspds_http_request_duration_seconds",
28 "HTTP request duration in seconds"
29 );
30 metrics::describe_counter!(
31 "bspds_auth_cache_hits_total",
32 "Total number of authentication cache hits"
33 );
34 metrics::describe_counter!(
35 "bspds_auth_cache_misses_total",
36 "Total number of authentication cache misses"
37 );
38 metrics::describe_gauge!(
39 "bspds_firehose_subscribers",
40 "Number of active firehose WebSocket subscribers"
41 );
42 metrics::describe_counter!(
43 "bspds_firehose_events_total",
44 "Total number of firehose events published"
45 );
46 metrics::describe_counter!(
47 "bspds_block_operations_total",
48 "Total number of block store operations"
49 );
50 metrics::describe_counter!(
51 "bspds_s3_operations_total",
52 "Total number of S3/blob storage operations"
53 );
54 metrics::describe_gauge!(
55 "bspds_notification_queue_size",
56 "Current size of the notification queue"
57 );
58 metrics::describe_counter!(
59 "bspds_rate_limit_rejections_total",
60 "Total number of rate limit rejections"
61 );
62 metrics::describe_counter!(
63 "bspds_db_queries_total",
64 "Total number of database queries"
65 );
66 metrics::describe_histogram!(
67 "bspds_db_query_duration_seconds",
68 "Database query duration in seconds"
69 );
70}
71pub async fn metrics_handler() -> impl IntoResponse {
72 match PROMETHEUS_HANDLE.get() {
73 Some(handle) => {
74 let metrics = handle.render();
75 (StatusCode::OK, [("content-type", "text/plain; version=0.0.4")], metrics)
76 }
77 None => (
78 StatusCode::INTERNAL_SERVER_ERROR,
79 [("content-type", "text/plain")],
80 "Metrics not initialized".to_string(),
81 ),
82 }
83}
84pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
85 let start = Instant::now();
86 let method = request.method().to_string();
87 let path = normalize_path(request.uri().path());
88 let response = next.run(request).await;
89 let duration = start.elapsed().as_secs_f64();
90 let status = response.status().as_u16().to_string();
91 counter!(
92 "bspds_http_requests_total",
93 "method" => method.clone(),
94 "path" => path.clone(),
95 "status" => status.clone()
96 )
97 .increment(1);
98 histogram!(
99 "bspds_http_request_duration_seconds",
100 "method" => method,
101 "path" => path
102 )
103 .record(duration);
104 response
105}
106fn normalize_path(path: &str) -> String {
107 if path.starts_with("/xrpc/") {
108 if let Some(method) = path.strip_prefix("/xrpc/") {
109 if let Some(q) = method.find('?') {
110 return format!("/xrpc/{}", &method[..q]);
111 }
112 return path.to_string();
113 }
114 }
115 if path.starts_with("/u/") && path.ends_with("/did.json") {
116 return "/u/{handle}/did.json".to_string();
117 }
118 if path.starts_with("/oauth/") {
119 return path.to_string();
120 }
121 path.to_string()
122}
123pub fn record_auth_cache_hit(cache_type: &str) {
124 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1);
125}
126pub fn record_auth_cache_miss(cache_type: &str) {
127 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1);
128}
129pub fn set_firehose_subscribers(count: usize) {
130 gauge!("bspds_firehose_subscribers").set(count as f64);
131}
132pub fn increment_firehose_subscribers() {
133 counter!("bspds_firehose_events_total").increment(1);
134}
135pub fn record_firehose_event() {
136 counter!("bspds_firehose_events_total").increment(1);
137}
138pub fn record_block_operation(op_type: &str) {
139 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1);
140}
141pub fn record_s3_operation(op_type: &str, status: &str) {
142 counter!(
143 "bspds_s3_operations_total",
144 "op_type" => op_type.to_string(),
145 "status" => status.to_string()
146 )
147 .increment(1);
148}
149pub fn set_notification_queue_size(size: usize) {
150 gauge!("bspds_notification_queue_size").set(size as f64);
151}
152pub fn record_rate_limit_rejection(limiter: &str) {
153 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1);
154}
155pub fn record_db_query(query_type: &str, duration_seconds: f64) {
156 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1);
157 histogram!(
158 "bspds_db_query_duration_seconds",
159 "query_type" => query_type.to_string()
160 )
161 .record(duration_seconds);
162}
163#[cfg(test)]
164mod tests {
165 use super::*;
166 #[test]
167 fn test_normalize_path() {
168 assert_eq!(
169 normalize_path("/xrpc/com.atproto.repo.getRecord"),
170 "/xrpc/com.atproto.repo.getRecord"
171 );
172 assert_eq!(
173 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"),
174 "/xrpc/com.atproto.repo.getRecord"
175 );
176 assert_eq!(
177 normalize_path("/u/alice.example.com/did.json"),
178 "/u/{handle}/did.json"
179 );
180 assert_eq!(normalize_path("/oauth/token"), "/oauth/token");
181 assert_eq!(normalize_path("/health"), "/health");
182 }
183}