//! Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey};
2use reqwest::Url;
3use std::time::Duration;
4use thiserror::Error;
5use tokio::sync::mpsc;
6
// plc.directory ratelimit on /export is 500 per 5 mins
// 5 min / 500 requests = 600ms minimum spacing between requests
const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600);
9
/// Failures that can occur while fetching a page from the export endpoint.
#[derive(Debug, Error)]
pub enum GetPageError {
    /// HTTP transport or status error from the underlying client.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Error raised by a middleware layer wrapped around the client.
    #[error(transparent)]
    ReqwestMiddleware(#[from] reqwest_middleware::Error),
    /// JSON (de)serialization failure (converted via `#[from]`).
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
19
/// Cursor marking the last op consumed from an export stream.
///
/// ops are primary-keyed by (did, cid)
/// plc orders by `created_at` but does not guarantee distinct times per op
/// we assume that the order will at least be deterministic: this may be unsound
#[derive(Debug, PartialEq)]
pub struct LastOp {
    pub created_at: Dt, // any op greater is definitely not duplicated
    pk: (String, String), // did, cid — participates in derived PartialEq
}
28
29impl From<Op> for LastOp {
30 fn from(op: Op) -> Self {
31 Self {
32 created_at: op.created_at,
33 pk: (op.did, op.cid),
34 }
35 }
36}
37
38impl From<&Op> for LastOp {
39 fn from(op: &Op) -> Self {
40 Self {
41 created_at: op.created_at,
42 pk: (op.did.clone(), op.cid.clone()),
43 }
44 }
45}
46
47// bit of a hack
48impl From<Dt> for LastOp {
49 fn from(dt: Dt) -> Self {
50 Self {
51 created_at: dt,
52 pk: ("".to_string(), "".to_string()),
53 }
54 }
55}
56
/// Dedup state carried across consecutive PLC export pages.
///
/// Pages are requested with `after = last created_at`, so ops sharing that
/// timestamp can reappear at the head of the next page; `keys_at` records
/// which of them have already been seen.
#[derive(Debug, PartialEq)]
pub struct PageBoundaryState {
    pub last_at: Dt,
    keys_at: Vec<OpKey>, // expected to ~always be length one
}
63
64/// track keys at final createdAt to deduplicate the start of the next page
65impl PageBoundaryState {
66 pub fn new(page: &ExportPage) -> Option<Self> {
67 // grab the very last op
68 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
69
70 // set initial state
71 let mut me = Self {
72 last_at,
73 keys_at: vec![last_key],
74 };
75
76 // and make sure all keys at this time are captured from the back
77 me.capture_nth_last_at(page, last_at, 1);
78
79 Some(me)
80 }
81 fn apply_to_next(&mut self, page: &mut ExportPage) {
82 // walk ops forward, kicking previously-seen ops until created_at advances
83 let to_remove: Vec<usize> = page
84 .ops
85 .iter()
86 .enumerate()
87 .take_while(|(_, op)| op.created_at == self.last_at)
88 .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
89 .map(|(i, _)| i)
90 .collect();
91
92 // actually remove them. last to first so indices don't shift
93 for dup_idx in to_remove.into_iter().rev() {
94 page.ops.remove(dup_idx);
95 }
96
97 // grab the very last op
98 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
99 // there are no ops left? oop. bail.
100 // last_at and existing keys remain in tact
101 return;
102 };
103
104 // reset state (as long as time actually moved forward on this page)
105 if last_at > self.last_at {
106 self.last_at = last_at;
107 self.keys_at = vec![last_key];
108 } else {
109 // weird cases: either time didn't move (fine...) or went backwards (not fine)
110 assert_eq!(last_at, self.last_at, "time moved backwards on a page");
111 self.keys_at.push(last_key);
112 }
113 // and make sure all keys at this time are captured from the back
114 self.capture_nth_last_at(page, last_at, 1);
115 }
116
117 /// walk backwards from 2nd last and collect keys until created_at changes
118 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) {
119 page.ops
120 .iter()
121 .rev()
122 .skip(skips)
123 .take_while(|op| op.created_at == last_at)
124 .for_each(|op| {
125 self.keys_at.push(op.into());
126 });
127 }
128}
129
130pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
131 log::trace!("Getting page: {url}");
132
133 let ops: Vec<Op> = CLIENT
134 .get(url)
135 .send()
136 .await?
137 .error_for_status()?
138 .text()
139 .await?
140 .trim()
141 .split('\n')
142 .filter_map(|s| {
143 serde_json::from_str::<Op>(s)
144 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
145 .ok()
146 })
147 .collect();
148
149 let last_op = ops.last().map(Into::into);
150
151 Ok((ExportPage { ops }, last_op))
152}
153
/// Poll the upstream export endpoint forever, forwarding deduplicated,
/// non-empty pages to `dest`.
///
/// * `after` - resume point; only ops after this time are requested
/// * `base` - base export URL; an `after` query param is appended per request
/// * `dest` - channel receiving each non-empty page
///
/// Requests are spaced by `UPSTREAM_REQUEST_INTERVAL` to respect the upstream
/// rate limit. The loop never breaks, so this only returns on error (the
/// `&'static str` success type is never actually produced here).
pub async fn poll_upstream(
    after: Option<Dt>,
    base: Url,
    dest: mpsc::Sender<ExportPage>,
) -> anyhow::Result<&'static str> {
    log::info!("starting upstream poller after {after:?}");
    let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
    // cursor for the `after` query param; seeded from the caller's resume point
    let mut prev_last: Option<LastOp> = after.map(Into::into);
    // boundary dedup state; stays None until the first non-empty page arrives
    let mut boundary_state: Option<PageBoundaryState> = None;
    loop {
        // rate limit: first tick fires immediately, then once per interval
        tick.tick().await;

        let mut url = base.clone();
        if let Some(ref pl) = prev_last {
            url.query_pairs_mut()
                .append_pair("after", &pl.created_at.to_rfc3339());
        };

        let (mut page, next_last) = get_page(url).await?;
        if let Some(ref mut state) = boundary_state {
            // drop ops already seen at the previous page boundary
            state.apply_to_next(&mut page);
        } else {
            // first page with ops: seed the dedup state
            boundary_state = PageBoundaryState::new(&page);
        }
        if !page.is_empty() {
            // try a non-blocking send first; await only when the channel is full
            match dest.try_send(page) {
                Ok(()) => {}
                Err(mpsc::error::TrySendError::Full(page)) => {
                    log::warn!("export: destination channel full, awaiting...");
                    dest.send(page).await?;
                }
                // remaining case is a closed channel: propagate as an error
                e => e?,
            };
        }

        // keep the previous cursor when this page had no parseable ops
        prev_last = next_last.or(prev_last);
    }
}
192
#[cfg(test)]
mod test {
    use super::*;

