//! forked from microcosm.blue/Allegedly
//!
//! Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey, SeqOp, SeqPage};
2use reqwest::Url;
3use std::time::Duration;
4use thiserror::Error;
5use tokio::sync::mpsc;
6
/// Errors that can occur while fetching a page from a PLC `/export` endpoint.
///
/// All variants are transparent wrappers so the underlying error message is
/// surfaced directly.
#[derive(Debug, Error)]
pub enum GetPageError {
    /// HTTP/transport failure from `reqwest`, including non-2xx statuses
    /// surfaced via `error_for_status`.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Failure raised by the `reqwest_middleware` layer wrapping the client.
    #[error(transparent)]
    ReqwestMiddleware(#[from] reqwest_middleware::Error),
    /// JSON (de)serialization failure from `serde_json`.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
16
/// ops are primary-keyed by (did, cid)
/// plc orders by `created_at` but does not guarantee distinct times per op
/// we assume that the order will at least be deterministic: this may be unsound
#[derive(Debug, PartialEq)]
pub struct LastOp {
    /// Timestamp of the op; any op with a strictly greater time cannot be a
    /// duplicate of this one.
    pub created_at: Dt, // any op greater is definitely not duplicated
    /// Primary key of the op. Empty strings when constructed from a bare `Dt`
    /// (see the `From<Dt>` impl below).
    pk: (String, String), // did, cid
}
25
26impl From<Op> for LastOp {
27 fn from(op: Op) -> Self {
28 Self {
29 created_at: op.created_at,
30 pk: (op.did, op.cid),
31 }
32 }
33}
34
35impl From<&Op> for LastOp {
36 fn from(op: &Op) -> Self {
37 Self {
38 created_at: op.created_at,
39 pk: (op.did.clone(), op.cid.clone()),
40 }
41 }
42}
43
44// bit of a hack
45impl From<Dt> for LastOp {
46 fn from(dt: Dt) -> Self {
47 Self {
48 created_at: dt,
49 pk: ("".to_string(), "".to_string()),
50 }
51 }
52}
53
/// State for removing duplicates ops between PLC export page boundaries
#[derive(Debug, PartialEq)]
pub struct PageBoundaryState {
    /// The previous page's last timestamp
    ///
    /// Duplicate ops from /export only occur for the same exact timestamp
    pub last_at: Dt,
    /// The previous page's ops at its last timestamp
    ///
    /// Linear scans over this are fine because pages essentially never end
    /// with many ops sharing one timestamp.
    keys_at: Vec<OpKey>, // expected to ~always be length one
}
64
65impl PageBoundaryState {
66 /// Initialize the boundary state with a PLC page
67 pub fn new(page: &ExportPage) -> Option<Self> {
68 // grab the very last op
69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
70
71 // set initial state
72 let mut me = Self {
73 last_at,
74 keys_at: vec![last_key],
75 };
76
77 // and make sure all keys at this time are captured from the back
78 me.capture_nth_last_at(page, last_at, 1);
79
80 Some(me)
81 }
82 /// Apply the deduplication and update state
83 ///
84 /// The beginning of the page will be modified to remove duplicates from the
85 /// previous page.
86 ///
87 /// The end of the page is inspected to update the deduplicator state for
88 /// the next page.
89 fn apply_to_next(&mut self, page: &mut ExportPage) {
90 // walk ops forward, kicking previously-seen ops until created_at advances
91 let to_remove: Vec<usize> = page
92 .ops
93 .iter()
94 .enumerate()
95 .take_while(|(_, op)| op.created_at == self.last_at)
96 .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
97 .map(|(i, _)| i)
98 .collect();
99
100 // actually remove them. last to first so indices don't shift
101 for dup_idx in to_remove.into_iter().rev() {
102 page.ops.remove(dup_idx);
103 }
104
105 // grab the very last op
106 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
107 // there are no ops left? oop. bail.
108 // last_at and existing keys remain in tact
109 return;
110 };
111
112 // reset state (as long as time actually moved forward on this page)
113 if last_at > self.last_at {
114 self.last_at = last_at;
115 self.keys_at = vec![last_key];
116 } else {
117 // weird cases: either time didn't move (fine...) or went backwards (not fine)
118 assert_eq!(last_at, self.last_at, "time moved backwards on a page");
119 self.keys_at.push(last_key);
120 }
121 // and make sure all keys at this time are captured from the back
122 self.capture_nth_last_at(page, last_at, 1);
123 }
124
125 /// walk backwards from 2nd last and collect keys until created_at changes
126 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) {
127 page.ops
128 .iter()
129 .rev()
130 .skip(skips)
131 .take_while(|op| op.created_at == last_at)
132 .for_each(|op| {
133 self.keys_at.push(op.into());
134 });
135 }
136}
137
138/// Get one PLC export page
139///
140/// Extracts the final op so it can be used to fetch the following page
141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
142 use futures::TryStreamExt;
143 use tokio::io::{AsyncBufReadExt, BufReader};
144 use tokio_util::compat::FuturesAsyncReadCompatExt;
145
146 log::trace!("Getting page: {url}");
147
148 let res = CLIENT.get(url).send().await?.error_for_status()?;
149 let stream = Box::pin(
150 res.bytes_stream()
151 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
152 .into_async_read()
153 .compat(),
154 );
155
156 let mut lines = BufReader::new(stream).lines();
157 let mut ops = Vec::new();
158
159 loop {
160 match lines.next_line().await {
161 Ok(Some(line)) => {
162 let line = line.trim();
163 if line.is_empty() {
164 continue;
165 }
166 match serde_json::from_str::<Op>(line) {
167 Ok(op) => ops.push(op),
168 Err(e) => log::warn!("failed to parse op: {e} ({line})"),
169 }
170 }
171 Ok(None) => break,
172 Err(e) => {
173 log::warn!("transport error mid-page: {}; returning partial page", e);
174 break;
175 }
176 }
177 }
178
179 let last_op = ops.last().map(Into::into);
180
181 Ok((ExportPage { ops }, last_op))
182}
183
184/// Poll an upstream PLC server for new ops
185///
186/// Pages of operations are written to the `dest` channel.
187///
188/// ```no_run
189/// # #[tokio::main]
190/// # async fn main() {
191/// use allegedly::{ExportPage, Op, poll_upstream};
192///
193/// let after = Some(chrono::Utc::now());
194/// let upstream = "https://plc.wtf/export".parse().unwrap();
195/// let throttle = std::time::Duration::from_millis(300);
196///
197/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
198/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
199///
200/// while let Some(ExportPage { ops }) = rx.recv().await {
201/// println!("received {} plc ops", ops.len());
202///
203/// for Op { did, cid, operation, .. } in ops {
204/// // in this example we're alerting when changes are found for one
205/// // specific identity
206/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
207/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
208/// }
209/// }
210/// }
211/// # }
212/// ```
213pub async fn poll_upstream(
214 after: Option<Dt>,
215 base: Url,
216 throttle: Duration,
217 dest: mpsc::Sender<ExportPage>,
218) -> anyhow::Result<&'static str> {
219 log::info!("starting upstream poller at {base} after {after:?}");
220 let mut tick = tokio::time::interval(throttle);
221 let mut prev_last: Option<LastOp> = after.map(Into::into);
222 let mut boundary_state: Option<PageBoundaryState> = None;
223 loop {
224 tick.tick().await;
225
226 let mut url = base.clone();
227 if let Some(ref pl) = prev_last {
228 url.query_pairs_mut()
229 .append_pair("after", &pl.created_at.to_rfc3339());
230 };
231
232 let (mut page, next_last) = match get_page(url).await {
233 Ok(res) => res,
234 Err(e) => {
235 log::warn!("error polling upstream: {e}");
236 continue;
237 }
238 };
239
240 if let Some(ref mut state) = boundary_state {
241 state.apply_to_next(&mut page);
242 } else {
243 boundary_state = PageBoundaryState::new(&page);
244 }
245 if !page.is_empty() {
246 match dest.try_send(page) {
247 Ok(()) => {}
248 Err(mpsc::error::TrySendError::Full(page)) => {
249 log::warn!("export: destination channel full, awaiting...");
250 dest.send(page).await?;
251 }
252 e => e?,
253 };
254 }
255
256 prev_last = next_last.or(prev_last);
257 }
258}
259
260/// Fetch one page of seq-based export from `/export?after=<seq>`
261async fn get_seq_page(url: Url) -> Result<SeqPage, GetPageError> {
262 use futures::TryStreamExt;
263 use tokio::io::{AsyncBufReadExt, BufReader};
264 use tokio_util::compat::FuturesAsyncReadCompatExt;
265
266 log::trace!("getting seq page: {url}");
267
268 let res = CLIENT.get(url).send().await?.error_for_status()?;
269 let stream = Box::pin(
270 res.bytes_stream()
271 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
272 .into_async_read()
273 .compat(),
274 );
275
276 let mut lines = BufReader::new(stream).lines();
277 let mut ops = Vec::new();
278
279 loop {
280 match lines.next_line().await {
281 Ok(Some(line)) => {
282 let line = line.trim();
283 if line.is_empty() {
284 continue;
285 }
286 match serde_json::from_str::<SeqOp>(line) {
287 Ok(op) => ops.push(op),
288 Err(e) => log::warn!("failed to parse seq op: {e} ({line})"),
289 }
290 }
291 Ok(None) => break,
292 Err(e) => {
293 log::warn!(
294 "transport error mid-seq-page: {}; returning partial page",
295 e
296 );
297 break;
298 }
299 }
300 }
301
302 Ok(SeqPage { ops })
303}
304
305/// Poll an upstream PLC server using seq-number-based cursoring
306///
307/// Uses `/export?after=<seq>` — each op from the server carries a `seq` field
308/// which is a globally monotonic unsigned integer. Because seq is unique per op
309/// there is no need for page-boundary deduplication.
310///
311/// Pages are sent to `dest`. Returns when the channel closes.
312pub async fn poll_upstream_seq(
313 after: Option<u64>,
314 base: Url,
315 throttle: Duration,
316 dest: mpsc::Sender<SeqPage>,
317) -> anyhow::Result<&'static str> {
318 log::info!("starting seq upstream poller at {base} after {after:?}");
319 let mut tick = tokio::time::interval(throttle);
320 let mut last_seq: u64 = after.unwrap_or(0);
321
322 loop {
323 tick.tick().await;
324
325 let mut url = base.clone();
326 url.query_pairs_mut()
327 .append_pair("after", &last_seq.to_string());
328
329 let page = match get_seq_page(url).await {
330 Ok(p) => p,
331 Err(e) => {
332 log::warn!("error polling upstream (seq): {e}");
333 continue;
334 }
335 };
336
337 if let Some(last) = page.ops.last() {
338 last_seq = last.seq;
339 }
340
341 if !page.is_empty() {
342 match dest.try_send(page) {
343 Ok(()) => {}
344 Err(mpsc::error::TrySendError::Full(page)) => {
345 log::warn!("seq poll: destination channel full, awaiting...");
346 dest.send(page).await?;
347 }
348 e => e?,
349 };
350 }
351 }
352}
353
354/// Tail the upstream PLC `/export/stream` WebSocket endpoint
355///
356/// `cursor` is a seq number to resume from. The server only supports backfill
357/// of up to ~1 week (server-configurable), so this cannot replay from seq 0.
358/// Use `poll_upstream_seq` to catch up first, then hand off to this function.
359///
360/// Messages arrive as single-op `SeqPage`s sent to `dest`. Returns on
361/// disconnect so the caller can reconnect or fall back to polling.
362pub async fn tail_upstream_stream(
363 cursor: Option<u64>,
364 base: Url,
365 dest: mpsc::Sender<SeqPage>,
366) -> anyhow::Result<()> {
367 use futures::StreamExt;
368 use tokio_tungstenite::{connect_async, tungstenite::Message};
369
370 let mut url = base.clone();
371 // convert ws(s):// scheme if needed; some callers pass http(s)://
372 let ws_scheme = match url.scheme() {
373 "https" => "wss",
374 "http" => "ws",
375 _ => "ws",
376 }
377 .to_owned();
378 url.set_scheme(&ws_scheme)
379 .map_err(|_| anyhow::anyhow!("failed to set websocket scheme"))?;
380 if let Some(seq) = cursor {
381 url.query_pairs_mut()
382 .append_pair("cursor", &seq.to_string());
383 }
384
385 log::info!("connecting to stream: {url}");
386 let (mut ws, _) = connect_async(url.as_str()).await?;
387 log::info!("stream connected");
388
389 while let Some(msg) = ws.next().await {
390 let msg = msg?;
391 let text = match msg {
392 Message::Text(t) => t,
393 Message::Close(_) => {
394 log::info!("stream closed by server");
395 break;
396 }
397 _ => continue,
398 };
399
400 let op: SeqOp = match serde_json::from_str(&text) {
401 Ok(op) => op,
402 Err(e) => {
403 log::warn!("failed to parse stream event: {e} ({text})");
404 continue;
405 }
406 };
407
408 let page = SeqPage { ops: vec![op] };
409 if dest.send(page).await.is_err() {
410 log::info!("stream dest channel closed, stopping");
411 break;
412 }
413 }
414
415 Ok(())
416}
417
#[cfg(test)]
mod test {
    use super::*;

