use crate::config::Config; use async_trait::async_trait; use cadence::{ BufferedUdpMetricSink, Counted, CountedExt, Gauged, Metric, QueuingMetricSink, StatsdClient, Timed, }; use std::net::UdpSocket; use std::sync::Arc; use thiserror::Error; use tracing::{debug, error}; /// Trait for publishing metrics with counter and gauge support /// Designed for minimal compatibility with cadence-style metrics #[async_trait] pub trait MetricsPublisher: Send + Sync { /// Increment a counter by 1 async fn incr(&self, key: &str); /// Increment a counter by a specific value async fn count(&self, key: &str, value: u64); /// Increment a counter with tags async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]); /// Increment a counter by a specific value with tags async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]); /// Record a gauge value async fn gauge(&self, key: &str, value: u64); /// Record a gauge value with tags async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]); /// Record a timing in milliseconds async fn time(&self, key: &str, millis: u64); /// Record a timing with tags async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]); } /// No-op implementation for development and testing #[derive(Debug, Clone, Default)] pub struct NoOpMetricsPublisher; impl NoOpMetricsPublisher { pub fn new() -> Self { Self } } #[async_trait] impl MetricsPublisher for NoOpMetricsPublisher { async fn incr(&self, _key: &str) { // No-op } async fn count(&self, _key: &str, _value: u64) { // No-op } async fn incr_with_tags(&self, _key: &str, _tags: &[(&str, &str)]) { // No-op } async fn count_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) { // No-op } async fn gauge(&self, _key: &str, _value: u64) { // No-op } async fn gauge_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) { // No-op } async fn time(&self, _key: &str, _millis: u64) { // No-op } async fn time_with_tags(&self, _key: &str, _millis: u64, _tags: &[(&str, &str)]) { // No-op } } /// Statsd-backed metrics publisher using cadence pub struct StatsdMetricsPublisher { client: StatsdClient, default_tags: Vec<(String, String)>, } impl StatsdMetricsPublisher { /// Create a new StatsdMetricsPublisher with default configuration pub fn new(host: &str, prefix: &str) -> Result> { Self::new_with_bind(host, prefix, "[::]:0") } /// Create a new StatsdMetricsPublisher with custom bind address pub fn new_with_bind( host: &str, prefix: &str, bind_addr: &str, ) -> Result> { Self::new_with_bind_and_tags(host, prefix, bind_addr, vec![]) } /// Create a new StatsdMetricsPublisher with default tags pub fn new_with_tags( host: &str, prefix: &str, default_tags: Vec<(String, String)>, ) -> Result> { Self::new_with_bind_and_tags(host, prefix, "[::]:0", default_tags) } /// Create a new StatsdMetricsPublisher with custom bind address and tags pub fn new_with_bind_and_tags( host: &str, prefix: &str, bind_addr: &str, default_tags: Vec<(String, String)>, ) -> Result> { tracing::info!( "Creating StatsdMetricsPublisher: host={}, prefix={}, bind={}, tags={:?}", host, prefix, bind_addr, default_tags ); let socket = UdpSocket::bind(bind_addr)?; socket.set_nonblocking(true)?; let buffered_sink = BufferedUdpMetricSink::from(host, socket)?; let queuing_sink = QueuingMetricSink::builder() .with_error_handler(move |error| { error!("Failed to send metric via sink: {}", error); }) .build(buffered_sink); let client = StatsdClient::from_sink(prefix, queuing_sink); tracing::info!( "StatsdMetricsPublisher created successfully with bind address: {}", bind_addr ); Ok(Self { client, default_tags, }) } /// Create from an existing StatsdClient pub fn from_client(client: StatsdClient) -> Self { Self::from_client_with_tags(client, vec![]) } /// Create from an existing StatsdClient with default tags pub fn from_client_with_tags( client: StatsdClient, default_tags: Vec<(String, String)>, ) -> Self { Self { client, default_tags, } } /// Apply default tags to a builder fn apply_default_tags<'a, M>( &'a self, mut builder: cadence::MetricBuilder<'a, 'a, M>, ) -> cadence::MetricBuilder<'a, 'a, M> where M: Metric + From, { for (k, v) in &self.default_tags { builder = builder.with_tag(k.as_str(), v.as_str()); } builder } } #[async_trait] impl MetricsPublisher for StatsdMetricsPublisher { async fn incr(&self, key: &str) { debug!("Sending metric incr: {}", key); if self.default_tags.is_empty() { match self.client.incr(key) { Ok(_) => debug!("Successfully sent metric: {}", key), Err(e) => error!("Failed to send metric {}: {}", key, e), } } else { let builder = self.client.incr_with_tags(key); let builder = self.apply_default_tags(builder); let _ = builder.send(); debug!("Sent metric with tags: {}", key); } } async fn count(&self, key: &str, value: u64) { if self.default_tags.is_empty() { let _ = self.client.count(key, value); } else { let builder = self.client.count_with_tags(key, value); let builder = self.apply_default_tags(builder); let _ = builder.send(); } } async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]) { let mut builder = self.client.incr_with_tags(key); builder = self.apply_default_tags(builder); for (k, v) in tags { builder = builder.with_tag(k, v); } let _ = builder.send(); } async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) { let mut builder = self.client.count_with_tags(key, value); builder = self.apply_default_tags(builder); for (k, v) in tags { builder = builder.with_tag(k, v); } let _ = builder.send(); } async fn gauge(&self, key: &str, value: u64) { debug!("Sending metric gauge: {} = {}", key, value); if self.default_tags.is_empty() { match self.client.gauge(key, value) { Ok(_) => debug!("Successfully sent gauge: {} = {}", key, value), Err(e) => error!("Failed to send gauge {} = {}: {}", key, value, e), } } else { let builder = self.client.gauge_with_tags(key, value); let builder = self.apply_default_tags(builder); builder.send(); debug!("Sent gauge with tags: {} = {}", key, value); } } async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) { let mut builder = self.client.gauge_with_tags(key, value); builder = self.apply_default_tags(builder); for (k, v) in tags { builder = builder.with_tag(k, v); } let _ = builder.send(); } async fn time(&self, key: &str, millis: u64) { if self.default_tags.is_empty() { let _ = self.client.time(key, millis); } else { let builder = self.client.time_with_tags(key, millis); let builder = self.apply_default_tags(builder); let _ = builder.send(); } } async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]) { let mut builder = self.client.time_with_tags(key, millis); builder = self.apply_default_tags(builder); for (k, v) in tags { builder = builder.with_tag(k, v); } let _ = builder.send(); } } /// Type alias for shared metrics publisher pub type SharedMetricsPublisher = Arc; /// Metrics-specific errors #[derive(Debug, Error)] pub enum MetricsError { /// Failed to create metrics publisher #[error("error-quickdid-metrics-1 Failed to create metrics publisher: {0}")] CreationFailed(String), /// Invalid configuration for metrics #[error("error-quickdid-metrics-2 Invalid metrics configuration: {0}")] InvalidConfig(String), } /// Create a metrics publisher based on configuration /// /// Returns either a no-op publisher or a StatsD publisher based on the /// `metrics_adapter` configuration value. /// /// ## Example /// /// ```rust,no_run /// use quickdid::config::Config; /// use quickdid::metrics::create_metrics_publisher; /// /// # async fn example() -> Result<(), Box> { /// let config = Config::from_env()?; /// let metrics = create_metrics_publisher(&config)?; /// /// // Use the metrics publisher /// metrics.incr("request.count").await; /// # Ok(()) /// # } /// ``` pub fn create_metrics_publisher(config: &Config) -> Result { match config.metrics_adapter.as_str() { "noop" => Ok(Arc::new(NoOpMetricsPublisher::new())), "statsd" => { let host = config.metrics_statsd_host.as_ref().ok_or_else(|| { MetricsError::InvalidConfig( "METRICS_STATSD_HOST is required when using statsd adapter".to_string(), ) })?; // Parse tags from comma-separated key:value pairs let default_tags = if let Some(tags_str) = &config.metrics_tags { tags_str .split(',') .filter_map(|tag| { let parts: Vec<&str> = tag.trim().split(':').collect(); if parts.len() == 2 { Some((parts[0].to_string(), parts[1].to_string())) } else { error!("Invalid tag format: {}", tag); None } }) .collect() } else { vec![] }; let publisher = StatsdMetricsPublisher::new_with_bind_and_tags( host, &config.metrics_prefix, &config.metrics_statsd_bind, default_tags, ) .map_err(|e| MetricsError::CreationFailed(e.to_string()))?; Ok(Arc::new(publisher)) } _ => Err(MetricsError::InvalidConfig(format!( "Unknown metrics adapter: {}", config.metrics_adapter ))), } } #[cfg(test)] mod tests { use super::*; use once_cell::sync::Lazy; use std::sync::Mutex; // Use a mutex to serialize tests that modify environment variables static ENV_MUTEX: Lazy> = Lazy::new(|| Mutex::new(())); #[tokio::test] async fn test_noop_metrics() { let metrics = NoOpMetricsPublisher::new(); // These should all be no-ops and not panic metrics.incr("test.counter").await; metrics.count("test.counter", 5).await; metrics .incr_with_tags("test.counter", &[("env", "test")]) .await; metrics .count_with_tags( "test.counter", 10, &[("env", "test"), ("service", "quickdid")], ) .await; metrics.gauge("test.gauge", 100).await; metrics .gauge_with_tags("test.gauge", 200, &[("host", "localhost")]) .await; metrics.time("test.timing", 42).await; metrics .time_with_tags("test.timing", 84, &[("endpoint", "/resolve")]) .await; } #[tokio::test] async fn test_shared_metrics() { let metrics: SharedMetricsPublisher = Arc::new(NoOpMetricsPublisher::new()); // Verify it can be used as a shared reference metrics.incr("shared.counter").await; metrics.gauge("shared.gauge", 50).await; // Verify it can be cloned let metrics2 = Arc::clone(&metrics); metrics2.count("cloned.counter", 3).await; } #[test] fn test_create_noop_publisher() { use std::env; // Lock mutex to prevent concurrent environment variable modification let _guard = ENV_MUTEX.lock().unwrap(); // Clean up any existing environment variables first unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("METRICS_STATSD_HOST"); env::remove_var("METRICS_PREFIX"); env::remove_var("METRICS_TAGS"); } // Set up environment for noop adapter unsafe { env::set_var("HTTP_EXTERNAL", "test.example.com"); env::set_var("METRICS_ADAPTER", "noop"); } let config = Config::from_env().unwrap(); let metrics = create_metrics_publisher(&config).unwrap(); // Should create successfully - actual type checking happens at compile time assert!(Arc::strong_count(&metrics) == 1); // Clean up unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("HTTP_EXTERNAL"); } } #[test] fn test_create_statsd_publisher() { use std::env; // Lock mutex to prevent concurrent environment variable modification let _guard = ENV_MUTEX.lock().unwrap(); // Clean up any existing environment variables first unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("METRICS_STATSD_HOST"); env::remove_var("METRICS_PREFIX"); env::remove_var("METRICS_TAGS"); } // Set up environment for statsd adapter unsafe { env::set_var("HTTP_EXTERNAL", "test.example.com"); env::set_var("METRICS_ADAPTER", "statsd"); env::set_var("METRICS_STATSD_HOST", "localhost:8125"); env::set_var("METRICS_PREFIX", "test"); env::set_var("METRICS_TAGS", "env:test,service:quickdid"); } let config = Config::from_env().unwrap(); let metrics = create_metrics_publisher(&config).unwrap(); // Should create successfully assert!(Arc::strong_count(&metrics) == 1); // Clean up unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("METRICS_STATSD_HOST"); env::remove_var("METRICS_PREFIX"); env::remove_var("METRICS_TAGS"); env::remove_var("HTTP_EXTERNAL"); } } #[test] fn test_missing_statsd_host() { use std::env; // Lock mutex to prevent concurrent environment variable modification let _guard = ENV_MUTEX.lock().unwrap(); // Clean up any existing environment variables first unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("METRICS_STATSD_HOST"); env::remove_var("METRICS_PREFIX"); env::remove_var("METRICS_TAGS"); } // Set up environment for statsd adapter without host unsafe { env::set_var("HTTP_EXTERNAL", "test.example.com"); env::set_var("METRICS_ADAPTER", "statsd"); env::remove_var("METRICS_STATSD_HOST"); } let config = Config::from_env().unwrap(); let result = create_metrics_publisher(&config); // Should fail with invalid config error assert!(result.is_err()); if let Err(e) = result { assert!(matches!(e, MetricsError::InvalidConfig(_))); } // Clean up unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("HTTP_EXTERNAL"); } } #[test] fn test_invalid_adapter() { use std::env; // Lock mutex to prevent concurrent environment variable modification let _guard = ENV_MUTEX.lock().unwrap(); // Clean up any existing environment variables first unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("METRICS_STATSD_HOST"); env::remove_var("METRICS_PREFIX"); env::remove_var("METRICS_TAGS"); } // Set up environment with invalid adapter unsafe { env::set_var("HTTP_EXTERNAL", "test.example.com"); env::set_var("METRICS_ADAPTER", "invalid"); env::remove_var("METRICS_STATSD_HOST"); // Clean up from other tests } let config = Config::from_env().unwrap(); // Config validation should catch this let validation_result = config.validate(); assert!(validation_result.is_err()); // Clean up unsafe { env::remove_var("METRICS_ADAPTER"); env::remove_var("HTTP_EXTERNAL"); } } }