[WIP] tangled knot rust implementation
1use jacquard::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams};
2use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
3use n0_future::StreamExt as _;
4use url::Url;
5
6// TODO: I'm not sure if I like jacquard api... but it works
7pub(crate) async fn start(url: Url) -> anyhow::Result<()> {
8 let client = TungsteniteSubscriptionClient::from_base_uri(url);
9 let params = JetstreamParams::new().build();
10 let stream = client.subscribe(¶ms).await?;
11
12 let (_sink, mut messages) = stream.into_stream();
13
14 let (tx, mut rx) = tokio::sync::oneshot::channel();
15 tokio::spawn(async move {
16 // TODO: use timeout instead
17 tokio::signal::ctrl_c().await.ok();
18 let _ = tx.send(());
19 });
20
21 loop {
22 tokio::select! {
23 Some(result) = messages.next() => {
24 match result {
25 Ok(msg) => print_message(&msg),
26 Err(e) => eprintln!("Error: {}", e),
27 }
28 }
29 _ = &mut rx => {
30 println!("\nShutting down...");
31 break;
32 }
33 }
34 }
35
36 Ok(())
37}
38
39fn print_message(msg: &JetstreamMessage) {
40 match msg {
41 JetstreamMessage::Commit {
42 did,
43 time_us,
44 commit,
45 } => {
46 let op = match commit.operation {
47 CommitOperation::Create => "create",
48 CommitOperation::Update => "update",
49 CommitOperation::Delete => "delete",
50 };
51 println!(
52 "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}",
53 did, time_us, op, commit.collection, commit.rkey, commit.cid
54 );
55 }
56 JetstreamMessage::Identity {
57 did,
58 time_us,
59 identity,
60 } => {
61 println!(
62 "Identity | did={} time_us={} handle={:?} seq={} time={}",
63 did, time_us, identity.handle, identity.seq, identity.time
64 );
65 }
66 JetstreamMessage::Account {
67 did,
68 time_us,
69 account,
70 } => {
71 println!(
72 "Account | did={} time_us={} active={} seq={} time={} status={:?}",
73 did, time_us, account.active, account.seq, account.time, account.status
74 );
75 }
76 }
77}