    // epoch seconds for the fixture timestamps below
    const FIVES_TS: i64 = 1431648000; // 2015-05-15T00:00:00Z
    const NEXT_TS: i64 = 1431648001; // 2015-05-15T00:00:01Z

    /// Baseline op: key ("did", "cid") at FIVES_TS.
    fn valid_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "did",
            "cid": "cid",
            "createdAt": "2015-05-15T00:00:00Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// Op one second later with a distinct key ("didnext", "cidnext").
    fn next_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "didnext",
            "cid": "cidnext",
            "createdAt": "2015-05-15T00:00:01Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// Boundary state built from a single-op page containing `valid_op()`.
    fn base_state() -> PageBoundaryState {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        PageBoundaryState::new(&page).expect("to have a base page boundary state")
    }

    // an empty page yields no boundary state
    #[test]
    fn test_boundary_new_empty() {
        let page = ExportPage { ops: vec![] };
        let state = PageBoundaryState::new(&page);
        assert!(state.is_none());
    }

    // a one-op page captures that op's time and key
    #[test]
    fn test_boundary_new_one_op() {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        let state = PageBoundaryState::new(&page).unwrap();
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cid".to_string(),
                did: "did".to_string(),
            }]
        );
    }

    // applying an empty page leaves state untouched
    #[test]
    fn test_add_new_empty() {
        let mut state = base_state();
        state.apply_to_next(&mut ExportPage { ops: vec![] });
        assert_eq!(state, base_state());
    }

    // a page that repeats the boundary op is fully deduplicated: no state change
    #[test]
    fn test_add_new_same_op() {
        let mut page = ExportPage {
            ops: vec![valid_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state, base_state());
    }

    // a new key at the same timestamp is appended to keys_at
    #[test]
    fn test_add_new_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage { ops: vec![op] };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    // duplicate before the new same-time op: dup dropped, new key captured
    #[test]
    fn test_add_new_same_time_dup_before() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![valid_op(), op],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    // duplicate after the new same-time op: same outcome as dup-before
    #[test]
    fn test_add_new_same_time_dup_after() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![op, valid_op()],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    // time advances: keys_at is reset to only the new op's key
    #[test]
    fn test_add_new_next_time() {
        let mut page = ExportPage {
            ops: vec![next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
    }

    // leading duplicate is removed and time advances past it
    #[test]
    fn test_add_new_next_time_with_dup() {
        let mut page = ExportPage {
            ops: vec![valid_op(), next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 1);
        assert_eq!(page.ops[0], next_op());
    }

    // dup at head, unseen same-time op kept, then a later op: only dup dropped
    #[test]
    fn test_add_new_next_time_with_dup_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                valid_op(), // should get dropped
                op.clone(), // should be kept
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }

    // dup appears after an unseen same-time op: still removed from the prefix
    #[test]
    fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                op.clone(), // should be kept
                valid_op(), // should get dropped
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }
}
439}