this repo has no description
1use axum::{
2 body::Body,
3 http::{Request, StatusCode},
4 middleware::Next,
5 response::{IntoResponse, Response},
6};
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use std::sync::OnceLock;
10use std::time::Instant;
11
12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
13
14pub fn init_metrics() -> PrometheusHandle {
15 let builder = PrometheusBuilder::new();
16 let handle = builder
17 .install_recorder()
18 .expect("failed to install Prometheus recorder");
19
20 PROMETHEUS_HANDLE.set(handle.clone()).ok();
21 describe_metrics();
22
23 handle
24}
25
26fn describe_metrics() {
27 metrics::describe_counter!(
28 "bspds_http_requests_total",
29 "Total number of HTTP requests"
30 );
31 metrics::describe_histogram!(
32 "bspds_http_request_duration_seconds",
33 "HTTP request duration in seconds"
34 );
35 metrics::describe_counter!(
36 "bspds_auth_cache_hits_total",
37 "Total number of authentication cache hits"
38 );
39 metrics::describe_counter!(
40 "bspds_auth_cache_misses_total",
41 "Total number of authentication cache misses"
42 );
43 metrics::describe_gauge!(
44 "bspds_firehose_subscribers",
45 "Number of active firehose WebSocket subscribers"
46 );
47 metrics::describe_counter!(
48 "bspds_firehose_events_total",
49 "Total number of firehose events published"
50 );
51 metrics::describe_counter!(
52 "bspds_block_operations_total",
53 "Total number of block store operations"
54 );
55 metrics::describe_counter!(
56 "bspds_s3_operations_total",
57 "Total number of S3/blob storage operations"
58 );
59 metrics::describe_gauge!(
60 "bspds_notification_queue_size",
61 "Current size of the notification queue"
62 );
63 metrics::describe_counter!(
64 "bspds_rate_limit_rejections_total",
65 "Total number of rate limit rejections"
66 );
67 metrics::describe_counter!(
68 "bspds_db_queries_total",
69 "Total number of database queries"
70 );
71 metrics::describe_histogram!(
72 "bspds_db_query_duration_seconds",
73 "Database query duration in seconds"
74 );
75}
76
77pub async fn metrics_handler() -> impl IntoResponse {
78 match PROMETHEUS_HANDLE.get() {
79 Some(handle) => {
80 let metrics = handle.render();
81 (StatusCode::OK, [("content-type", "text/plain; version=0.0.4")], metrics)
82 }
83 None => (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 [("content-type", "text/plain")],
86 "Metrics not initialized".to_string(),
87 ),
88 }
89}
90
91pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
92 let start = Instant::now();
93 let method = request.method().to_string();
94 let path = normalize_path(request.uri().path());
95
96 let response = next.run(request).await;
97
98 let duration = start.elapsed().as_secs_f64();
99 let status = response.status().as_u16().to_string();
100
101 counter!(
102 "bspds_http_requests_total",
103 "method" => method.clone(),
104 "path" => path.clone(),
105 "status" => status.clone()
106 )
107 .increment(1);
108
109 histogram!(
110 "bspds_http_request_duration_seconds",
111 "method" => method,
112 "path" => path
113 )
114 .record(duration);
115
116 response
117}
118
119fn normalize_path(path: &str) -> String {
120 if path.starts_with("/xrpc/") {
121 if let Some(method) = path.strip_prefix("/xrpc/") {
122 if let Some(q) = method.find('?') {
123 return format!("/xrpc/{}", &method[..q]);
124 }
125 return path.to_string();
126 }
127 }
128
129 if path.starts_with("/u/") && path.ends_with("/did.json") {
130 return "/u/{handle}/did.json".to_string();
131 }
132
133 if path.starts_with("/oauth/") {
134 return path.to_string();
135 }
136
137 path.to_string()
138}
139
140pub fn record_auth_cache_hit(cache_type: &str) {
141 counter!("bspds_auth_cache_hits_total", "cache_type" => cache_type.to_string()).increment(1);
142}
143
144pub fn record_auth_cache_miss(cache_type: &str) {
145 counter!("bspds_auth_cache_misses_total", "cache_type" => cache_type.to_string()).increment(1);
146}
147
148pub fn set_firehose_subscribers(count: usize) {
149 gauge!("bspds_firehose_subscribers").set(count as f64);
150}
151
152pub fn increment_firehose_subscribers() {
153 counter!("bspds_firehose_events_total").increment(1);
154}
155
156pub fn record_firehose_event() {
157 counter!("bspds_firehose_events_total").increment(1);
158}
159
160pub fn record_block_operation(op_type: &str) {
161 counter!("bspds_block_operations_total", "op_type" => op_type.to_string()).increment(1);
162}
163
164pub fn record_s3_operation(op_type: &str, status: &str) {
165 counter!(
166 "bspds_s3_operations_total",
167 "op_type" => op_type.to_string(),
168 "status" => status.to_string()
169 )
170 .increment(1);
171}
172
173pub fn set_notification_queue_size(size: usize) {
174 gauge!("bspds_notification_queue_size").set(size as f64);
175}
176
177pub fn record_rate_limit_rejection(limiter: &str) {
178 counter!("bspds_rate_limit_rejections_total", "limiter" => limiter.to_string()).increment(1);
179}
180
181pub fn record_db_query(query_type: &str, duration_seconds: f64) {
182 counter!("bspds_db_queries_total", "query_type" => query_type.to_string()).increment(1);
183 histogram!(
184 "bspds_db_query_duration_seconds",
185 "query_type" => query_type.to_string()
186 )
187 .record(duration_seconds);
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193
194 #[test]
195 fn test_normalize_path() {
196 assert_eq!(
197 normalize_path("/xrpc/com.atproto.repo.getRecord"),
198 "/xrpc/com.atproto.repo.getRecord"
199 );
200 assert_eq!(
201 normalize_path("/xrpc/com.atproto.repo.getRecord?foo=bar"),
202 "/xrpc/com.atproto.repo.getRecord"
203 );
204 assert_eq!(
205 normalize_path("/u/alice.example.com/did.json"),
206 "/u/{handle}/did.json"
207 );
208 assert_eq!(normalize_path("/oauth/token"), "/oauth/token");
209 assert_eq!(normalize_path("/health"), "/health");
210 }
211}