Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

shutdown setup

i don't think jetstream is quite doing graceful disconnect messages yet but

+50 -12
+2
Cargo.lock
··· 2679 2679 dependencies = [ 2680 2680 "anyhow", 2681 2681 "clap", 2682 + "ctrlc", 2682 2683 "jetstream", 2683 2684 "serde_json", 2684 2685 "tokio", 2686 + "tokio-util", 2685 2687 ] 2686 2688 2687 2689 [[package]]
+2
ufos/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0.97" 8 8 clap = { version = "4.5.31", features = ["derive"] } 9 + ctrlc = "3.4.5" 9 10 jetstream = { path = "../jetstream" } 10 11 serde_json = "1.0.140" 11 12 tokio = { version = "1.43.0", features = ["full", "sync", "time"] } 13 + tokio-util = "0.7.13"
+46 -12
ufos/src/main.rs
··· 2 2 use std::path::PathBuf; 3 3 use std::time::{Duration, Instant}; 4 4 5 + use tokio::select; 6 + use tokio_util::sync::CancellationToken; 7 + 5 8 use jetstream::{ 6 9 events::{commit::CommitEvent, JetstreamEvent::Commit}, 7 10 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 30 33 ..Default::default() 31 34 }; 32 35 36 + let stay_alive = CancellationToken::new(); 37 + 38 + ctrlc::set_handler({ 39 + let mut desperation: u8 = 0; 40 + let stay_alive = stay_alive.clone(); 41 + move || match desperation { 42 + 0 => { 43 + println!("ok, signalling shutdown..."); 44 + stay_alive.cancel(); 45 + desperation += 1; 46 + } 47 + 1.. => panic!("fine, panicking!"), 48 + } 49 + })?; 50 + 33 51 let jetstream: JetstreamConnector<serde_json::Value> = JetstreamConnector::new(config)?; 34 52 let receiver = jetstream.connect().await?; 35 53 ··· 37 55 38 56 let print_throttle = Duration::from_millis(400); 39 57 let mut last = Instant::now(); 40 - while let Ok(event) = receiver.recv_async().await { 41 - if let Commit(CommitEvent::Create { commit, .. }) = event { 42 - let now = Instant::now(); 43 - let since = now - last; 44 - if since >= print_throttle { 45 - let overshoot = since - print_throttle; // adjust to keep the rate on average 46 - last = now - overshoot; 47 - println!( 48 - "{}: {}", 49 - &*commit.info.collection, 50 - serde_json::to_string(&commit.record)? 51 - ); 58 + loop { 59 + select! { 60 + _ = stay_alive.cancelled() => { 61 + eprintln!("byeeee"); 62 + break 63 + } 64 + ev = receiver.recv_async() => { 65 + match ev { 66 + Ok(event) => { 67 + if let Commit(CommitEvent::Create { commit, .. }) = event { 68 + let now = Instant::now(); 69 + let since = now - last; 70 + if since >= print_throttle { 71 + let overshoot = since - print_throttle; // adjust to keep the rate on average 72 + last = now - overshoot; 73 + println!( 74 + "{}: {}", 75 + &*commit.info.collection, 76 + serde_json::to_string(&commit.record)? 77 + ); 78 + } 79 + } 80 + }, 81 + Err(e) => { 82 + eprintln!("jetstream event error: {e:?}"); 83 + break 84 + } 85 + } 52 86 } 53 87 } 54 88 }
ufos/src/serialize.rs

This is a binary file and will not be displayed.