Server tools to backfill, tail, mirror, and verify PLC logs
at debug 439 lines 13 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 2use reqwest::Url; 3use std::time::Duration; 4use thiserror::Error; 5use tokio::sync::mpsc; 6 7// plc.directory ratelimit on /export is 500 per 5 mins 8const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 10#[derive(Debug, Error)] 11pub enum GetPageError { 12 #[error(transparent)] 13 Reqwest(#[from] reqwest::Error), 14 #[error(transparent)] 15 ReqwestMiddleware(#[from] reqwest_middleware::Error), 16 #[error(transparent)] 17 Serde(#[from] serde_json::Error), 18} 19 20/// ops are primary-keyed by (did, cid) 21/// plc orders by `created_at` but does not guarantee distinct times per op 22/// we assume that the order will at least be deterministic: this may be unsound 23#[derive(Debug, PartialEq)] 24pub struct LastOp { 25 pub created_at: Dt, // any op greater is definitely not duplicated 26 pk: (String, String), // did, cid 27} 28 29impl From<Op> for LastOp { 30 fn from(op: Op) -> Self { 31 Self { 32 created_at: op.created_at, 33 pk: (op.did, op.cid), 34 } 35 } 36} 37 38impl From<&Op> for LastOp { 39 fn from(op: &Op) -> Self { 40 Self { 41 created_at: op.created_at, 42 pk: (op.did.clone(), op.cid.clone()), 43 } 44 } 45} 46 47// bit of a hack 48impl From<Dt> for LastOp { 49 fn from(dt: Dt) -> Self { 50 Self { 51 created_at: dt, 52 pk: ("".to_string(), "".to_string()), 53 } 54 } 55} 56 57/// PLC 58#[derive(Debug, PartialEq)] 59pub struct PageBoundaryState { 60 pub last_at: Dt, 61 keys_at: Vec<OpKey>, // expected to ~always be length one 62} 63 64/// track keys at final createdAt to deduplicate the start of the next page 65impl PageBoundaryState { 66 pub fn new(page: &ExportPage) -> Option<Self> { 67 // grab the very last op 68 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 69 70 // set initial state 71 let mut me = Self { 72 last_at, 73 keys_at: vec![last_key], 74 }; 75 76 // and make sure all keys at this time are captured from the back 77 me.capture_nth_last_at(page, last_at, 1); 78 79 Some(me) 80 } 81 fn apply_to_next(&mut self, page: &mut ExportPage) { 82 // walk ops forward, kicking previously-seen ops until created_at advances 83 let to_remove: Vec<usize> = page 84 .ops 85 .iter() 86 .enumerate() 87 .take_while(|(_, op)| op.created_at == self.last_at) 88 .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 89 .map(|(i, _)| i) 90 .collect(); 91 92 // actually remove them. last to first so indices don't shift 93 for dup_idx in to_remove.into_iter().rev() { 94 page.ops.remove(dup_idx); 95 } 96 97 // grab the very last op 98 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 99 // there are no ops left? oop. bail. 100 // last_at and existing keys remain in tact 101 return; 102 }; 103 104 // reset state (as long as time actually moved forward on this page) 105 if last_at > self.last_at { 106 self.last_at = last_at; 107 self.keys_at = vec![last_key]; 108 } else { 109 // weird cases: either time didn't move (fine...) or went backwards (not fine) 110 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 111 self.keys_at.push(last_key); 112 } 113 // and make sure all keys at this time are captured from the back 114 self.capture_nth_last_at(page, last_at, 1); 115 } 116 117 /// walk backwards from 2nd last and collect keys until created_at changes 118 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) { 119 page.ops 120 .iter() 121 .rev() 122 .skip(skips) 123 .take_while(|op| op.created_at == last_at) 124 .for_each(|op| { 125 self.keys_at.push(op.into()); 126 }); 127 } 128} 129 130pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 131 log::trace!("Getting page: {url}"); 132 133 let ops: Vec<Op> = CLIENT 134 .get(url) 135 .send() 136 .await? 137 .error_for_status()? 138 .text() 139 .await? 140 .trim() 141 .split('\n') 142 .filter_map(|s| { 143 serde_json::from_str::<Op>(s) 144 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 145 .ok() 146 }) 147 .collect(); 148 149 let last_op = ops.last().map(Into::into); 150 151 Ok((ExportPage { ops }, last_op)) 152} 153 154pub async fn poll_upstream( 155 after: Option<Dt>, 156 base: Url, 157 dest: mpsc::Sender<ExportPage>, 158) -> anyhow::Result<&'static str> { 159 log::info!("starting upstream poller after {after:?}"); 160 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 161 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 let mut boundary_state: Option<PageBoundaryState> = None; 163 loop { 164 tick.tick().await; 165 166 let mut url = base.clone(); 167 if let Some(ref pl) = prev_last { 168 url.query_pairs_mut() 169 .append_pair("after", &pl.created_at.to_rfc3339()); 170 }; 171 172 let (mut page, next_last) = get_page(url).await?; 173 if let Some(ref mut state) = boundary_state { 174 state.apply_to_next(&mut page); 175 } else { 176 boundary_state = PageBoundaryState::new(&page); 177 } 178 if !page.is_empty() { 179 match dest.try_send(page) { 180 Ok(()) => {} 181 Err(mpsc::error::TrySendError::Full(page)) => { 182 log::warn!("export: destination channel full, awaiting..."); 183 dest.send(page).await?; 184 } 185 e => e?, 186 }; 187 } 188 189 prev_last = next_last.or(prev_last); 190 } 191} 192 193#[cfg(test)] 194mod test { 195 use super::*; 196 197 const FIVES_TS: i64 = 1431648000; 198 const NEXT_TS: i64 = 1431648001; 199 200 fn valid_op() -> Op { 201 serde_json::from_value(serde_json::json!({ 202 "did": "did", 203 "cid": "cid", 204 "createdAt": "2015-05-15T00:00:00Z", 205 "nullified": false, 206 "operation": {}, 207 })) 208 .unwrap() 209 } 210 211 fn next_op() -> Op { 212 serde_json::from_value(serde_json::json!({ 213 "did": "didnext", 214 "cid": "cidnext", 215 "createdAt": "2015-05-15T00:00:01Z", 216 "nullified": false, 217 "operation": {}, 218 })) 219 .unwrap() 220 } 221 222 fn base_state() -> PageBoundaryState { 223 let page = ExportPage { 224 ops: vec![valid_op()], 225 }; 226 PageBoundaryState::new(&page).expect("to have a base page boundary state") 227 } 228 229 #[test] 230 fn test_boundary_new_empty() { 231 let page = ExportPage { ops: vec![] }; 232 let state = PageBoundaryState::new(&page); 233 assert!(state.is_none()); 234 } 235 236 #[test] 237 fn test_boundary_new_one_op() { 238 let page = ExportPage { 239 ops: vec![valid_op()], 240 }; 241 let state = PageBoundaryState::new(&page).unwrap(); 242 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 243 assert_eq!( 244 state.keys_at, 245 vec![OpKey { 246 cid: "cid".to_string(), 247 did: "did".to_string(), 248 }] 249 ); 250 } 251 252 #[test] 253 fn test_add_new_empty() { 254 let mut state = base_state(); 255 state.apply_to_next(&mut ExportPage { ops: vec![] }); 256 assert_eq!(state, base_state()); 257 } 258 259 #[test] 260 fn test_add_new_same_op() { 261 let mut page = ExportPage { 262 ops: vec![valid_op()], 263 }; 264 let mut state = base_state(); 265 state.apply_to_next(&mut page); 266 assert_eq!(state, base_state()); 267 } 268 269 #[test] 270 fn test_add_new_same_time() { 271 // make an op with a different OpKey 272 let mut op = valid_op(); 273 op.cid = "cid2".to_string(); 274 let mut page = ExportPage { ops: vec![op] }; 275 276 let mut state = base_state(); 277 state.apply_to_next(&mut page); 278 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 279 assert_eq!( 280 state.keys_at, 281 vec![ 282 OpKey { 283 cid: "cid".to_string(), 284 did: "did".to_string(), 285 }, 286 OpKey { 287 cid: "cid2".to_string(), 288 did: "did".to_string(), 289 }, 290 ] 291 ); 292 } 293 294 #[test] 295 fn test_add_new_same_time_dup_before() { 296 // make an op with a different OpKey 297 let mut op = valid_op(); 298 op.cid = "cid2".to_string(); 299 let mut page = ExportPage { 300 ops: vec![valid_op(), op], 301 }; 302 303 let mut state = base_state(); 304 state.apply_to_next(&mut page); 305 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 306 assert_eq!( 307 state.keys_at, 308 vec![ 309 OpKey { 310 cid: "cid".to_string(), 311 did: "did".to_string(), 312 }, 313 OpKey { 314 cid: "cid2".to_string(), 315 did: "did".to_string(), 316 }, 317 ] 318 ); 319 } 320 321 #[test] 322 fn test_add_new_same_time_dup_after() { 323 // make an op with a different OpKey 324 let mut op = valid_op(); 325 op.cid = "cid2".to_string(); 326 let mut page = ExportPage { 327 ops: vec![op, valid_op()], 328 }; 329 330 let mut state = base_state(); 331 state.apply_to_next(&mut page); 332 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 333 assert_eq!( 334 state.keys_at, 335 vec![ 336 OpKey { 337 cid: "cid".to_string(), 338 did: "did".to_string(), 339 }, 340 OpKey { 341 cid: "cid2".to_string(), 342 did: "did".to_string(), 343 }, 344 ] 345 ); 346 } 347 348 #[test] 349 fn test_add_new_next_time() { 350 let mut page = ExportPage { 351 ops: vec![next_op()], 352 }; 353 let mut state = base_state(); 354 state.apply_to_next(&mut page); 355 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 356 assert_eq!( 357 state.keys_at, 358 vec![OpKey { 359 cid: "cidnext".to_string(), 360 did: "didnext".to_string(), 361 },] 362 ); 363 } 364 365 #[test] 366 fn test_add_new_next_time_with_dup() { 367 let mut page = ExportPage { 368 ops: vec![valid_op(), next_op()], 369 }; 370 let mut state = base_state(); 371 state.apply_to_next(&mut page); 372 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 373 assert_eq!( 374 state.keys_at, 375 vec![OpKey { 376 cid: "cidnext".to_string(), 377 did: "didnext".to_string(), 378 },] 379 ); 380 assert_eq!(page.ops.len(), 1); 381 assert_eq!(page.ops[0], next_op()); 382 } 383 384 #[test] 385 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 386 // make an op with a different OpKey 387 let mut op = valid_op(); 388 op.cid = "cid2".to_string(); 389 390 let mut page = ExportPage { 391 ops: vec![ 392 valid_op(), // should get dropped 393 op.clone(), // should be kept 394 next_op(), 395 ], 396 }; 397 let mut state = base_state(); 398 state.apply_to_next(&mut page); 399 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 400 assert_eq!( 401 state.keys_at, 402 vec![OpKey { 403 cid: "cidnext".to_string(), 404 did: "didnext".to_string(), 405 },] 406 ); 407 assert_eq!(page.ops.len(), 2); 408 assert_eq!(page.ops[0], op); 409 assert_eq!(page.ops[1], next_op()); 410 } 411 412 #[test] 413 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 414 // make an op with a different OpKey 415 let mut op = valid_op(); 416 op.cid = "cid2".to_string(); 417 418 let mut page = ExportPage { 419 ops: vec![ 420 op.clone(), // should be kept 421 valid_op(), // should get dropped 422 next_op(), 423 ], 424 }; 425 let mut state = base_state(); 426 state.apply_to_next(&mut page); 427 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 428 assert_eq!( 429 state.keys_at, 430 vec![OpKey { 431 cid: "cidnext".to_string(), 432 did: "didnext".to_string(), 433 },] 434 ); 435 assert_eq!(page.ops.len(), 2); 436 assert_eq!(page.ops[0], op); 437 assert_eq!(page.ops[1], next_op()); 438 } 439}