[WIP] tangled knot rust implementation
at main 77 lines 2.4 kB view raw
1use jacquard::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 2use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 3use n0_future::StreamExt as _; 4use url::Url; 5 6// TODO: I'm not sure if I like jacquard api... but it works 7pub(crate) async fn start(url: Url) -> anyhow::Result<()> { 8 let client = TungsteniteSubscriptionClient::from_base_uri(url); 9 let params = JetstreamParams::new().build(); 10 let stream = client.subscribe(&params).await?; 11 12 let (_sink, mut messages) = stream.into_stream(); 13 14 let (tx, mut rx) = tokio::sync::oneshot::channel(); 15 tokio::spawn(async move { 16 // TODO: use timeout instead 17 tokio::signal::ctrl_c().await.ok(); 18 let _ = tx.send(()); 19 }); 20 21 loop { 22 tokio::select! { 23 Some(result) = messages.next() => { 24 match result { 25 Ok(msg) => print_message(&msg), 26 Err(e) => eprintln!("Error: {}", e), 27 } 28 } 29 _ = &mut rx => { 30 println!("\nShutting down..."); 31 break; 32 } 33 } 34 } 35 36 Ok(()) 37} 38 39fn print_message(msg: &JetstreamMessage) { 40 match msg { 41 JetstreamMessage::Commit { 42 did, 43 time_us, 44 commit, 45 } => { 46 let op = match commit.operation { 47 CommitOperation::Create => "create", 48 CommitOperation::Update => "update", 49 CommitOperation::Delete => "delete", 50 }; 51 println!( 52 "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}", 53 did, time_us, op, commit.collection, commit.rkey, commit.cid 54 ); 55 } 56 JetstreamMessage::Identity { 57 did, 58 time_us, 59 identity, 60 } => { 61 println!( 62 "Identity | did={} time_us={} handle={:?} seq={} time={}", 63 did, time_us, identity.handle, identity.seq, identity.time 64 ); 65 } 66 JetstreamMessage::Account { 67 did, 68 time_us, 69 account, 70 } => { 71 println!( 72 "Account | did={} time_us={} active={} seq={} time={} status={:?}", 73 did, time_us, account.active, account.seq, account.time, account.status 74 ); 75 } 76 } 77}