    // unix seconds matching the fixture `createdAt` strings below
    const FIVES_TS: i64 = 1431648000; // 2015-05-15T00:00:00Z
    const NEXT_TS: i64 = 1431648001; // 2015-05-15T00:00:01Z

    /// Fixture: an op at `FIVES_TS` with key ("did", "cid")
    fn valid_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "did",
            "cid": "cid",
            "createdAt": "2015-05-15T00:00:00Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// Fixture: an op one second later at `NEXT_TS` with key ("didnext", "cidnext")
    fn next_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "didnext",
            "cid": "cidnext",
            "createdAt": "2015-05-15T00:00:01Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// Boundary state seeded from a single-op page containing `valid_op()`
    fn base_state() -> PageBoundaryState {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        PageBoundaryState::new(&page).expect("to have a base page boundary state")
    }

    // an empty page has no last op, so there is no boundary state to build
    #[test]
    fn test_boundary_new_empty() {
        let page = ExportPage { ops: vec![] };
        let state = PageBoundaryState::new(&page);
        assert!(state.is_none());
    }

    // a one-op page seeds the state with that op's time and key
    #[test]
    fn test_boundary_new_one_op() {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        let state = PageBoundaryState::new(&page).unwrap();
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cid".to_string(),
                did: "did".to_string(),
            }]
        );
    }

    // applying an empty page must leave the state untouched
    #[test]
    fn test_add_new_empty() {
        let mut state = base_state();
        state.apply_to_next(&mut ExportPage { ops: vec![] });
        assert_eq!(state, base_state());
    }

    // a page that repeats the boundary op exactly is fully deduped away,
    // leaving the state unchanged
    #[test]
    fn test_add_new_same_op() {
        let mut page = ExportPage {
            ops: vec![valid_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state, base_state());
    }

    // a *new* op at the same timestamp is kept, and its key is added to the
    // boundary key set
    #[test]
    fn test_add_new_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage { ops: vec![op] };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    // a duplicate followed by a new same-time op: dup dropped, new key tracked
    #[test]
    fn test_add_new_same_time_dup_before() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![valid_op(), op],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    // same as above but with the duplicate *after* the new op
    #[test]
    fn test_add_new_same_time_dup_after() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![op, valid_op()],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    // time advances on the new page: state resets to the new op's time/key
    #[test]
    fn test_add_new_next_time() {
        let mut page = ExportPage {
            ops: vec![next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
    }

    // a leading duplicate is removed while the later (newer-time) op survives,
    // and the state resets to the newer time
    #[test]
    fn test_add_new_next_time_with_dup() {
        let mut page = ExportPage {
            ops: vec![valid_op(), next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 1);
        assert_eq!(page.ops[0], next_op());
    }

    // mixed page: known dup dropped, new same-time op kept, newer op kept
    #[test]
    fn test_add_new_next_time_with_dup_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                valid_op(), // should get dropped
                op.clone(), // should be kept
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }

    // as above, but the known dup appears after the new same-time op
    #[test]
    fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                op.clone(), // should be kept
                valid_op(), // should get dropped
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }
}
664}