APIs for links and references in the ATmosphere

handle subscriber transitions, probably

+242 -89
+10 -4
Cargo.lock
··· 839 839 840 840 [[package]] 841 841 name = "ctrlc" 842 - version = "3.4.6" 842 + version = "3.4.7" 843 843 source = "registry+https://github.com/rust-lang/crates.io-index" 844 - checksum = "697b5419f348fd5ae2478e8018cb016c00a5881c7f46c717de98ffd135a5651c" 844 + checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73" 845 845 dependencies = [ 846 846 "nix", 847 847 "windows-sys 0.59.0", ··· 2374 2374 2375 2375 [[package]] 2376 2376 name = "nix" 2377 - version = "0.29.0" 2377 + version = "0.30.1" 2378 2378 source = "registry+https://github.com/rust-lang/crates.io-index" 2379 - checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" 2379 + checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" 2380 2380 dependencies = [ 2381 2381 "bitflags", 2382 2382 "cfg-if", ··· 3383 3383 dependencies = [ 3384 3384 "async-trait", 3385 3385 "clap", 3386 + "ctrlc", 3386 3387 "dropshot", 3388 + "env_logger", 3387 3389 "futures", 3388 3390 "http", 3389 3391 "jetstream", 3390 3392 "links", 3393 + "log", 3391 3394 "metrics", 3395 + "rand 0.9.1", 3392 3396 "schemars", 3393 3397 "semver", 3394 3398 "serde", 3395 3399 "serde_json", 3396 3400 "serde_qs", 3401 + "thiserror 2.0.12", 3397 3402 "tinyjson", 3398 3403 "tokio", 3399 3404 "tokio-tungstenite 0.27.0", 3405 + "tokio-util", 3400 3406 ] 3401 3407 3402 3408 [[package]]
+6
spacedust/Cargo.toml
··· 6 6 [dependencies] 7 7 async-trait = "0.1.88" 8 8 clap = { version = "4.5.40", features = ["derive"] } 9 + ctrlc = "3.4.7" 9 10 dropshot = "0.16.2" 11 + env_logger = "0.11.8" 10 12 futures = "0.3.31" 11 13 http = "1.3.1" 12 14 jetstream = { path = "../jetstream", features = ["metrics"] } 13 15 links = { path = "../links" } 16 + log = "0.4.27" 14 17 metrics = "0.24.2" 18 + rand = "0.9.1" 15 19 schemars = "0.8.22" 16 20 semver = "1.0.26" 17 21 serde = { version = "1.0.219", features = ["derive"] } 18 22 serde_json = "1.0.140" 19 23 serde_qs = "1.0.0-rc.3" 24 + thiserror = "2.0.12" 20 25 tinyjson = "2.5.1" 21 26 tokio = { version = "1.45.1", features = ["full"] } 22 27 tokio-tungstenite = "0.27.0" 28 + tokio-util = "0.7.15"
+24 -11
spacedust/src/consumer.rs
··· 1 + use tokio_util::sync::CancellationToken; 1 2 use crate::LinkEvent; 3 + use crate::error::ConsumerError; 2 4 use jetstream::{ 3 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 4 6 events::{CommitOp, Cursor, EventKind}, 5 7 }; 6 8 use links::collect_links; 7 - use std::error::Error; 8 9 use tokio::sync::broadcast; 9 10 10 11 const MAX_LINKS_PER_EVENT: usize = 100; 11 12 12 13 pub async fn consume( 13 14 b: broadcast::Sender<LinkEvent>, 14 - jetstream_endpoint: &str, 15 + jetstream_endpoint: String, 15 16 cursor: Option<Cursor>, 16 17 no_zstd: bool, 17 - ) -> Result<(), Box<dyn Error>> { 18 - let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint); 18 + shutdown: CancellationToken, 19 + ) -> Result<(), ConsumerError> { 20 + let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint); 19 21 if endpoint == jetstream_endpoint { 20 - std::println!("connecting to jetstream at {endpoint}"); 22 + log::info!("connecting to jetstream at {endpoint}"); 21 23 } else { 22 - std::println!("connecting to jetstream at {jetstream_endpoint} => {endpoint}"); 24 + log::info!("connecting to jetstream at {jetstream_endpoint} => {endpoint}"); 23 25 } 24 26 let config: JetstreamConfig = JetstreamConfig { 25 27 endpoint, ··· 36 38 .connect_cursor(cursor) 37 39 .await?; 38 40 39 - while let Some(event) = receiver.recv().await { 41 + log::info!("receiving jetstream messages.."); 42 + loop { 43 + if shutdown.is_cancelled() { 44 + log::info!("exiting consumer for shutdown"); 45 + break; 46 + } 47 + let Some(event) = receiver.recv().await else { 48 + log::error!("could not receive jetstream event, shutting down..."); 49 + shutdown.cancel(); 50 + break; 51 + }; 52 + 40 53 if event.kind != EventKind::Commit { 41 54 continue; 42 55 } 43 56 let Some(commit) = event.commit else { 44 - eprintln!("jetstream commit event missing commit data, ignoring"); 57 + log::warn!("jetstream commit event missing commit data, ignoring"); 45 58 continue; 46 59 }; 47 60 ··· 51 64 continue; 52 65 } 53 66 let Some(record) = commit.record else { 54 - eprintln!("jetstream commit update/delete missing record, ignoring"); 67 + log::warn!("jetstream commit update/delete missing record, ignoring"); 55 68 continue; 56 69 }; 57 70 ··· 60 73 // todo: indicate if the link limit was reached (-> links omitted) 61 74 for (i, link) in collect_links(&jv).into_iter().enumerate() { 62 75 if i >= MAX_LINKS_PER_EVENT { 63 - eprintln!("jetstream event has too many links, ignoring the rest"); 76 + log::warn!("jetstream event has too many links, ignoring the rest"); 64 77 break; 65 78 } 66 79 let link_ev = LinkEvent { ··· 79 92 } 80 93 } 81 94 82 - Err("jetstream consumer ended".into()) 95 + Err(ConsumerError::JetstreamEnded) 83 96 }
+13
spacedust/src/error.rs
··· 1 + use thiserror::Error; 2 + 3 + #[derive(Debug, Error)] 4 + pub enum ConsumerError { 5 + #[error(transparent)] 6 + JetstreamConnectionError(#[from] jetstream::error::ConnectionError), 7 + #[error(transparent)] 8 + JetstreamConfigValidationError(#[from] jetstream::error::ConfigValidationError), 9 + #[error(transparent)] 10 + JsonParseError(#[from] tinyjson::JsonParseError), 11 + #[error("jetstream ended")] 12 + JetstreamEnded 13 + }
+1
spacedust/src/lib.rs
··· 1 1 pub mod consumer; 2 + pub mod error; 2 3 pub mod server; 3 4 pub mod subscriber; 4 5
+23 -7
spacedust/src/main.rs
··· 3 3 4 4 use clap::Parser; 5 5 use tokio::sync::broadcast; 6 + use tokio_util::sync::CancellationToken; 6 7 7 8 /// Aggregate links in the at-mosphere 8 9 #[derive(Parser, Debug, Clone)] ··· 21 22 22 23 #[tokio::main] 23 24 async fn main() -> Result<(), String> { 24 - let args = Args::parse(); 25 + env_logger::init(); 25 26 26 27 // tokio broadcast keeps a single main output queue for all subscribers. 27 28 // each subscriber clones off a copy of an individual value for each recv. ··· 40 41 // TODO: determine if a pathological case could blow this up (eg 1MB link 41 42 // paths + slow subscriber -> 16GiB queue) 42 43 let (b, _) = broadcast::channel(16_384); 44 + let consumer_sender = b.clone(); 43 45 44 - let consuming = consumer::consume(b.clone(), &args.jetstream, None, args.jetstream_no_zstd); 46 + let shutdown = CancellationToken::new(); 47 + 48 + let ctrlc_shutdown = shutdown.clone(); 49 + ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler"); 45 50 46 - let serving = server::serve(b); 51 + let args = Args::parse(); 47 52 48 - tokio::select! { 49 - e = serving => eprintln!("serving failed: {e:?}"), 50 - e = consuming => eprintln!("consuming failed: {e:?}"), 51 - }; 53 + let server_shutdown = shutdown.clone(); 54 + let serving = tokio::spawn(async move { 55 + server::serve(b, server_shutdown).await 56 + }); 57 + 58 + let consumer_shutdown = shutdown.clone(); 59 + let consuming = tokio::spawn(async move { 60 + consumer::consume(consumer_sender, args.jetstream, None, args.jetstream_no_zstd, consumer_shutdown).await 61 + }); 62 + 63 + let (served, consumed) = tokio::join!(serving, consuming); 64 + log::info!("serving ended: {served:?}"); 65 + log::info!("consuming ended: {consumed:?}"); 66 + 67 + log::info!("bye!"); 52 68 53 69 Ok(()) 54 70 }
+23 -12
spacedust/src/server.rs
··· 1 - use crate::subscriber; 1 + use crate::subscriber::Subscriber; 2 2 use metrics::{histogram, counter}; 3 3 use std::sync::Arc; 4 4 use crate::LinkEvent; ··· 19 19 use tokio::sync::broadcast; 20 20 use tokio::time::Instant; 21 21 use tokio_tungstenite::tungstenite::protocol::Role; 22 + use tokio_util::sync::CancellationToken; 22 23 use async_trait::async_trait; 23 24 use std::collections::HashSet; 24 25 25 26 const INDEX_HTML: &str = include_str!("../static/index.html"); 26 27 const FAVICON: &[u8] = include_bytes!("../static/favicon.ico"); 27 28 28 - pub async fn serve(b: broadcast::Sender<LinkEvent>) -> Result<(), String> { 29 + pub async fn serve(b: broadcast::Sender<LinkEvent>, shutdown: CancellationToken) -> Result<(), String> { 29 30 let config_logging = ConfigLogging::StderrTerminal { 30 31 level: ConfigLoggingLevel::Info, 31 32 }; ··· 58 59 .map_err(|e| e.to_string())?, 59 60 ); 60 61 61 - let ctx = Context { spec, b }; 62 + let sub_shutdown = shutdown.clone(); 63 + let ctx = Context { spec, b, shutdown: sub_shutdown }; 62 64 63 65 let server = ServerBuilder::new(api, ctx, log) 64 66 .config(ConfigDropshot { ··· 68 70 .start() 69 71 .map_err(|error| format!("failed to create server: {}", error))?; 70 72 71 - server.await 73 + tokio::select! { 74 + s = server.wait_for_shutdown() => { 75 + log::error!("dropshot server ended: {s:?}"); 76 + s 77 + }, 78 + _ = shutdown.cancelled() => { 79 + log::info!("shutting down server"); 80 + server.close().await?; 81 + Err("shutdown requested".to_string()) 82 + } 83 + } 72 84 } 73 85 74 86 #[derive(Debug, Clone)] 75 87 struct Context { 76 88 pub spec: Arc<serde_json::Value>, 77 89 pub b: broadcast::Sender<LinkEvent>, 90 + pub shutdown: CancellationToken, 78 91 } 79 92 80 93 async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError> ··· 266 279 } 267 280 } 268 281 269 - #[derive(Deserialize, JsonSchema)] 270 - struct QueryParams { 271 - _hello: Option<String>, 272 - } 273 - 274 282 #[channel { 275 283 protocol = WEBSOCKETS, 276 284 path = "/subscribe", 277 285 }] 278 286 async fn subscribe( 279 - ctx: RequestContext<Context>, 287 + reqctx: RequestContext<Context>, 280 288 query: MultiSubscribeQuery, 281 289 upgraded: WebsocketConnection, 282 290 ) -> dropshot::WebsocketChannelResult { ··· 287 295 ) 288 296 .await; 289 297 290 - let b = ctx.context().b.subscribe(); 298 + let Context { b, shutdown, .. } = reqctx.context(); 299 + let sub_token = shutdown.child_token(); 300 + let subscription = b.subscribe(); 291 301 292 - subscriber::subscribe(b, ws, query) 302 + Subscriber::new(query, sub_token) 303 + .start(ws, subscription) 293 304 .await 294 305 .map_err(|e| format!("boo: {e:?}"))?; 295 306
+142 -55
spacedust/src/subscriber.rs
··· 1 + use tokio::time::interval; 2 + use std::time::Duration; 3 + use futures::StreamExt; 1 4 use crate::ClientEvent; 2 5 use crate::LinkEvent; 3 6 use crate::server::MultiSubscribeQuery; 4 7 use futures::SinkExt; 5 8 use std::error::Error; 6 - use tokio::sync::broadcast; 9 + use tokio::sync::broadcast::{self, error::RecvError}; 7 10 use tokio_tungstenite::{WebSocketStream, tungstenite::Message}; 11 + use tokio_util::sync::CancellationToken; 8 12 use dropshot::WebsocketConnectionRaw; 9 13 10 - pub async fn subscribe( 11 - mut sub: broadcast::Receiver<LinkEvent>, 12 - mut ws: WebSocketStream<WebsocketConnectionRaw>, 14 + const PING_PERIOD: Duration = Duration::from_secs(30); 15 + 16 + pub struct Subscriber { 13 17 query: MultiSubscribeQuery, 14 - ) -> Result<(), Box<dyn Error>> { 15 - // TODO: pingpong 18 + shutdown: CancellationToken, 19 + } 16 20 17 - loop { 18 - match sub.recv().await { 19 - Ok(link) => { 21 + impl Subscriber { 22 + pub fn new( 23 + query: MultiSubscribeQuery, 24 + shutdown: CancellationToken, 25 + ) -> Self { 26 + log::warn!("new sub..."); 27 + Self { query, shutdown } 28 + } 20 29 21 - // subject + subject DIDs are logical OR 22 - let target_did = if link.target.starts_with("did:") { 23 - link.target.clone() 24 - } else { 25 - let Some(rest) = link.target.strip_prefix("at://") else { 26 - continue; 27 - }; 28 - if let Some((did, _)) = rest.split_once("/") { 29 - did 30 - } else { 31 - rest 32 - }.to_string() 33 - }; 34 - if !(query.wanted_subjects.contains(&link.target) || query.wanted_subject_dids.contains(&target_did) || query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()) { 35 - // wowwww ^^ fix that 36 - continue; 37 - } 30 + pub async fn start( 31 + self, 32 + ws: WebSocketStream<WebsocketConnectionRaw>, 33 + mut receiver: broadcast::Receiver<LinkEvent> 34 + ) -> Result<(), Box<dyn Error>> { 35 + log::warn!("starting new sub..."); 36 + let mut ping_state = None; 37 + let (mut ws_sender, mut ws_receiver) = ws.split(); 38 + let mut ping_interval = interval(PING_PERIOD); 39 + let _guard = self.shutdown.clone().drop_guard(); 38 40 39 - // subjects together with sources are logical AND 41 + // TODO: do we need to timeout ws sends?? 40 42 41 - if !query.wanted_sources.is_empty() { 42 - let undotted = link.path.strip_prefix('.').unwrap_or_else(|| { 43 - eprintln!("link path did not have expected '.' prefix: {}", link.path); 44 - "" 45 - }); 46 - let source = format!("{}:{undotted}", link.collection); 47 - if !query.wanted_sources.contains(&source) { 48 - continue; 43 + loop { 44 + tokio::select! { 45 + l = receiver.recv() => match l { 46 + Ok(link) => if let Some(message) = self.filter(link) { 47 + if let Err(e) = ws_sender.send(message).await { 48 + log::warn!("failed to send link, dropping subscriber: {e:?}"); 49 + break; 50 + } 51 + }, 52 + Err(RecvError::Closed) => self.shutdown.cancel(), 53 + Err(RecvError::Lagged(n)) => { 54 + log::warn!("dropping lagging subscriber (missed {n} messages already)"); 55 + self.shutdown.cancel(); 56 + } 57 + }, 58 + cm = ws_receiver.next() => match cm { 59 + Some(Ok(Message::Ping(state))) => { 60 + if let Err(e) = ws_sender.send(Message::Pong(state)).await { 61 + log::error!("failed to reply pong to subscriber: {e:?}"); 62 + break; 63 + } 64 + } 65 + Some(Ok(Message::Pong(state))) => { 66 + if let Some(expected_state) = ping_state { 67 + if *state == expected_state { 68 + ping_state = None; // good 69 + } else { 70 + log::error!("subscriber returned a pong with the wrong state, dropping"); 71 + self.shutdown.cancel(); 72 + } 73 + } else { 74 + log::error!("subscriber sent a pong when none was expected"); 75 + self.shutdown.cancel(); 76 + } 77 + } 78 + Some(Ok(m)) => log::trace!("subscriber sent an unexpected message: {m:?}"), 79 + Some(Err(e)) => { 80 + log::error!("failed to receive subscriber message: {e:?}"); 81 + break; 82 + } 83 + None => { 84 + log::trace!("end of subscriber messages. bye!"); 85 + break; 86 + } 87 + }, 88 + _ = ping_interval.tick() => { 89 + if ping_state.is_some() { 90 + log::warn!("did not recieve pong within {PING_PERIOD:?}, dropping subscriber"); 91 + self.shutdown.cancel(); 92 + } else { 93 + let new_state: [u8; 8] = rand::random(); 94 + let ping = new_state.to_vec().into(); 95 + ping_state = Some(new_state); 96 + if let Err(e) = ws_sender.send(Message::Ping(ping)).await { 97 + log::error!("failed to send ping to subscriber, dropping: {e:?}"); 98 + self.shutdown.cancel(); 99 + } 49 100 } 50 101 } 51 - 52 - let ev = ClientEvent { 53 - kind: "link".to_string(), 54 - origin: "live".to_string(), 55 - link: link.into(), 56 - }; 57 - let json = serde_json::to_string(&ev)?; 58 - if let Err(e) = ws.send(Message::Text(json.into())).await { 59 - eprintln!("client: failed to send event: {e:?}"); 60 - ws.close(None).await?; // TODO: do we need this one?? 102 + _ = self.shutdown.cancelled() => { 103 + log::info!("subscriber shutdown requested, bye!"); 104 + if let Err(e) = ws_sender.close().await { 105 + log::warn!("failed to close subscriber: {e:?}"); 106 + } 61 107 break; 62 - } 63 - } 64 - Err(broadcast::error::RecvError::Closed) => { 65 - ws.close(None).await?; // TODO: send reason 66 - break; 108 + }, 67 109 } 68 - Err(broadcast::error::RecvError::Lagged(_n_missed)) => { 69 - eprintln!("client lagged, closing"); 70 - ws.close(None).await?; // TODO: send reason 71 - break; 110 + } 111 + log::trace!("end of subscriber. bye!"); 112 + Ok(()) 113 + } 114 + 115 + fn filter( 116 + &self, 117 + link: LinkEvent, 118 + // mut sender: impl Sink<Message> + Unpin 119 + ) -> Option<Message> { 120 + let query = &self.query; 121 + 122 + // subject + subject DIDs are logical OR 123 + let target_did = if link.target.starts_with("did:") { 124 + link.target.clone() 125 + } else { 126 + let Some(rest) = link.target.strip_prefix("at://") else { 127 + return None 128 + }; 129 + if let Some((did, _)) = rest.split_once("/") { 130 + did 131 + } else { 132 + rest 133 + }.to_string() 134 + }; 135 + if !(query.wanted_subjects.contains(&link.target) || query.wanted_subject_dids.contains(&target_did) || query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()) { 136 + // wowwww ^^ fix that 137 + return None 138 + } 139 + 140 + // subjects together with sources are logical AND 141 + 142 + if !query.wanted_sources.is_empty() { 143 + let undotted = link.path.strip_prefix('.').unwrap_or_else(|| { 144 + eprintln!("link path did not have expected '.' prefix: {}", link.path); 145 + "" 146 + }); 147 + let source = format!("{}:{undotted}", link.collection); 148 + if !query.wanted_sources.contains(&source) { 149 + return None 72 150 } 73 151 } 152 + 153 + let ev = ClientEvent { 154 + kind: "link".to_string(), 155 + origin: "live".to_string(), 156 + link: link.into(), 157 + }; 158 + 159 + let json = serde_json::to_string(&ev).unwrap(); 160 + 161 + Some(Message::Text(json.into())) 74 162 } 75 - Ok(()) 76 163 }