this repo has no description
1use axum::{
2 body::Body,
3 http::{Request, StatusCode},
4 middleware::Next,
5 response::{IntoResponse, Response},
6};
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use std::sync::OnceLock;
10use std::time::Instant;
11
12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
13
14pub fn init_metrics() -> PrometheusHandle {
15 let builder = PrometheusBuilder::new();
16 let handle = builder
17 .install_recorder()
18 .expect("failed to install Prometheus recorder");
19
20 PROMETHEUS_HANDLE.set(handle.clone()).ok();
21 describe_metrics();
22
23 handle
24}
25
26fn describe_metrics() {
27 metrics::describe_counter!("bspds_http_requests_total", "Total number of HTTP requests");
28 metrics::describe_histogram!(
29 "bspds_http_request_duration_seconds",
30 "HTTP request duration in seconds"
31 );
32 metrics::describe_counter!(
33 "bspds_auth_cache_hits_total",
34 "Total number of authentication cache hits"
35 );
36 metrics::describe_counter!(
37 "bspds_auth_cache_misses_total",
38 "Total number of authentication cache misses"
39 );
40 metrics::describe_gauge!(
41 "bspds_firehose_subscribers",
42 "Number of active firehose WebSocket subscribers"
43 );
44 metrics::describe_counter!(
45 "bspds_firehose_events_total",
46 "Total number of firehose events published"
47 );
48 metrics::describe_counter!(
49 "bspds_block_operations_total",
50 "Total number of block store operations"
51 );
52 metrics::describe_counter!(
53 "bspds_s3_operations_total",
54 "Total number of S3/blob storage operations"
55 );
56 metrics::describe_gauge!(
57 "bspds_comms_queue_size",
58 "Current size of the comms queue"
59 );
60 metrics::describe_counter!(
61 "bspds_rate_limit_rejections_total",
62 "Total number of rate limit rejections"
63 );
64 metrics::describe_counter!("bspds_db_queries_total", "Total number of database queries");
65 metrics::describe_histogram!(
66 "bspds_db_query_duration_seconds",
67 "Database query duration in seconds"
68 );
69}
70
71pub async fn metrics_handler() -> impl IntoResponse {
72 match PROMETHEUS_HANDLE.get() {
73 Some(handle) => {
74 let metrics = handle.render();
75 (
76 StatusCode::OK,
77 [("content-type", "text/plain; version=0.0.4")],
78 metrics,
79 )
80 }
81 None => (
82 StatusCode::INTERNAL_SERVER_ERROR,
83 [("content-type", "text/plain")],
84 "Metrics not initialized".to_string(),
85 ),
86 }
87}
88
89pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
90 let start = Instant::now();
91 let method = request.method().to_string();
92 let path = normalize_path(request.uri().path());
93
94 let response = next.run(request).await;
95
96 let duration = start.elapsed().as_secs_f64();
97 let status = response.status().as_u16().to_string();
98
99 counter!(
100 "bspds_http_requests_total",
101 "method" => method.clone(),
102 "path" => path.clone(),
103 "status" => status.clone()
104 )
105 .increment(1);
106
107 histogram!(
108 "bspds_http_request_duration_seconds",
109 "method" => method,
110 "path" => path
111 )
112 .record(duration);
113
114 response
115}
116
/// Reduces a request path to the value used for the `path` metric label.
///
/// * Anything from the first `?` onward is dropped, so a query string never
///   ends up in a label regardless of which route it was attached to.
/// * Paths that start with `/u/` and end with `/did.json` are collapsed to
///   `/u/{handle}/did.json`, so the per-user segment does not create one time
///   series per handle.
/// * Every other path (including `/xrpc/...` and `/oauth/...`) is returned
///   as-is.
fn normalize_path(path: &str) -> String {
    // Strip the query component first so the suffix check below also matches
    // inputs such as `/u/alice/did.json?x=1`.
    let path = path.split_once('?').map_or(path, |(before_query, _)| before_query);

    if path.starts_with("/u/") && path.ends_with("/did.json") {
        return "/u/{handle}/did.json".to_string();
    }

    path.to_string()
}
136
137pub fn record_auth_cache_hit(cache_type: &str) {
138 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1);
139}
140
141pub fn record_auth_cache_miss(cache_type: &str) {
142 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1);
143}
144
145pub fn set_firehose_subscribers(count: usize) {
146 gauge!("bspds_firehose_subscribers").set(count as f64);
147}
148
149pub fn increment_firehose_subscribers() {
150 counter!("bspds_firehose_events_total").increment(1);
151}
152
153pub fn record_firehose_event() {
154 counter!("bspds_firehose_events_total").increment(1);
155}
156
157pub fn record_block_operation(op_type: &str) {
158 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1);
159}
160
161pub fn record_s3_operation(op_type: &str, status: &str) {
162 counter!(
163 "bspds_s3_operations_total",
164 "op_type" => op_type.to_string(),
165 "status" => status.to_string()
166 )
167 .increment(1);
168}
169
170pub fn set_comms_queue_size(size: usize) {
171 gauge!("bspds_comms_queue_size").set(size as f64);
172}
173
174pub fn record_rate_limit_rejection(limiter: &str) {
175 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1);
176}
177
178pub fn record_db_query(query_type: &str, duration_seconds: f64) {
179 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1);
180 histogram!(
181 "bspds_db_query_duration_seconds",
182 "query_type" => query_type.to_string()
183 )
184 .record(duration_seconds);
185}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `normalize_path` against a table of (input, expected label) pairs.
    #[test]
    fn test_normalize_path() {
        let cases = [
            // XRPC method names pass through unchanged.
            (
                "/xrpc/com.atproto.repo.getRecord",
                "/xrpc/com.atproto.repo.getRecord",
            ),
            // A query string on an XRPC path is dropped.
            (
                "/xrpc/com.atproto.repo.getRecord?foo=bar",
                "/xrpc/com.atproto.repo.getRecord",
            ),
            // The handle segment is replaced by a placeholder.
            ("/u/alice.example.com/did.json", "/u/{handle}/did.json"),
            // Remaining routes are left untouched.
            ("/oauth/token", "/oauth/token"),
            ("/health", "/health"),
        ];

        for (input, expected) in cases {
            assert_eq!(normalize_path(input), expected);
        }
    }
}