tangled
alpha
login
or
join now
ptr.pet
/
Allegedly
forked from
microcosm.blue/Allegedly
0
fork
atom
Server tools to backfill, tail, mirror, and verify PLC logs
0
fork
atom
overview
issues
pulls
pipelines
tail: deduplicate
bad-example.com
6 months ago
815336da
e32ad0df
+93
-15
2 changed files
expand all
collapse all
unified
split
src
lib.rs
poll.rs
+14
-2
src/lib.rs
···
13
/// One page of PLC export
14
///
15
/// Not limited, but expected to have up to about 1000 lines
0
16
pub struct ExportPage {
17
pub ops: Vec<String>,
18
}
19
20
-
#[derive(Deserialize)]
0
0
0
0
0
0
21
#[serde(rename_all = "camelCase")]
22
-
pub struct OpPeek {
0
0
23
pub created_at: Dt,
0
0
0
24
}
25
26
pub fn bin_init(name: &str) {
···
13
/// One page of PLC export
14
///
15
/// Not limited, but expected to have up to about 1000 lines
16
+
#[derive(Debug)]
17
pub struct ExportPage {
18
pub ops: Vec<String>,
19
}
20
21
+
impl ExportPage {
22
+
pub fn is_empty(&self) -> bool {
23
+
self.ops.is_empty()
24
+
}
25
+
}
26
+
27
+
#[derive(Debug, Deserialize)]
28
#[serde(rename_all = "camelCase")]
29
+
pub struct Op<'a> {
30
+
pub did: &'a str,
31
+
pub cid: &'a str,
32
pub created_at: Dt,
33
+
pub nullified: bool,
34
+
#[serde(borrow)]
35
+
pub operation: &'a serde_json::value::RawValue,
36
}
37
38
pub fn bin_init(name: &str) {
+79
-13
src/poll.rs
···
1
-
use crate::{CLIENT, Dt, ExportPage, OpPeek};
2
use std::time::Duration;
3
use thiserror::Error;
4
use url::Url;
···
13
SerdeError(#[from] serde_json::Error),
14
}
15
16
-
pub async fn get_page(url: Url) -> Result<(ExportPage, Option<Dt>), GetPageError> {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
17
log::trace!("Getting page: {url}");
18
19
let ops: Vec<String> = CLIENT
···
25
.await?
26
.trim()
27
.split('\n')
0
0
0
0
28
.map(Into::into)
29
.collect();
30
31
-
let last_at = ops
32
.last()
33
.filter(|s| !s.is_empty())
34
-
.map(|s| serde_json::from_str::<OpPeek>(s))
35
.transpose()?
36
-
.map(|o| o.created_at)
37
-
.inspect(|at| log::trace!("new last_at: {at}"));
38
39
-
Ok((ExportPage { ops }, last_at))
40
}
41
42
pub async fn poll_upstream(
···
45
dest: flume::Sender<ExportPage>,
46
) -> anyhow::Result<()> {
47
let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
48
-
let mut after = after;
49
loop {
50
tick.tick().await;
0
51
let mut url = base.clone();
52
-
if let Some(a) = after {
53
-
url.query_pairs_mut().append_pair("after", &a.to_rfc3339());
0
54
};
55
-
let (page, next_after) = get_page(url).await?;
56
-
dest.send_async(page).await?;
57
-
after = next_after.or(after);
0
0
0
0
0
0
0
58
}
59
}
···
1
+
use crate::{CLIENT, Dt, ExportPage, Op};
2
use std::time::Duration;
3
use thiserror::Error;
4
use url::Url;
···
13
SerdeError(#[from] serde_json::Error),
14
}
15
16
+
/// ops are primary-keyed by (did, cid)
17
+
/// plc orders by `created_at` but does not guarantee distinct times per op
18
+
/// we assume that the order will at least be deterministic: this may be unsound
19
+
#[derive(Debug, PartialEq)]
20
+
pub struct LastOp {
21
+
created_at: Dt, // any op greater is definitely not duplicated
22
+
pk: (String, String), // did, cid
23
+
}
24
+
25
+
impl From<Op<'_>> for LastOp {
26
+
fn from(op: Op) -> Self {
27
+
Self {
28
+
created_at: op.created_at,
29
+
pk: (op.did.to_string(), op.cid.to_string()),
30
+
}
31
+
}
32
+
}
33
+
34
+
impl From<Dt> for LastOp {
35
+
fn from(dt: Dt) -> Self {
36
+
Self {
37
+
created_at: dt,
38
+
pk: ("".to_string(), "".to_string()),
39
+
}
40
+
}
41
+
}
42
+
43
+
impl ExportPage {
44
+
/// this is a (slightly flawed) op deduplicator
45
+
fn only_after_last(&mut self, last_op: &LastOp) {
46
+
loop {
47
+
let Some(s) = self.ops.first().cloned() else {
48
+
break;
49
+
};
50
+
let Ok(op) = serde_json::from_str::<Op>(&s) else {
51
+
log::warn!(
52
+
"deduplication failed op parsing ({s:?}), bailing for downstream to deal with."
53
+
);
54
+
break;
55
+
};
56
+
if op.created_at > last_op.created_at {
57
+
break;
58
+
}
59
+
log::trace!("dedup: dropping an op");
60
+
self.ops.remove(0);
61
+
if Into::<LastOp>::into(op) == *last_op {
62
+
log::trace!("dedup: found exact op, keeping all after here");
63
+
break;
64
+
}
65
+
}
66
+
}
67
+
}
68
+
69
+
pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
70
log::trace!("Getting page: {url}");
71
72
let ops: Vec<String> = CLIENT
···
78
.await?
79
.trim()
80
.split('\n')
81
+
.filter_map(|s| {
82
+
let s = s.trim();
83
+
if s.is_empty() { None } else { Some(s) }
84
+
})
85
.map(Into::into)
86
.collect();
87
88
+
let last_op = ops
89
.last()
90
.filter(|s| !s.is_empty())
91
+
.map(|s| serde_json::from_str::<Op>(s))
92
.transpose()?
93
+
.map(Into::into)
94
+
.inspect(|at| log::trace!("new last op: {at:?}"));
95
96
+
Ok((ExportPage { ops }, last_op))
97
}
98
99
pub async fn poll_upstream(
···
102
dest: flume::Sender<ExportPage>,
103
) -> anyhow::Result<()> {
104
let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
105
+
let mut prev_last: Option<LastOp> = after.map(Into::into);
106
loop {
107
tick.tick().await;
108
+
109
let mut url = base.clone();
110
+
if let Some(ref pl) = prev_last {
111
+
url.query_pairs_mut()
112
+
.append_pair("after", &pl.created_at.to_rfc3339());
113
};
114
+
115
+
let (mut page, next_last) = get_page(url).await?;
116
+
if let Some(ref pl) = prev_last {
117
+
page.only_after_last(pl);
118
+
}
119
+
if !page.is_empty() {
120
+
dest.send_async(page).await?;
121
+
}
122
+
123
+
prev_last = next_last.or(prev_last);
124
}
125
}