use jacquard::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; use n0_future::StreamExt as _; use url::Url; // TODO: I'm not sure if I like jacquard api... but it works pub(crate) async fn start(url: Url) -> anyhow::Result<()> { let client = TungsteniteSubscriptionClient::from_base_uri(url); let params = JetstreamParams::new().build(); let stream = client.subscribe(¶ms).await?; let (_sink, mut messages) = stream.into_stream(); let (tx, mut rx) = tokio::sync::oneshot::channel(); tokio::spawn(async move { // TODO: use timeout instead tokio::signal::ctrl_c().await.ok(); let _ = tx.send(()); }); loop { tokio::select! { Some(result) = messages.next() => { match result { Ok(msg) => print_message(&msg), Err(e) => eprintln!("Error: {}", e), } } _ = &mut rx => { println!("\nShutting down..."); break; } } } Ok(()) } fn print_message(msg: &JetstreamMessage) { match msg { JetstreamMessage::Commit { did, time_us, commit, } => { let op = match commit.operation { CommitOperation::Create => "create", CommitOperation::Update => "update", CommitOperation::Delete => "delete", }; println!( "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}", did, time_us, op, commit.collection, commit.rkey, commit.cid ); } JetstreamMessage::Identity { did, time_us, identity, } => { println!( "Identity | did={} time_us={} handle={:?} seq={} time={}", did, time_us, identity.handle, identity.seq, identity.time ); } JetstreamMessage::Account { did, time_us, account, } => { println!( "Account | did={} time_us={} active={} seq={} time={} status={:?}", did, time_us, account.active, account.seq, account.time, account.status ); } } }