···11+use allegedly::{ExportPage, poll_upstream};
22+33+#[tokio::main]
44+async fn main() {
55+ // set to `None` to replay from the beginning of the PLC history
66+ let after = Some(chrono::Utc::now());
77+88+ // the PLC server to poll for new ops
99+ let upstream = "https://plc.wtf/export".parse().unwrap();
1010+1111+ // self-rate-limit (plc.directory's limit interval is 600ms)
1212+ let throttle = std::time::Duration::from_millis(300);
1313+1414+ // pages are sent out of the poller via a tokio mpsc channel
1515+ let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1616+1717+ // spawn a tokio task to run the poller
1818+ tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
1919+2020+ // receive pages of plc ops from the poller
2121+ while let Some(ExportPage { ops }) = rx.recv().await {
2222+ println!("received {} plc ops", ops.len());
2323+2424+ for op in ops {
2525+ // in this example we're alerting when changes are found for one
2626+ // specific identity
2727+ if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
2828+ println!(
2929+ "Update found for {}! cid={}\n -> operation: {}",
3030+ op.did, op.cid, op.operation.get()
3131+ );
3232+ }
3333+ }
3434+ }
3535+}
+54-2
src/poll.rs
···5151 }
5252}
53535454-/// PLC
5454+/// State for removing duplicates ops between PLC export page boundaries
5555#[derive(Debug, PartialEq)]
5656pub struct PageBoundaryState {
5757+ /// The previous page's last timestamp
5858+ ///
5959+ /// Duplicate ops from /export only occur for the same exact timestamp
5760 pub last_at: Dt,
6161+ /// The previous page's ops at its last timestamp
5862 keys_at: Vec<OpKey>, // expected to ~always be length one
5963}
60646161-/// track keys at final createdAt to deduplicate the start of the next page
6265impl PageBoundaryState {
6666+ /// Initialize the boundary state with a PLC page
6367 pub fn new(page: &ExportPage) -> Option<Self> {
6468 // grab the very last op
6569 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
···75797680 Some(me)
7781 }
8282+ /// Apply the deduplication and update state
8383+ ///
8484+ /// The beginning of the page will be modified to remove duplicates from the
8585+ /// previous page.
8686+ ///
8787+ /// The end of the page is inspected to update the deduplicator state for
8888+ /// the next page.
7889 fn apply_to_next(&mut self, page: &mut ExportPage) {
7990 // walk ops forward, kicking previously-seen ops until created_at advances
8091 let to_remove: Vec<usize> = page
···124135 }
125136}
126137138138+/// Get one PLC export page
139139+///
140140+/// Extracts the final op so it can be used to fetch the following page
127141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
128142 log::trace!("Getting page: {url}");
129143···152166 Ok((ExportPage { ops }, last_op))
153167}
154168169169+/// Poll an upstream PLC server for new ops
170170+///
171171+/// Pages of operations are written to the `dest` channel.
172172+///
173173+/// ```no_run
174174+/// # #[tokio::main]
175175+/// # async fn main() {
176176+/// use allegedly::{ExportPage, Op, poll_upstream};
177177+///
178178+/// // set to `None` to replay from the beginning of the PLC history
179179+/// let after = Some(chrono::Utc::now());
180180+///
181181+/// // the PLC server to poll for new ops
182182+/// let upstream = "https://plc.wtf/export".parse().unwrap();
183183+///
184184+/// // self-rate-limit (plc.directory's limit interval is 600ms)
185185+/// let throttle = std::time::Duration::from_millis(300);
186186+///
187187+/// // pages are sent out of the poller via a tokio mpsc channel
188188+/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
189189+///
190190+/// // spawn a tokio task to run the poller
191191+/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
192192+///
193193+/// // receive pages of plc ops from the poller
194194+/// while let Some(ExportPage { ops }) = rx.recv().await {
195195+/// println!("received {} plc ops", ops.len());
196196+///
197197+/// for Op { did, cid, operation, .. } in ops {
198198+/// // in this example we're alerting when changes are found for one
199199+/// // specific identity
200200+/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
201201+/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
202202+/// }
203203+/// }
204204+/// }
205205+/// # }
206206+/// ```
155207pub async fn poll_upstream(
156208 after: Option<Dt>,
157209 base: Url,