this repo has no description
1use axum::{
2 body::Body,
3 http::{Request, StatusCode},
4 middleware::Next,
5 response::{IntoResponse, Response},
6};
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use std::sync::OnceLock;
10use std::time::Instant;
11
12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
13
14pub fn init_metrics() -> PrometheusHandle {
15 let builder = PrometheusBuilder::new();
16 let handle = builder
17 .install_recorder()
18 .expect("failed to install Prometheus recorder");
19
20 PROMETHEUS_HANDLE.set(handle.clone()).ok();
21
22 describe_metrics();
23
24 handle
25}
26
27fn describe_metrics() {
28 metrics::describe_counter!(
29 "bspds_http_requests_total",
30 "Total number of HTTP requests"
31 );
32 metrics::describe_histogram!(
33 "bspds_http_request_duration_seconds",
34 "HTTP request duration in seconds"
35 );
36 metrics::describe_counter!(
37 "bspds_auth_cache_hits_total",
38 "Total number of authentication cache hits"
39 );
40 metrics::describe_counter!(
41 "bspds_auth_cache_misses_total",
42 "Total number of authentication cache misses"
43 );
44 metrics::describe_gauge!(
45 "bspds_firehose_subscribers",
46 "Number of active firehose WebSocket subscribers"
47 );
48 metrics::describe_counter!(
49 "bspds_firehose_events_total",
50 "Total number of firehose events published"
51 );
52 metrics::describe_counter!(
53 "bspds_block_operations_total",
54 "Total number of block store operations"
55 );
56 metrics::describe_counter!(
57 "bspds_s3_operations_total",
58 "Total number of S3/blob storage operations"
59 );
60 metrics::describe_gauge!(
61 "bspds_notification_queue_size",
62 "Current size of the notification queue"
63 );
64 metrics::describe_counter!(
65 "bspds_rate_limit_rejections_total",
66 "Total number of rate limit rejections"
67 );
68 metrics::describe_counter!(
69 "bspds_db_queries_total",
70 "Total number of database queries"
71 );
72 metrics::describe_histogram!(
73 "bspds_db_query_duration_seconds",
74 "Database query duration in seconds"
75 );
76}
77
78pub async fn metrics_handler() -> impl IntoResponse {
79 match PROMETHEUS_HANDLE.get() {
80 Some(handle) => {
81 let metrics = handle.render();
82 (StatusCode::OK, [("content-type", "text/plain; version=0.0.4")], metrics)
83 }
84 None => (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 [("content-type", "text/plain")],
87 "Metrics not initialized".to_string(),
88 ),
89 }
90}
91
92pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
93 let start = Instant::now();
94 let method = request.method().to_string();
95 let path = normalize_path(request.uri().path());
96
97 let response = next.run(request).await;
98
99 let duration = start.elapsed().as_secs_f64();
100 let status = response.status().as_u16().to_string();
101
102 counter!(
103 "bspds_http_requests_total",
104 "method" => method.clone(),
105 "path" => path.clone(),
106 "status" => status.clone()
107 )
108 .increment(1);
109
110 histogram!(
111 "bspds_http_request_duration_seconds",
112 "method" => method,
113 "path" => path
114 )
115 .record(duration);
116
117 response
118}
119
/// Maps a request path to the value used for the `path` metric label.
///
/// * Paths under `/xrpc/` are cut at the first `?`, if one is present;
///   otherwise they are returned as-is.
/// * Paths that start with `/u/` and end with `/did.json` are replaced by the
///   fixed template `/u/{handle}/did.json`.
/// * Every other path (including those under `/oauth/`) is returned unchanged.
fn normalize_path(path: &str) -> String {
    if let Some(rest) = path.strip_prefix("/xrpc/") {
        return match rest.split_once('?') {
            Some((before_query, _query)) => format!("/xrpc/{before_query}"),
            None => path.to_owned(),
        };
    }

    let is_did_document = path.starts_with("/u/") && path.ends_with("/did.json");
    if is_did_document {
        // A plain literal, not a format string: the braces are emitted verbatim.
        "/u/{handle}/did.json".to_owned()
    } else {
        path.to_owned()
    }
}
140
141pub fn record_auth_cache_hit(cache_type: &str) {
142 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1);
143}
144
145pub fn record_auth_cache_miss(cache_type: &str) {
146 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1);
147}
148
149pub fn set_firehose_subscribers(count: usize) {
150 gauge!("bspds_firehose_subscribers").set(count as f64);
151}
152
153pub fn increment_firehose_subscribers() {
154 counter!("bspds_firehose_events_total").increment(1);
155}
156
157pub fn record_firehose_event() {
158 counter!("bspds_firehose_events_total").increment(1);
159}
160
161pub fn record_block_operation(op_type: &str) {
162 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1);
163}
164
165pub fn record_s3_operation(op_type: &str, status: &str) {
166 counter!(
167 "bspds_s3_operations_total",
168 "op_type" => op_type.to_string(),
169 "status" => status.to_string()
170 )
171 .increment(1);
172}
173
174pub fn set_notification_queue_size(size: usize) {
175 gauge!("bspds_notification_queue_size").set(size as f64);
176}
177
178pub fn record_rate_limit_rejection(limiter: &str) {
179 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1);
180}
181
182pub fn record_db_query(query_type: &str, duration_seconds: f64) {
183 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1);
184 histogram!(
185 "bspds_db_query_duration_seconds",
186 "query_type" => query_type.to_string()
187 )
188 .record(duration_seconds);
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `normalize_path` against a table of `(input, expected label)` pairs.
    #[test]
    fn test_normalize_path() {
        let cases = [
            ("/xrpc/com.atproto.repo.getRecord", "/xrpc/com.atproto.repo.getRecord"),
            ("/xrpc/com.atproto.repo.getRecord?foo=bar", "/xrpc/com.atproto.repo.getRecord"),
            ("/u/alice.example.com/did.json", "/u/{handle}/did.json"),
            ("/oauth/token", "/oauth/token"),
            ("/health", "/health"),
        ];

        for (input, expected) in cases {
            assert_eq!(normalize_path(input), expected);
        }
    }
}