this repo has no description
1use axum::{
2 body::Body,
3 http::{Request, StatusCode},
4 middleware::Next,
5 response::{IntoResponse, Response},
6};
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use std::sync::OnceLock;
10use std::time::Instant;
11
/// Process-wide slot for the Prometheus handle.
///
/// Written by `init_metrics` and read by `metrics_handler`; it stays empty
/// until `init_metrics` has been called.
12static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
13
14pub fn init_metrics() -> PrometheusHandle {
15 let builder = PrometheusBuilder::new();
16 let handle = builder
17 .install_recorder()
18 .expect("failed to install Prometheus recorder");
19
20 PROMETHEUS_HANDLE.set(handle.clone()).ok();
21 describe_metrics();
22
23 handle
24}
25
26fn describe_metrics() {
27 metrics::describe_counter!(
28 "tranquil_pds_http_requests_total",
29 "Total number of HTTP requests"
30 );
31 metrics::describe_histogram!(
32 "tranquil_pds_http_request_duration_seconds",
33 "HTTP request duration in seconds"
34 );
35 metrics::describe_counter!(
36 "tranquil_pds_auth_cache_hits_total",
37 "Total number of authentication cache hits"
38 );
39 metrics::describe_counter!(
40 "tranquil_pds_auth_cache_misses_total",
41 "Total number of authentication cache misses"
42 );
43 metrics::describe_gauge!(
44 "tranquil_pds_firehose_subscribers",
45 "Number of active firehose WebSocket subscribers"
46 );
47 metrics::describe_counter!(
48 "tranquil_pds_firehose_events_total",
49 "Total number of firehose events published"
50 );
51 metrics::describe_counter!(
52 "tranquil_pds_block_operations_total",
53 "Total number of block store operations"
54 );
55 metrics::describe_counter!(
56 "tranquil_pds_s3_operations_total",
57 "Total number of S3/blob storage operations"
58 );
59 metrics::describe_gauge!(
60 "tranquil_pds_comms_queue_size",
61 "Current size of the comms queue"
62 );
63 metrics::describe_counter!(
64 "tranquil_pds_rate_limit_rejections_total",
65 "Total number of rate limit rejections"
66 );
67 metrics::describe_counter!(
68 "tranquil_pds_db_queries_total",
69 "Total number of database queries"
70 );
71 metrics::describe_histogram!(
72 "tranquil_pds_db_query_duration_seconds",
73 "Database query duration in seconds"
74 );
75}
76
77pub async fn metrics_handler() -> impl IntoResponse {
78 match PROMETHEUS_HANDLE.get() {
79 Some(handle) => {
80 let metrics = handle.render();
81 (
82 StatusCode::OK,
83 [("content-type", "text/plain; version=0.0.4")],
84 metrics,
85 )
86 }
87 None => (
88 StatusCode::INTERNAL_SERVER_ERROR,
89 [("content-type", "text/plain")],
90 "Metrics not initialized".to_string(),
91 ),
92 }
93}
94
95pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
96 let start = Instant::now();
97 let method = request.method().to_string();
98 let path = normalize_path(request.uri().path());
99
100 let response = next.run(request).await;
101
102 let duration = start.elapsed().as_secs_f64();
103 let status = response.status().as_u16().to_string();
104
105 counter!(
106 "tranquil_pds_http_requests_total",
107 "method" => method.clone(),
108 "path" => path.clone(),
109 "status" => status.clone()
110 )
111 .increment(1);
112
113 histogram!(
114 "tranquil_pds_http_request_duration_seconds",
115 "method" => method,
116 "path" => path
117 )
118 .record(duration);
119
120 response
121}
122
/// Maps a request path to the value used for the `path` metric label.
///
/// * `/xrpc/<method>[?query]` becomes `/xrpc/<method>`; anything from the
///   first `?` onwards is dropped.
/// * `/u/<handle>/did.json` becomes the fixed template `/u/{handle}/did.json`
///   so that individual handles do not each create their own label value.
/// * Every other path is returned unchanged.
fn normalize_path(path: &str) -> String {
    if let Some(method) = path.strip_prefix("/xrpc/") {
        let nsid = method.split_once('?').map_or(method, |(nsid, _)| nsid);
        return format!("/xrpc/{}", nsid);
    }

    // Strip the prefix first and the suffix from what remains, so the two
    // cannot overlap on the same `/`: `/u/did.json` has no handle segment
    // and must not be reported under the template.
    let is_did_document = path
        .strip_prefix("/u/")
        .and_then(|rest| rest.strip_suffix("/did.json"))
        .is_some();
    if is_did_document {
        return "/u/{handle}/did.json".to_string();
    }

    path.to_string()
}
143
144pub fn record_auth_cache_hit(cache_type: &str) {
145 counter!("tranquil_pds_auth_cache_hits_total", "cache_type" => cache_type.to_string())
146 .increment(1);
147}
148
149pub fn record_auth_cache_miss(cache_type: &str) {
150 counter!("tranquil_pds_auth_cache_misses_total", "cache_type" => cache_type.to_string())
151 .increment(1);
152}
153
154pub fn set_firehose_subscribers(count: usize) {
155 gauge!("tranquil_pds_firehose_subscribers").set(count as f64);
156}
157
158pub fn increment_firehose_subscribers() {
159 counter!("tranquil_pds_firehose_events_total").increment(1);
160}
161
162pub fn record_firehose_event() {
163 counter!("tranquil_pds_firehose_events_total").increment(1);
164}
165
166pub fn record_block_operation(op_type: &str) {
167 counter!("tranquil_pds_block_operations_total", "op_type" => op_type.to_string()).increment(1);
168}
169
170pub fn record_s3_operation(op_type: &str, status: &str) {
171 counter!(
172 "tranquil_pds_s3_operations_total",
173 "op_type" => op_type.to_string(),
174 "status" => status.to_string()
175 )
176 .increment(1);
177}
178
179pub fn set_comms_queue_size(size: usize) {
180 gauge!("tranquil_pds_comms_queue_size").set(size as f64);
181}
182
183pub fn record_rate_limit_rejection(limiter: &str) {
184 counter!("tranquil_pds_rate_limit_rejections_total", "limiter" => limiter.to_string())
185 .increment(1);
186}
187
188pub fn record_db_query(query_type: &str, duration_seconds: f64) {
189 counter!("tranquil_pds_db_queries_total", "query_type" => query_type.to_string()).increment(1);
190 histogram!(
191 "tranquil_pds_db_query_duration_seconds",
192 "query_type" => query_type.to_string()
193 )
194 .record(duration_seconds);
195}
196
#[cfg(test)]
mod tests {
    use super::normalize_path;

    /// Checks `normalize_path` against a table of `(input, expected label)` pairs.
    #[test]
    fn test_normalize_path() {
        let cases = [
            (
                "/xrpc/com.atproto.repo.getRecord",
                "/xrpc/com.atproto.repo.getRecord",
            ),
            (
                "/xrpc/com.atproto.repo.getRecord?foo=bar",
                "/xrpc/com.atproto.repo.getRecord",
            ),
            ("/u/alice.example.com/did.json", "/u/{handle}/did.json"),
            ("/oauth/token", "/oauth/token"),
            ("/health", "/health"),
        ];

        for (input, expected) in cases {
            assert_eq!(normalize_path(input), expected, "input: {}", input);
        }
    }
}