QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1use crate::config::Config;
2use async_trait::async_trait;
3use cadence::{
4 BufferedUdpMetricSink, Counted, CountedExt, Gauged, Metric, QueuingMetricSink, StatsdClient,
5 Timed,
6};
7use std::net::UdpSocket;
8use std::sync::Arc;
9use thiserror::Error;
10use tracing::{debug, error};
11
12/// Trait for publishing metrics with counter and gauge support
13/// Designed for minimal compatibility with cadence-style metrics
14#[async_trait]
15pub trait MetricsPublisher: Send + Sync {
16 /// Increment a counter by 1
17 async fn incr(&self, key: &str);
18
19 /// Increment a counter by a specific value
20 async fn count(&self, key: &str, value: u64);
21
22 /// Increment a counter with tags
23 async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]);
24
25 /// Increment a counter by a specific value with tags
26 async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]);
27
28 /// Record a gauge value
29 async fn gauge(&self, key: &str, value: u64);
30
31 /// Record a gauge value with tags
32 async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]);
33
34 /// Record a timing in milliseconds
35 async fn time(&self, key: &str, millis: u64);
36
37 /// Record a timing with tags
38 async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]);
39}
40
/// Metrics publisher that discards everything it is handed.
///
/// Intended for development and testing, where no metrics backend is wanted.
#[derive(Debug, Clone, Default)]
pub struct NoOpMetricsPublisher;

