QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 547 lines 18 kB view raw
1use crate::config::Config; 2use async_trait::async_trait; 3use cadence::{ 4 BufferedUdpMetricSink, Counted, CountedExt, Gauged, Metric, QueuingMetricSink, StatsdClient, 5 Timed, 6}; 7use std::net::UdpSocket; 8use std::sync::Arc; 9use thiserror::Error; 10use tracing::{debug, error}; 11 12/// Trait for publishing metrics with counter and gauge support 13/// Designed for minimal compatibility with cadence-style metrics 14#[async_trait] 15pub trait MetricsPublisher: Send + Sync { 16 /// Increment a counter by 1 17 async fn incr(&self, key: &str); 18 19 /// Increment a counter by a specific value 20 async fn count(&self, key: &str, value: u64); 21 22 /// Increment a counter with tags 23 async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]); 24 25 /// Increment a counter by a specific value with tags 26 async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]); 27 28 /// Record a gauge value 29 async fn gauge(&self, key: &str, value: u64); 30 31 /// Record a gauge value with tags 32 async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]); 33 34 /// Record a timing in milliseconds 35 async fn time(&self, key: &str, millis: u64); 36 37 /// Record a timing with tags 38 async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]); 39} 40 41/// No-op implementation for development and testing 42#[derive(Debug, Clone, Default)] 43pub struct NoOpMetricsPublisher; 44 45impl NoOpMetricsPublisher { 46 pub fn new() -> Self { 47 Self 48 } 49} 50 51#[async_trait] 52impl MetricsPublisher for NoOpMetricsPublisher { 53 async fn incr(&self, _key: &str) { 54 // No-op 55 } 56 57 async fn count(&self, _key: &str, _value: u64) { 58 // No-op 59 } 60 61 async fn incr_with_tags(&self, _key: &str, _tags: &[(&str, &str)]) { 62 // No-op 63 } 64 65 async fn count_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) { 66 // No-op 67 } 68 69 async fn gauge(&self, _key: &str, _value: u64) { 70 // No-op 71 } 72 73 async fn gauge_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) { 74 // No-op 75 } 76 77 async fn time(&self, _key: &str, _millis: u64) { 78 // No-op 79 } 80 81 async fn time_with_tags(&self, _key: &str, _millis: u64, _tags: &[(&str, &str)]) { 82 // No-op 83 } 84} 85 86/// Statsd-backed metrics publisher using cadence 87pub struct StatsdMetricsPublisher { 88 client: StatsdClient, 89 default_tags: Vec<(String, String)>, 90} 91 92impl StatsdMetricsPublisher { 93 /// Create a new StatsdMetricsPublisher with default configuration 94 pub fn new(host: &str, prefix: &str) -> Result<Self, Box<dyn std::error::Error>> { 95 Self::new_with_bind(host, prefix, "[::]:0") 96 } 97 98 /// Create a new StatsdMetricsPublisher with custom bind address 99 pub fn new_with_bind( 100 host: &str, 101 prefix: &str, 102 bind_addr: &str, 103 ) -> Result<Self, Box<dyn std::error::Error>> { 104 Self::new_with_bind_and_tags(host, prefix, bind_addr, vec![]) 105 } 106 107 /// Create a new StatsdMetricsPublisher with default tags 108 pub fn new_with_tags( 109 host: &str, 110 prefix: &str, 111 default_tags: Vec<(String, String)>, 112 ) -> Result<Self, Box<dyn std::error::Error>> { 113 Self::new_with_bind_and_tags(host, prefix, "[::]:0", default_tags) 114 } 115 116 /// Create a new StatsdMetricsPublisher with custom bind address and tags 117 pub fn new_with_bind_and_tags( 118 host: &str, 119 prefix: &str, 120 bind_addr: &str, 121 default_tags: Vec<(String, String)>, 122 ) -> Result<Self, Box<dyn std::error::Error>> { 123 tracing::info!( 124 "Creating StatsdMetricsPublisher: host={}, prefix={}, bind={}, tags={:?}", 125 host, 126 prefix, 127 bind_addr, 128 default_tags 129 ); 130 131 let socket = UdpSocket::bind(bind_addr)?; 132 socket.set_nonblocking(true)?; 133 134 let buffered_sink = BufferedUdpMetricSink::from(host, socket)?; 135 let queuing_sink = QueuingMetricSink::builder() 136 .with_error_handler(move |error| { 137 error!("Failed to send metric via sink: {}", error); 138 }) 139 .build(buffered_sink); 140 let client = StatsdClient::from_sink(prefix, queuing_sink); 141 142 tracing::info!( 143 "StatsdMetricsPublisher created successfully with bind address: {}", 144 bind_addr 145 ); 146 Ok(Self { 147 client, 148 default_tags, 149 }) 150 } 151 152 /// Create from an existing StatsdClient 153 pub fn from_client(client: StatsdClient) -> Self { 154 Self::from_client_with_tags(client, vec![]) 155 } 156 157 /// Create from an existing StatsdClient with default tags 158 pub fn from_client_with_tags( 159 client: StatsdClient, 160 default_tags: Vec<(String, String)>, 161 ) -> Self { 162 Self { 163 client, 164 default_tags, 165 } 166 } 167 168 /// Apply default tags to a builder 169 fn apply_default_tags<'a, M>( 170 &'a self, 171 mut builder: cadence::MetricBuilder<'a, 'a, M>, 172 ) -> cadence::MetricBuilder<'a, 'a, M> 173 where 174 M: Metric + From<String>, 175 { 176 for (k, v) in &self.default_tags { 177 builder = builder.with_tag(k.as_str(), v.as_str()); 178 } 179 builder 180 } 181} 182 183#[async_trait] 184impl MetricsPublisher for StatsdMetricsPublisher { 185 async fn incr(&self, key: &str) { 186 debug!("Sending metric incr: {}", key); 187 if self.default_tags.is_empty() { 188 match self.client.incr(key) { 189 Ok(_) => debug!("Successfully sent metric: {}", key), 190 Err(e) => error!("Failed to send metric {}: {}", key, e), 191 } 192 } else { 193 let builder = self.client.incr_with_tags(key); 194 let builder = self.apply_default_tags(builder); 195 let _ = builder.send(); 196 debug!("Sent metric with tags: {}", key); 197 } 198 } 199 200 async fn count(&self, key: &str, value: u64) { 201 if self.default_tags.is_empty() { 202 let _ = self.client.count(key, value); 203 } else { 204 let builder = self.client.count_with_tags(key, value); 205 let builder = self.apply_default_tags(builder); 206 let _ = builder.send(); 207 } 208 } 209 210 async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]) { 211 let mut builder = self.client.incr_with_tags(key); 212 builder = self.apply_default_tags(builder); 213 for (k, v) in tags { 214 builder = builder.with_tag(k, v); 215 } 216 let _ = builder.send(); 217 } 218 219 async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) { 220 let mut builder = self.client.count_with_tags(key, value); 221 builder = self.apply_default_tags(builder); 222 for (k, v) in tags { 223 builder = builder.with_tag(k, v); 224 } 225 let _ = builder.send(); 226 } 227 228 async fn gauge(&self, key: &str, value: u64) { 229 debug!("Sending metric gauge: {} = {}", key, value); 230 if self.default_tags.is_empty() { 231 match self.client.gauge(key, value) { 232 Ok(_) => debug!("Successfully sent gauge: {} = {}", key, value), 233 Err(e) => error!("Failed to send gauge {} = {}: {}", key, value, e), 234 } 235 } else { 236 let builder = self.client.gauge_with_tags(key, value); 237 let builder = self.apply_default_tags(builder); 238 builder.send(); 239 debug!("Sent gauge with tags: {} = {}", key, value); 240 } 241 } 242 243 async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) { 244 let mut builder = self.client.gauge_with_tags(key, value); 245 builder = self.apply_default_tags(builder); 246 for (k, v) in tags { 247 builder = builder.with_tag(k, v); 248 } 249 let _ = builder.send(); 250 } 251 252 async fn time(&self, key: &str, millis: u64) { 253 if self.default_tags.is_empty() { 254 let _ = self.client.time(key, millis); 255 } else { 256 let builder = self.client.time_with_tags(key, millis); 257 let builder = self.apply_default_tags(builder); 258 let _ = builder.send(); 259 } 260 } 261 262 async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]) { 263 let mut builder = self.client.time_with_tags(key, millis); 264 builder = self.apply_default_tags(builder); 265 for (k, v) in tags { 266 builder = builder.with_tag(k, v); 267 } 268 let _ = builder.send(); 269 } 270} 271 272/// Type alias for shared metrics publisher 273pub type SharedMetricsPublisher = Arc<dyn MetricsPublisher>; 274 275/// Metrics-specific errors 276#[derive(Debug, Error)] 277pub enum MetricsError { 278 /// Failed to create metrics publisher 279 #[error("error-quickdid-metrics-1 Failed to create metrics publisher: {0}")] 280 CreationFailed(String), 281 282 /// Invalid configuration for metrics 283 #[error("error-quickdid-metrics-2 Invalid metrics configuration: {0}")] 284 InvalidConfig(String), 285} 286 287/// Create a metrics publisher based on configuration 288/// 289/// Returns either a no-op publisher or a StatsD publisher based on the 290/// `metrics_adapter` configuration value. 291/// 292/// ## Example 293/// 294/// ```rust,no_run 295/// use quickdid::config::Config; 296/// use quickdid::metrics::create_metrics_publisher; 297/// 298/// # async fn example() -> Result<(), Box<dyn std::error::Error>> { 299/// let config = Config::from_env()?; 300/// let metrics = create_metrics_publisher(&config)?; 301/// 302/// // Use the metrics publisher 303/// metrics.incr("request.count").await; 304/// # Ok(()) 305/// # } 306/// ``` 307pub fn create_metrics_publisher(config: &Config) -> Result<SharedMetricsPublisher, MetricsError> { 308 match config.metrics_adapter.as_str() { 309 "noop" => Ok(Arc::new(NoOpMetricsPublisher::new())), 310 "statsd" => { 311 let host = config.metrics_statsd_host.as_ref().ok_or_else(|| { 312 MetricsError::InvalidConfig( 313 "METRICS_STATSD_HOST is required when using statsd adapter".to_string(), 314 ) 315 })?; 316 317 // Parse tags from comma-separated key:value pairs 318 let default_tags = if let Some(tags_str) = &config.metrics_tags { 319 tags_str 320 .split(',') 321 .filter_map(|tag| { 322 let parts: Vec<&str> = tag.trim().split(':').collect(); 323 if parts.len() == 2 { 324 Some((parts[0].to_string(), parts[1].to_string())) 325 } else { 326 error!("Invalid tag format: {}", tag); 327 None 328 } 329 }) 330 .collect() 331 } else { 332 vec![] 333 }; 334 335 let publisher = StatsdMetricsPublisher::new_with_bind_and_tags( 336 host, 337 &config.metrics_prefix, 338 &config.metrics_statsd_bind, 339 default_tags, 340 ) 341 .map_err(|e| MetricsError::CreationFailed(e.to_string()))?; 342 343 Ok(Arc::new(publisher)) 344 } 345 _ => Err(MetricsError::InvalidConfig(format!( 346 "Unknown metrics adapter: {}", 347 config.metrics_adapter 348 ))), 349 } 350} 351 352#[cfg(test)] 353mod tests { 354 use super::*; 355 use once_cell::sync::Lazy; 356 use std::sync::Mutex; 357 358 // Use a mutex to serialize tests that modify environment variables 359 static ENV_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(())); 360 361 #[tokio::test] 362 async fn test_noop_metrics() { 363 let metrics = NoOpMetricsPublisher::new(); 364 365 // These should all be no-ops and not panic 366 metrics.incr("test.counter").await; 367 metrics.count("test.counter", 5).await; 368 metrics 369 .incr_with_tags("test.counter", &[("env", "test")]) 370 .await; 371 metrics 372 .count_with_tags( 373 "test.counter", 374 10, 375 &[("env", "test"), ("service", "quickdid")], 376 ) 377 .await; 378 metrics.gauge("test.gauge", 100).await; 379 metrics 380 .gauge_with_tags("test.gauge", 200, &[("host", "localhost")]) 381 .await; 382 metrics.time("test.timing", 42).await; 383 metrics 384 .time_with_tags("test.timing", 84, &[("endpoint", "/resolve")]) 385 .await; 386 } 387 388 #[tokio::test] 389 async fn test_shared_metrics() { 390 let metrics: SharedMetricsPublisher = Arc::new(NoOpMetricsPublisher::new()); 391 392 // Verify it can be used as a shared reference 393 metrics.incr("shared.counter").await; 394 metrics.gauge("shared.gauge", 50).await; 395 396 // Verify it can be cloned 397 let metrics2 = Arc::clone(&metrics); 398 metrics2.count("cloned.counter", 3).await; 399 } 400 401 #[test] 402 fn test_create_noop_publisher() { 403 use std::env; 404 405 // Lock mutex to prevent concurrent environment variable modification 406 let _guard = ENV_MUTEX.lock().unwrap(); 407 408 // Clean up any existing environment variables first 409 unsafe { 410 env::remove_var("METRICS_ADAPTER"); 411 env::remove_var("METRICS_STATSD_HOST"); 412 env::remove_var("METRICS_PREFIX"); 413 env::remove_var("METRICS_TAGS"); 414 } 415 416 // Set up environment for noop adapter 417 unsafe { 418 env::set_var("HTTP_EXTERNAL", "test.example.com"); 419 env::set_var("METRICS_ADAPTER", "noop"); 420 } 421 422 let config = Config::from_env().unwrap(); 423 let metrics = create_metrics_publisher(&config).unwrap(); 424 425 // Should create successfully - actual type checking happens at compile time 426 assert!(Arc::strong_count(&metrics) == 1); 427 428 // Clean up 429 unsafe { 430 env::remove_var("METRICS_ADAPTER"); 431 env::remove_var("HTTP_EXTERNAL"); 432 } 433 } 434 435 #[test] 436 fn test_create_statsd_publisher() { 437 use std::env; 438 439 // Lock mutex to prevent concurrent environment variable modification 440 let _guard = ENV_MUTEX.lock().unwrap(); 441 442 // Clean up any existing environment variables first 443 unsafe { 444 env::remove_var("METRICS_ADAPTER"); 445 env::remove_var("METRICS_STATSD_HOST"); 446 env::remove_var("METRICS_PREFIX"); 447 env::remove_var("METRICS_TAGS"); 448 } 449 450 // Set up environment for statsd adapter 451 unsafe { 452 env::set_var("HTTP_EXTERNAL", "test.example.com"); 453 env::set_var("METRICS_ADAPTER", "statsd"); 454 env::set_var("METRICS_STATSD_HOST", "localhost:8125"); 455 env::set_var("METRICS_PREFIX", "test"); 456 env::set_var("METRICS_TAGS", "env:test,service:quickdid"); 457 } 458 459 let config = Config::from_env().unwrap(); 460 let metrics = create_metrics_publisher(&config).unwrap(); 461 462 // Should create successfully 463 assert!(Arc::strong_count(&metrics) == 1); 464 465 // Clean up 466 unsafe { 467 env::remove_var("METRICS_ADAPTER"); 468 env::remove_var("METRICS_STATSD_HOST"); 469 env::remove_var("METRICS_PREFIX"); 470 env::remove_var("METRICS_TAGS"); 471 env::remove_var("HTTP_EXTERNAL"); 472 } 473 } 474 475 #[test] 476 fn test_missing_statsd_host() { 477 use std::env; 478 479 // Lock mutex to prevent concurrent environment variable modification 480 let _guard = ENV_MUTEX.lock().unwrap(); 481 482 // Clean up any existing environment variables first 483 unsafe { 484 env::remove_var("METRICS_ADAPTER"); 485 env::remove_var("METRICS_STATSD_HOST"); 486 env::remove_var("METRICS_PREFIX"); 487 env::remove_var("METRICS_TAGS"); 488 } 489 490 // Set up environment for statsd adapter without host 491 unsafe { 492 env::set_var("HTTP_EXTERNAL", "test.example.com"); 493 env::set_var("METRICS_ADAPTER", "statsd"); 494 env::remove_var("METRICS_STATSD_HOST"); 495 } 496 497 let config = Config::from_env().unwrap(); 498 let result = create_metrics_publisher(&config); 499 500 // Should fail with invalid config error 501 assert!(result.is_err()); 502 if let Err(e) = result { 503 assert!(matches!(e, MetricsError::InvalidConfig(_))); 504 } 505 506 // Clean up 507 unsafe { 508 env::remove_var("METRICS_ADAPTER"); 509 env::remove_var("HTTP_EXTERNAL"); 510 } 511 } 512 513 #[test] 514 fn test_invalid_adapter() { 515 use std::env; 516 517 // Lock mutex to prevent concurrent environment variable modification 518 let _guard = ENV_MUTEX.lock().unwrap(); 519 520 // Clean up any existing environment variables first 521 unsafe { 522 env::remove_var("METRICS_ADAPTER"); 523 env::remove_var("METRICS_STATSD_HOST"); 524 env::remove_var("METRICS_PREFIX"); 525 env::remove_var("METRICS_TAGS"); 526 } 527 528 // Set up environment with invalid adapter 529 unsafe { 530 env::set_var("HTTP_EXTERNAL", "test.example.com"); 531 env::set_var("METRICS_ADAPTER", "invalid"); 532 env::remove_var("METRICS_STATSD_HOST"); // Clean up from other tests 533 } 534 535 let config = Config::from_env().unwrap(); 536 537 // Config validation should catch this 538 let validation_result = config.validate(); 539 assert!(validation_result.is_err()); 540 541 // Clean up 542 unsafe { 543 env::remove_var("METRICS_ADAPTER"); 544 env::remove_var("HTTP_EXTERNAL"); 545 } 546 } 547}