APIs for links and references in the ATmosphere

add a delay/debounce queue

This catches some spurious notifications (e.g. an accidental like followed by a quick unlike), and possibly some Bluesky moderation actions — though for those we may need to listen to labellers as well (and probably should).

+203 -17
+10 -7
spacedust/src/consumer.rs
··· 1 1 use tokio_util::sync::CancellationToken; 2 2 use crate::LinkEvent; 3 3 use crate::error::ConsumerError; 4 + use crate::removable_delay_queue; 4 5 use jetstream::{ 5 6 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 6 7 events::{CommitOp, Cursor, EventKind}, ··· 12 13 13 14 pub async fn consume( 14 15 b: broadcast::Sender<LinkEvent>, 16 + d: removable_delay_queue::Input<(String, usize), LinkEvent>, 15 17 jetstream_endpoint: String, 16 18 cursor: Option<Cursor>, 17 19 no_zstd: bool, ··· 57 59 continue; 58 60 }; 59 61 62 + let at_uri = format!("at://{}/{}/{}", &*event.did, &*commit.collection, &*commit.rkey); 63 + 60 64 // TODO: keep a buffer and remove quick deletes to debounce notifs 61 65 // for now we just drop all deletes eek 62 66 if commit.operation == CommitOp::Delete { 67 + d.remove_range((at_uri.clone(), 0)..=(at_uri.clone(), MAX_LINKS_PER_EVENT)).await; 63 68 continue; 64 69 } 65 70 let Some(record) = commit.record else { ··· 84 89 let link_ev = LinkEvent { 85 90 collection: commit.collection.to_string(), 86 91 path: link.path, 87 - origin: format!( 88 - "at://{}/{}/{}", 89 - &*event.did, 90 - &*commit.collection, 91 - &*commit.rkey, 92 - ), 92 + origin: at_uri.clone(), 93 93 rev: commit.rev.to_string(), 94 94 target: link.target.into_string(), 95 95 }; 96 - let _ = b.send(link_ev); // only errors if no subscribers are connected, which is just fine. 96 + let _ = b.send(link_ev.clone()); // only errors if no subscribers are connected, which is just fine. 97 + d.enqueue((at_uri.clone(), i), link_ev) 98 + .await 99 + .map_err(|_| ConsumerError::DelayQueueOutputDropped)?; 97 100 } 98 101 } 99 102
+23
spacedust/src/delay.rs
··· 1 + use crate::removable_delay_queue; 2 + use crate::LinkEvent; 3 + use tokio_util::sync::CancellationToken; 4 + use tokio::sync::broadcast; 5 + use crate::error::DelayError; 6 + 7 + pub async fn to_broadcast( 8 + source: removable_delay_queue::Output<(String, usize), LinkEvent>, 9 + dest: broadcast::Sender<LinkEvent>, 10 + shutdown: CancellationToken, 11 + ) -> Result<(), DelayError> { 12 + loop { 13 + tokio::select! { 14 + ev = source.next() => match ev { 15 + Some(event) => { 16 + let _ = dest.send(event); // only errors of there are no listeners, but that's normal 17 + }, 18 + None => return Err(DelayError::DelayEnded), 19 + }, 20 + _ = shutdown.cancelled() => return Ok(()), 21 + } 22 + } 23 + }
+11 -1
spacedust/src/error.rs
··· 6 6 ConsumerTaskError(#[from] ConsumerError), 7 7 #[error(transparent)] 8 8 ServerTaskError(#[from] ServerError), 9 + #[error(transparent)] 10 + DelayTaskError(#[from] DelayError), 9 11 } 10 12 11 13 #[derive(Debug, Error)] ··· 15 17 #[error(transparent)] 16 18 JetstreamConfigValidationError(#[from] jetstream::error::ConfigValidationError), 17 19 #[error("jetstream ended")] 18 - JetstreamEnded 20 + JetstreamEnded, 21 + #[error("delay queue output dropped")] 22 + DelayQueueOutputDropped, 23 + } 24 + 25 + #[derive(Debug, Error)] 26 + pub enum DelayError { 27 + #[error("delay ended")] 28 + DelayEnded, 19 29 } 20 30 21 31 #[derive(Debug, Error)]
+2
spacedust/src/lib.rs
··· 1 1 pub mod consumer; 2 + pub mod delay; 2 3 pub mod error; 3 4 pub mod server; 4 5 pub mod subscriber; 6 + pub mod removable_delay_queue; 5 7 6 8 use serde::Serialize; 7 9
+16 -1
spacedust/src/main.rs
··· 1 1 use spacedust::error::MainTaskError; 2 2 use spacedust::consumer; 3 3 use spacedust::server; 4 + use spacedust::delay; 5 + use spacedust::removable_delay_queue::removable_delay_queue; 4 6 5 7 use clap::Parser; 6 8 use metrics_exporter_prometheus::PrometheusBuilder; 7 9 use tokio::sync::broadcast; 8 10 use tokio_util::sync::CancellationToken; 11 + use std::time::Duration; 9 12 10 13 /// Aggregate links in the at-mosphere 11 14 #[derive(Parser, Debug, Clone)] ··· 44 47 // paths + slow subscriber -> 16GiB queue) 45 48 let (b, _) = broadcast::channel(16_384); 46 49 let consumer_sender = b.clone(); 50 + let (d, _) = broadcast::channel(16_384); 51 + let consumer_delayed_sender = d.clone(); 52 + 53 + let delay = Duration::from_secs(21); 54 + let (delay_queue_sender, delay_queue_receiver) = removable_delay_queue(delay); 47 55 48 56 let shutdown = CancellationToken::new(); 49 57 ··· 60 68 61 69 let server_shutdown = shutdown.clone(); 62 70 tasks.spawn(async move { 63 - server::serve(b, server_shutdown).await?; 71 + server::serve(b, d, server_shutdown).await?; 64 72 Ok(()) 65 73 }); 66 74 ··· 68 76 tasks.spawn(async move { 69 77 consumer::consume( 70 78 consumer_sender, 79 + delay_queue_sender, 71 80 args.jetstream, 72 81 None, 73 82 args.jetstream_no_zstd, 74 83 consumer_shutdown 75 84 ) 76 85 .await?; 86 + Ok(()) 87 + }); 88 + 89 + let delay_shutdown = shutdown.clone(); 90 + tasks.spawn(async move { 91 + delay::to_broadcast(delay_queue_receiver, consumer_delayed_sender, delay_shutdown).await?; 77 92 Ok(()) 78 93 }); 79 94
+118
spacedust/src/removable_delay_queue.rs
··· 1 + use std::ops::RangeBounds; 2 + use std::collections::{BTreeMap, VecDeque}; 3 + use std::time::{Duration, Instant}; 4 + use tokio::sync::Mutex; 5 + use std::sync::Arc; 6 + use thiserror::Error; 7 + 8 + #[derive(Debug, Error)] 9 + pub enum EnqueueError<T> { 10 + #[error("queue ouput dropped")] 11 + OutputDropped(T), 12 + } 13 + 14 + pub trait Key: Eq + Ord + Clone {} 15 + impl<T: Eq + Ord + Clone> Key for T {} 16 + 17 + #[derive(Debug)] 18 + struct Queue<K: Key, T> { 19 + queue: VecDeque<(Instant, K)>, 20 + items: BTreeMap<K, T> 21 + } 22 + 23 + pub struct Input<K: Key, T> { 24 + q: Arc<Mutex<Queue<K, T>>>, 25 + } 26 + 27 + impl<K: Key, T> Input<K, T> { 28 + /// if a key is already present, its previous item will be overwritten and 29 + /// its delay time will be reset for the new item. 30 + /// 31 + /// errors if the remover has been dropped 32 + pub async fn enqueue(&self, key: K, item: T) -> Result<(), EnqueueError<T>> { 33 + if Arc::strong_count(&self.q) == 1 { 34 + return Err(EnqueueError::OutputDropped(item)); 35 + } 36 + // TODO: try to push out an old element first 37 + // for now we just hope there's a listener 38 + let now = Instant::now(); 39 + let mut q = self.q.lock().await; 40 + q.queue.push_back((now, key.clone())); 41 + q.items.insert(key, item); 42 + Ok(()) 43 + } 44 + /// remove an item from the queue, by key 45 + /// 46 + /// the item itself is removed, but the key will remain in the queue -- it 47 + /// will simply be skipped over when a new output item is requested. 
this 48 + /// keeps the removal cheap (=btreemap remove), for a bit of space overhead 49 + pub async fn remove_range(&self, range: impl RangeBounds<K>) { 50 + let n = { 51 + let mut q = self.q.lock().await; 52 + let keys = q.items.range(range).map(|(k, _)| k).cloned().collect::<Vec<_>>(); 53 + for k in &keys { 54 + q.items.remove(k); 55 + } 56 + keys.len() 57 + }; 58 + if n == 0 { 59 + metrics::counter!("delay_queue_remove_not_found").increment(1); 60 + } else { 61 + metrics::counter!("delay_queue_remove_total_records").increment(1); 62 + metrics::counter!("delay_queue_remove_total_links").increment(n as u64); 63 + } 64 + } 65 + } 66 + 67 + pub struct Output<K: Key, T> { 68 + delay: Duration, 69 + q: Arc<Mutex<Queue<K, T>>>, 70 + } 71 + 72 + impl<K: Key, T> Output<K, T> { 73 + pub async fn next(&self) -> Option<T> { 74 + let get = || async { 75 + let mut q = self.q.lock().await; 76 + while let Some((t, k)) = q.queue.pop_front() { 77 + // skip over queued keys that were removed from items 78 + if let Some(item) = q.items.remove(&k) { 79 + return Some((t, item)); 80 + } 81 + } 82 + None 83 + }; 84 + loop { 85 + if let Some((t, item)) = get().await { 86 + let expected_release = t + self.delay; 87 + let now = Instant::now(); 88 + if expected_release > now { 89 + tokio::time::sleep_until(expected_release.into()).await; 90 + metrics::counter!("delay_queue_emit_total", "early" => "yes").increment(1); 91 + } else { 92 + metrics::counter!("delay_queue_emit_total", "early" => "no").increment(1); 93 + let overshoot = now - expected_release; 94 + metrics::histogram!("delay_queue_emit_overshoot").record(overshoot.as_secs_f64()); 95 + } 96 + return Some(item) 97 + } else if Arc::strong_count(&self.q) == 1 { 98 + return None; 99 + } 100 + // the queue is *empty*, so we need to wait at least as long as the current delay 101 + tokio::time::sleep(self.delay).await; 102 + metrics::counter!("delay_queue_entirely_empty_total").increment(1); 103 + }; 104 + } 105 + } 106 + 107 + pub fn 
removable_delay_queue<K: Key, T>( 108 + delay: Duration, 109 + ) -> (Input<K, T>, Output<K, T>) { 110 + let q: Arc<Mutex<Queue<K, T>>> = Arc::new(Mutex::new(Queue { 111 + queue: VecDeque::new(), 112 + items: BTreeMap::new(), 113 + })); 114 + 115 + let input = Input::<K, T> { q: q.clone() }; 116 + let output = Output::<K, T> { q, delay }; 117 + (input, output) 118 + }
+20 -4
spacedust/src/server.rs
··· 27 27 const INDEX_HTML: &str = include_str!("../static/index.html"); 28 28 const FAVICON: &[u8] = include_bytes!("../static/favicon.ico"); 29 29 30 - pub async fn serve(b: broadcast::Sender<LinkEvent>, shutdown: CancellationToken) -> Result<(), ServerError> { 30 + pub async fn serve( 31 + b: broadcast::Sender<LinkEvent>, 32 + d: broadcast::Sender<LinkEvent>, 33 + shutdown: CancellationToken 34 + ) -> Result<(), ServerError> { 31 35 let config_logging = ConfigLogging::StderrTerminal { 32 36 level: ConfigLoggingLevel::Info, 33 37 }; ··· 61 65 ); 62 66 63 67 let sub_shutdown = shutdown.clone(); 64 - let ctx = Context { spec, b, shutdown: sub_shutdown }; 68 + let ctx = Context { spec, b, d, shutdown: sub_shutdown }; 65 69 66 70 let server = ServerBuilder::new(api, ctx, log) 67 71 .config(ConfigDropshot { ··· 87 91 struct Context { 88 92 pub spec: Arc<serde_json::Value>, 89 93 pub b: broadcast::Sender<LinkEvent>, 94 + pub d: broadcast::Sender<LinkEvent>, 90 95 pub shutdown: CancellationToken, 91 96 } 92 97 ··· 279 284 } 280 285 } 281 286 287 + #[derive(Deserialize, JsonSchema)] 288 + #[serde(rename_all = "camelCase")] 289 + struct ScalarSubscribeQuery { 290 + #[serde(default)] 291 + pub instant: bool, 292 + } 293 + 282 294 #[channel { 283 295 protocol = WEBSOCKETS, 284 296 path = "/subscribe", ··· 286 298 async fn subscribe( 287 299 reqctx: RequestContext<Context>, 288 300 query: MultiSubscribeQuery, 301 + scalar_query: Query<ScalarSubscribeQuery>, 289 302 upgraded: WebsocketConnection, 290 303 ) -> dropshot::WebsocketChannelResult { 291 304 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( ··· 295 308 ) 296 309 .await; 297 310 298 - let Context { b, shutdown, .. } = reqctx.context(); 311 + let Context { b, d, shutdown, .. 
} = reqctx.context(); 299 312 let sub_token = shutdown.child_token(); 300 - let subscription = b.subscribe(); 313 + 314 + let q = scalar_query.into_inner(); 315 + let subscription = if q.instant { b } else { d }.subscribe(); 316 + log::info!("starting subscriber with broadcast: instant={}", q.instant); 301 317 302 318 Subscriber::new(query, sub_token) 303 319 .start(ws, subscription)
+3 -4
spacedust/src/subscriber.rs
··· 23 23 query: MultiSubscribeQuery, 24 24 shutdown: CancellationToken, 25 25 ) -> Self { 26 - log::warn!("new sub..."); 27 26 Self { query, shutdown } 28 27 } 29 28 ··· 32 31 ws: WebSocketStream<WebsocketConnectionRaw>, 33 32 mut receiver: broadcast::Receiver<LinkEvent> 34 33 ) -> Result<(), Box<dyn Error>> { 35 - log::warn!("starting new sub..."); 36 34 let mut ping_state = None; 37 35 let (mut ws_sender, mut ws_receiver) = ws.split(); 38 36 let mut ping_interval = interval(PING_PERIOD); ··· 40 38 41 39 // TODO: do we need to timeout ws sends?? 42 40 43 - metrics::gauge!("subscribers_connected_total").increment(1); 41 + metrics::counter!("subscribers_connected_total").increment(1); 42 + metrics::gauge!("subscribers_connected").increment(1); 44 43 45 44 loop { 46 45 tokio::select! { ··· 111 110 } 112 111 } 113 112 log::trace!("end of subscriber. bye!"); 114 - metrics::gauge!("subscribers_connected_total").decrement(1); 113 + metrics::gauge!("subscribers_connected").decrement(1); 115 114 Ok(()) 116 115 } 117 116