impl NoOpMetricsPublisher {
    /// Construct the publisher. The type carries no state, so this is the
    /// same value `Default::default()` produces.
    pub fn new() -> Self {
        NoOpMetricsPublisher
    }
}
50
51#[async_trait]
52impl MetricsPublisher for NoOpMetricsPublisher {
53 async fn incr(&self, _key: &str) {
54 // No-op
55 }
56
57 async fn count(&self, _key: &str, _value: u64) {
58 // No-op
59 }
60
61 async fn incr_with_tags(&self, _key: &str, _tags: &[(&str, &str)]) {
62 // No-op
63 }
64
65 async fn count_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) {
66 // No-op
67 }
68
69 async fn gauge(&self, _key: &str, _value: u64) {
70 // No-op
71 }
72
73 async fn gauge_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) {
74 // No-op
75 }
76
77 async fn time(&self, _key: &str, _millis: u64) {
78 // No-op
79 }
80
81 async fn time_with_tags(&self, _key: &str, _millis: u64, _tags: &[(&str, &str)]) {
82 // No-op
83 }
84}
85
86/// Statsd-backed metrics publisher using cadence
87pub struct StatsdMetricsPublisher {
88 client: StatsdClient,
89 default_tags: Vec<(String, String)>,
90}
91
92impl StatsdMetricsPublisher {
93 /// Create a new StatsdMetricsPublisher with default configuration
94 pub fn new(host: &str, prefix: &str) -> Result<Self, Box<dyn std::error::Error>> {
95 Self::new_with_bind(host, prefix, "[::]:0")
96 }
97
98 /// Create a new StatsdMetricsPublisher with custom bind address
99 pub fn new_with_bind(
100 host: &str,
101 prefix: &str,
102 bind_addr: &str,
103 ) -> Result<Self, Box<dyn std::error::Error>> {
104 Self::new_with_bind_and_tags(host, prefix, bind_addr, vec![])
105 }
106
107 /// Create a new StatsdMetricsPublisher with default tags
108 pub fn new_with_tags(
109 host: &str,
110 prefix: &str,
111 default_tags: Vec<(String, String)>,
112 ) -> Result<Self, Box<dyn std::error::Error>> {
113 Self::new_with_bind_and_tags(host, prefix, "[::]:0", default_tags)
114 }
115
116 /// Create a new StatsdMetricsPublisher with custom bind address and tags
117 pub fn new_with_bind_and_tags(
118 host: &str,
119 prefix: &str,
120 bind_addr: &str,
121 default_tags: Vec<(String, String)>,
122 ) -> Result<Self, Box<dyn std::error::Error>> {
123 tracing::info!(
124 "Creating StatsdMetricsPublisher: host={}, prefix={}, bind={}, tags={:?}",
125 host,
126 prefix,
127 bind_addr,
128 default_tags
129 );
130
131 let socket = UdpSocket::bind(bind_addr)?;
132 socket.set_nonblocking(true)?;
133
134 let buffered_sink = BufferedUdpMetricSink::from(host, socket)?;
135 let queuing_sink = QueuingMetricSink::builder()
136 .with_error_handler(move |error| {
137 error!("Failed to send metric via sink: {}", error);
138 })
139 .build(buffered_sink);
140 let client = StatsdClient::from_sink(prefix, queuing_sink);
141
142 tracing::info!(
143 "StatsdMetricsPublisher created successfully with bind address: {}",
144 bind_addr
145 );
146 Ok(Self {
147 client,
148 default_tags,
149 })
150 }
151
152 /// Create from an existing StatsdClient
153 pub fn from_client(client: StatsdClient) -> Self {
154 Self::from_client_with_tags(client, vec![])
155 }
156
157 /// Create from an existing StatsdClient with default tags
158 pub fn from_client_with_tags(
159 client: StatsdClient,
160 default_tags: Vec<(String, String)>,
161 ) -> Self {
162 Self {
163 client,
164 default_tags,
165 }
166 }
167
168 /// Apply default tags to a builder
169 fn apply_default_tags<'a, M>(
170 &'a self,
171 mut builder: cadence::MetricBuilder<'a, 'a, M>,
172 ) -> cadence::MetricBuilder<'a, 'a, M>
173 where
174 M: Metric + From<String>,
175 {
176 for (k, v) in &self.default_tags {
177 builder = builder.with_tag(k.as_str(), v.as_str());
178 }
179 builder
180 }
181}
182
183#[async_trait]
184impl MetricsPublisher for StatsdMetricsPublisher {
185 async fn incr(&self, key: &str) {
186 debug!("Sending metric incr: {}", key);
187 if self.default_tags.is_empty() {
188 match self.client.incr(key) {
189 Ok(_) => debug!("Successfully sent metric: {}", key),
190 Err(e) => error!("Failed to send metric {}: {}", key, e),
191 }
192 } else {
193 let builder = self.client.incr_with_tags(key);
194 let builder = self.apply_default_tags(builder);
195 let _ = builder.send();
196 debug!("Sent metric with tags: {}", key);
197 }
198 }
199
200 async fn count(&self, key: &str, value: u64) {
201 if self.default_tags.is_empty() {
202 let _ = self.client.count(key, value);
203 } else {
204 let builder = self.client.count_with_tags(key, value);
205 let builder = self.apply_default_tags(builder);
206 let _ = builder.send();
207 }
208 }
209
210 async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]) {
211 let mut builder = self.client.incr_with_tags(key);
212 builder = self.apply_default_tags(builder);
213 for (k, v) in tags {
214 builder = builder.with_tag(k, v);
215 }
216 let _ = builder.send();
217 }
218
219 async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) {
220 let mut builder = self.client.count_with_tags(key, value);
221 builder = self.apply_default_tags(builder);
222 for (k, v) in tags {
223 builder = builder.with_tag(k, v);
224 }
225 let _ = builder.send();
226 }
227
228 async fn gauge(&self, key: &str, value: u64) {
229 debug!("Sending metric gauge: {} = {}", key, value);
230 if self.default_tags.is_empty() {
231 match self.client.gauge(key, value) {
232 Ok(_) => debug!("Successfully sent gauge: {} = {}", key, value),
233 Err(e) => error!("Failed to send gauge {} = {}: {}", key, value, e),
234 }
235 } else {
236 let builder = self.client.gauge_with_tags(key, value);
237 let builder = self.apply_default_tags(builder);
238 builder.send();
239 debug!("Sent gauge with tags: {} = {}", key, value);
240 }
241 }
242
243 async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) {
244 let mut builder = self.client.gauge_with_tags(key, value);
245 builder = self.apply_default_tags(builder);
246 for (k, v) in tags {
247 builder = builder.with_tag(k, v);
248 }
249 let _ = builder.send();
250 }
251
252 async fn time(&self, key: &str, millis: u64) {
253 if self.default_tags.is_empty() {
254 let _ = self.client.time(key, millis);
255 } else {
256 let builder = self.client.time_with_tags(key, millis);
257 let builder = self.apply_default_tags(builder);
258 let _ = builder.send();
259 }
260 }
261
262 async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]) {
263 let mut builder = self.client.time_with_tags(key, millis);
264 builder = self.apply_default_tags(builder);
265 for (k, v) in tags {
266 builder = builder.with_tag(k, v);
267 }
268 let _ = builder.send();
269 }
270}
271
272/// Type alias for shared metrics publisher
273pub type SharedMetricsPublisher = Arc<dyn MetricsPublisher>;
274
275/// Metrics-specific errors
276#[derive(Debug, Error)]
277pub enum MetricsError {
278 /// Failed to create metrics publisher
279 #[error("error-quickdid-metrics-1 Failed to create metrics publisher: {0}")]
280 CreationFailed(String),
281
282 /// Invalid configuration for metrics
283 #[error("error-quickdid-metrics-2 Invalid metrics configuration: {0}")]
284 InvalidConfig(String),
285}
286
287/// Create a metrics publisher based on configuration
288///
289/// Returns either a no-op publisher or a StatsD publisher based on the
290/// `metrics_adapter` configuration value.
291///
292/// ## Example
293///
294/// ```rust,no_run
295/// use quickdid::config::Config;
296/// use quickdid::metrics::create_metrics_publisher;
297///
298/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
299/// let config = Config::from_env()?;
300/// let metrics = create_metrics_publisher(&config)?;
301///
302/// // Use the metrics publisher
303/// metrics.incr("request.count").await;
304/// # Ok(())
305/// # }
306/// ```
307pub fn create_metrics_publisher(config: &Config) -> Result<SharedMetricsPublisher, MetricsError> {
308 match config.metrics_adapter.as_str() {
309 "noop" => Ok(Arc::new(NoOpMetricsPublisher::new())),
310 "statsd" => {
311 let host = config.metrics_statsd_host.as_ref().ok_or_else(|| {
312 MetricsError::InvalidConfig(
313 "METRICS_STATSD_HOST is required when using statsd adapter".to_string(),
314 )
315 })?;
316
317 // Parse tags from comma-separated key:value pairs
318 let default_tags = if let Some(tags_str) = &config.metrics_tags {
319 tags_str
320 .split(',')
321 .filter_map(|tag| {
322 let parts: Vec<&str> = tag.trim().split(':').collect();
323 if parts.len() == 2 {
324 Some((parts[0].to_string(), parts[1].to_string()))
325 } else {
326 error!("Invalid tag format: {}", tag);
327 None
328 }
329 })
330 .collect()
331 } else {
332 vec![]
333 };
334
335 let publisher = StatsdMetricsPublisher::new_with_bind_and_tags(
336 host,
337 &config.metrics_prefix,
338 &config.metrics_statsd_bind,
339 default_tags,
340 )
341 .map_err(|e| MetricsError::CreationFailed(e.to_string()))?;
342
343 Ok(Arc::new(publisher))
344 }
345 _ => Err(MetricsError::InvalidConfig(format!(
346 "Unknown metrics adapter: {}",
347 config.metrics_adapter
348 ))),
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests in this module that mutate process-wide
    /// environment variables.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Every environment variable these tests set or need to be absent.
    const TEST_ENV_VARS: [&str; 5] = [
        "HTTP_EXTERNAL",
        "METRICS_ADAPTER",
        "METRICS_STATSD_HOST",
        "METRICS_PREFIX",
        "METRICS_TAGS",
    ];

    /// Remove every variable in `TEST_ENV_VARS`. Call only while holding `ENV_MUTEX`.
    fn clear_test_env() {
        for name in TEST_ENV_VARS {
            // SAFETY: callers hold ENV_MUTEX, so no other test in this module
            // touches the environment concurrently.
            // NOTE(review): tests elsewhere in the crate that read or write
            // environment variables are not covered by this lock.
            unsafe { env::remove_var(name) };
        }
    }

    /// Holds `ENV_MUTEX` for the duration of a test and restores a clean
    /// environment when dropped, including when the test panics.
    struct EnvGuard {
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Take the lock, clear the test variables, then set `vars`.
        fn with_vars(vars: &[(&str, &str)]) -> Self {
            // A panic in another test poisons the mutex. The guarded data is
            // `()`, so recovering the lock is always sound and keeps one
            // failing test from cascading into the others.
            let lock = ENV_MUTEX
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            clear_test_env();
            for &(name, value) in vars {
                // SAFETY: ENV_MUTEX is held (see `clear_test_env` for the caveat).
                unsafe { env::set_var(name, value) };
            }
            Self { _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Runs before the `_lock` field is released, so the cleanup is
            // still serialized.
            clear_test_env();
        }
    }

    #[tokio::test]
    async fn test_noop_metrics() {
        let metrics = NoOpMetricsPublisher::new();

        // These should all be no-ops and not panic
        metrics.incr("test.counter").await;
        metrics.count("test.counter", 5).await;
        metrics
            .incr_with_tags("test.counter", &[("env", "test")])
            .await;
        metrics
            .count_with_tags(
                "test.counter",
                10,
                &[("env", "test"), ("service", "quickdid")],
            )
            .await;
        metrics.gauge("test.gauge", 100).await;
        metrics
            .gauge_with_tags("test.gauge", 200, &[("host", "localhost")])
            .await;
        metrics.time("test.timing", 42).await;
        metrics
            .time_with_tags("test.timing", 84, &[("endpoint", "/resolve")])
            .await;
    }

    #[tokio::test]
    async fn test_shared_metrics() {
        let metrics: SharedMetricsPublisher = Arc::new(NoOpMetricsPublisher::new());

        // Verify it can be used as a shared reference
        metrics.incr("shared.counter").await;
        metrics.gauge("shared.gauge", 50).await;

        // Verify it can be cloned
        let metrics2 = Arc::clone(&metrics);
        metrics2.count("cloned.counter", 3).await;
    }

    #[test]
    fn test_create_noop_publisher() {
        let _env = EnvGuard::with_vars(&[
            ("HTTP_EXTERNAL", "test.example.com"),
            ("METRICS_ADAPTER", "noop"),
        ]);

        let config = Config::from_env().unwrap();
        let metrics = create_metrics_publisher(&config).unwrap();

        // Should create successfully - actual type checking happens at compile time
        assert_eq!(Arc::strong_count(&metrics), 1);
    }

    #[test]
    fn test_create_statsd_publisher() {
        let _env = EnvGuard::with_vars(&[
            ("HTTP_EXTERNAL", "test.example.com"),
            ("METRICS_ADAPTER", "statsd"),
            ("METRICS_STATSD_HOST", "localhost:8125"),
            ("METRICS_PREFIX", "test"),
            ("METRICS_TAGS", "env:test,service:quickdid"),
        ]);

        let config = Config::from_env().unwrap();
        let metrics = create_metrics_publisher(&config).unwrap();

        // Should create successfully
        assert_eq!(Arc::strong_count(&metrics), 1);
    }

    #[test]
    fn test_missing_statsd_host() {
        // StatsD adapter selected, but METRICS_STATSD_HOST left unset.
        let _env = EnvGuard::with_vars(&[
            ("HTTP_EXTERNAL", "test.example.com"),
            ("METRICS_ADAPTER", "statsd"),
        ]);

        let config = Config::from_env().unwrap();
        let result = create_metrics_publisher(&config);

        // Should fail with invalid config error
        assert!(matches!(result, Err(MetricsError::InvalidConfig(_))));
    }

    #[test]
    fn test_invalid_adapter() {
        let _env = EnvGuard::with_vars(&[
            ("HTTP_EXTERNAL", "test.example.com"),
            ("METRICS_ADAPTER", "invalid"),
        ]);

        let config = Config::from_env().unwrap();

        // Config validation should catch this
        let validation_result = config.validate();
        assert!(validation_result.is_err());
    }
}
547}