AT-based link agregator. Mirror of https://github.com/likeandscribe/frontpage

Measure actual lag

tom.sherman.is b9516b2f 153236dd

verified
+33 -7
+1
Cargo.lock
··· 542 dependencies = [ 543 "anyhow", 544 "bincode", 545 "jetstream", 546 "log", 547 "serde",
··· 542 dependencies = [ 543 "anyhow", 544 "bincode", 545 + "chrono", 546 "jetstream", 547 "log", 548 "serde",
+1
packages-rs/drainpipe-store/Cargo.toml
··· 6 [dependencies] 7 anyhow = "1.0.93" 8 bincode = "1.3.3" 9 jetstream = { path = "../jetstream" } 10 log = "0.4.22" 11 serde = { version = "1.0.215", features = ["derive"] }
··· 6 [dependencies] 7 anyhow = "1.0.93" 8 bincode = "1.3.3" 9 + chrono = "0.4.42" 10 jetstream = { path = "../jetstream" } 11 log = "0.4.22" 12 serde = { version = "1.0.215", features = ["derive"] }
+26 -1
packages-rs/drainpipe-store/src/lib.rs
··· 14 #[derive(Serialize, Deserialize, Debug)] 15 enum CursorInner { 16 V1(u64), 17 } 18 19 #[derive(Serialize, Deserialize, Debug)] ··· 21 22 impl Cursor { 23 pub fn new(value: u64) -> Self { 24 - Self(CursorInner::V1(value)) 25 } 26 27 pub fn value(&self) -> u64 { 28 match self.0 { 29 CursorInner::V1(value) => value, 30 } 31 } 32 } ··· 86 .map(|cursor| cursor.value()) 87 }) 88 .transpose() 89 } 90 91 fn record_dead_letter(&self, commit_json: String, error_message: String) -> anyhow::Result<()> {
··· 14 #[derive(Serialize, Deserialize, Debug)] 15 enum CursorInner { 16 V1(u64), 17 + V2 { value: u64, recorded_at: i64 }, 18 } 19 20 #[derive(Serialize, Deserialize, Debug)] ··· 22 23 impl Cursor { 24 pub fn new(value: u64) -> Self { 25 + Self(CursorInner::V2 { 26 + value, 27 + recorded_at: chrono::Utc::now().timestamp_micros(), 28 + }) 29 } 30 31 pub fn value(&self) -> u64 { 32 match self.0 { 33 CursorInner::V1(value) => value, 34 + CursorInner::V2 { value, .. } => value, 35 + } 36 + } 37 + 38 + pub fn lag_micros(&self) -> Option<i64> { 39 + match self.0 { 40 + CursorInner::V1(_) => None, 41 + CursorInner::V2 { recorded_at, value } => Some(recorded_at - value as i64), 42 } 43 } 44 } ··· 98 .map(|cursor| cursor.value()) 99 }) 100 .transpose() 101 + } 102 + 103 + pub fn get_cursor_lag_micros(&self) -> anyhow::Result<Option<i64>> { 104 + self.cursor_tree 105 + .get("cursor") 106 + .context("Failed to get cursor")? 107 + .map(|cursor_bytes| { 108 + bincode::deserialize::<Cursor>(&cursor_bytes) 109 + .context("Failed to deserialize cursor") 110 + .map(|c| c.lag_micros()) 111 + }) 112 + .transpose() 113 + .map(|l| l.flatten()) 114 } 115 116 fn record_dead_letter(&self, commit_json: String, error_message: String) -> anyhow::Result<()> {
+5 -6
packages-rs/drainpipe/src/main.rs
··· 70 let store = store.clone(); 71 tokio::spawn(async move { 72 for interval in metrics_monitor.intervals() { 73 - let lag_output = store.get_cursor().unwrap_or(None).and_then(|c| { 74 - let cursor_time = Utc.timestamp_micros(c as i64).earliest()?; 75 - let now = Utc::now(); 76 - let lag = now.signed_duration_since(cursor_time); 77 - Some(format!("Cursor lag: {}ms", lag.num_milliseconds())) 78 - }); 79 if let Some(lag) = lag_output { 80 log::info!( 81 "{:?} per second; {}",
··· 70 let store = store.clone(); 71 tokio::spawn(async move { 72 for interval in metrics_monitor.intervals() { 73 + let lag_output = store 74 + .get_cursor_lag_micros() 75 + .ok() 76 + .flatten() 77 + .map(|lag| format!("Cursor lag: {}ms", lag / 1000)); 78 if let Some(lag) = lag_output { 79 log::info!( 80 "{:?} per second; {}",