APIs for links and references in the ATmosphere

metrics + less logging for slow inserts

+13 -4
+13 -4
ufos/src/storage.rs
··· 5 5 }; 6 6 use async_trait::async_trait; 7 7 use jetstream::exports::{Did, Nsid}; 8 + use metrics::{describe_histogram, histogram, Unit}; 8 9 use std::collections::{HashMap, HashSet}; 9 10 use std::path::Path; 10 - use std::time::{Duration, SystemTime}; 11 + use std::time::{Duration, Instant}; 11 12 use tokio::sync::mpsc::Receiver; 12 13 use tokio_util::sync::CancellationToken; 13 14 ··· 35 36 self, 36 37 mut batches: Receiver<EventBatch<LIMIT>>, 37 38 ) -> StorageResult<()> { 39 + describe_histogram!( 40 + "storage_slow_batches", 41 + Unit::Microseconds, 42 + "batches that took more than 3s to insert" 43 + ); 38 44 while let Some(event_batch) = batches.recv().await { 39 45 let token = CancellationToken::new(); 40 46 let cancelled = token.clone(); 41 47 tokio::spawn(async move { 42 - let started = SystemTime::now(); 48 + let started = Instant::now(); 43 49 let mut concerned = false; 44 50 loop { 45 51 tokio::select! { 46 - _ = tokio::time::sleep(Duration::from_secs_f64(3.)) => { 47 - log::warn!("taking a long time to insert an event batch ({:?})...", started.elapsed()); 52 + _ = tokio::time::sleep(Duration::from_secs(3)) => { 53 + if !concerned { 54 + log::warn!("taking a long time to insert an event batch..."); 55 + } 48 56 concerned = true; 49 57 } 50 58 _ = cancelled.cancelled() => { 51 59 if concerned { 52 60 log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed()); 61 + histogram!("storage_slow_batches").record(started.elapsed().as_micros() as f64); 53 62 } 54 63 break 55 64 }