APIs for links and references in the ATmosphere

even nicer shutdown, probably maybe

+78 -26
-1
jetstream/src/error.rs
··· 38 38 pub enum JetstreamEventError { 39 39 #[error("failed to load built-in zstd dictionary for decoding: {0}")] 40 40 CompressionDictionaryError(io::Error), 41 - 42 41 #[error("failed to send ping or pong: {0}")] 43 42 PingPongError(#[from] tokio_tungstenite::tungstenite::Error), 44 43 #[error("jetstream event receiver closed")]
+9 -4
spacedust/src/consumer.rs
··· 42 42 loop { 43 43 if shutdown.is_cancelled() { 44 44 log::info!("exiting consumer for shutdown"); 45 - break; 45 + return Ok(()); 46 46 } 47 47 let Some(event) = receiver.recv().await else { 48 - log::error!("could not receive jetstream event, shutting down..."); 49 - shutdown.cancel(); 48 + log::error!("could not receive jetstream event, bailing"); 50 49 break; 51 50 }; 52 51 ··· 68 67 continue; 69 68 }; 70 69 71 - let jv = record.get().parse()?; 70 + let jv = match record.get().parse() { 71 + Ok(v) => v, 72 + Err(e) => { 73 + log::warn!("jetstream record failed to parse, ignoring: {e}"); 74 + continue; 75 + } 76 + }; 72 77 73 78 // todo: indicate if the link limit was reached (-> links omitted) 74 79 for (i, link) in collect_links(&jv).into_iter().enumerate() {
+22 -2
spacedust/src/error.rs
··· 1 1 use thiserror::Error; 2 2 3 3 #[derive(Debug, Error)] 4 + pub enum MainTaskError { 5 + #[error(transparent)] 6 + ConsumerTaskError(#[from] ConsumerError), 7 + #[error(transparent)] 8 + ServerTaskError(#[from] ServerError), 9 + } 10 + 11 + #[derive(Debug, Error)] 4 12 pub enum ConsumerError { 5 13 #[error(transparent)] 6 14 JetstreamConnectionError(#[from] jetstream::error::ConnectionError), 7 15 #[error(transparent)] 8 16 JetstreamConfigValidationError(#[from] jetstream::error::ConfigValidationError), 9 - #[error(transparent)] 10 - JsonParseError(#[from] tinyjson::JsonParseError), 11 17 #[error("jetstream ended")] 12 18 JetstreamEnded 13 19 } 20 + 21 + #[derive(Debug, Error)] 22 + pub enum ServerError { 23 + #[error("failed to configure server logger: {0}")] 24 + ConfigLogError(std::io::Error), 25 + #[error("failed to render json for openapi: {0}")] 26 + OpenApiJsonFail(serde_json::Error), 27 + #[error(transparent)] 28 + FailedToBuildServer(#[from] dropshot::BuildError), 29 + #[error("server exited: {0}")] 30 + ServerExited(String), 31 + #[error("server closed badly: {0}")] 32 + BadClose(String), 33 + }
+35 -7
spacedust/src/main.rs
··· 1 + use spacedust::error::MainTaskError; 1 2 use spacedust::consumer; 2 3 use spacedust::server; 3 4 ··· 55 56 log::error!("failed to install metrics server: {e:?}"); 56 57 }; 57 58 59 + let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 60 + 58 61 let server_shutdown = shutdown.clone(); 59 - let serving = tokio::spawn(async move { 60 - server::serve(b, server_shutdown).await 62 + tasks.spawn(async move { 63 + server::serve(b, server_shutdown).await?; 64 + Ok(()) 61 65 }); 62 66 63 67 let consumer_shutdown = shutdown.clone(); 64 - let consuming = tokio::spawn(async move { 65 - consumer::consume(consumer_sender, args.jetstream, None, args.jetstream_no_zstd, consumer_shutdown).await 68 + tasks.spawn(async move { 69 + consumer::consume( 70 + consumer_sender, 71 + args.jetstream, 72 + None, 73 + args.jetstream_no_zstd, 74 + consumer_shutdown 75 + ) 76 + .await?; 77 + Ok(()) 66 78 }); 67 79 68 - let (served, consumed) = tokio::join!(serving, consuming); 69 - log::info!("serving ended: {served:?}"); 70 - log::info!("consuming ended: {consumed:?}"); 80 + tokio::select! { 81 + _ = shutdown.cancelled() => log::warn!("shutdown requested"), 82 + Some(r) = tasks.join_next() => { 83 + log::warn!("a task exited, shutting down: {r:?}"); 84 + shutdown.cancel(); 85 + } 86 + } 87 + 88 + tokio::select! { 89 + _ = async { 90 + while let Some(completed) = tasks.join_next().await { 91 + log::info!("shutdown: task completed: {completed:?}"); 92 + } 93 + } => {}, 94 + _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { 95 + log::info!("shutdown: not all tasks completed on time. aborting..."); 96 + tasks.shutdown().await; 97 + }, 98 + } 71 99 72 100 log::info!("bye!"); 73 101
+11 -11
spacedust/src/server.rs
··· 1 + use crate::error::ServerError; 1 2 use crate::subscriber::Subscriber; 2 3 use metrics::{histogram, counter}; 3 4 use std::sync::Arc; ··· 26 27 const INDEX_HTML: &str = include_str!("../static/index.html"); 27 28 const FAVICON: &[u8] = include_bytes!("../static/favicon.ico"); 28 29 29 - pub async fn serve(b: broadcast::Sender<LinkEvent>, shutdown: CancellationToken) -> Result<(), String> { 30 + pub async fn serve(b: broadcast::Sender<LinkEvent>, shutdown: CancellationToken) -> Result<(), ServerError> { 30 31 let config_logging = ConfigLogging::StderrTerminal { 31 32 level: ConfigLoggingLevel::Info, 32 33 }; 33 34 34 35 let log = config_logging 35 36 .to_logger("example-basic") 36 - .map_err(|error| format!("failed to create logger: {}", error))?; 37 + .map_err(ServerError::ConfigLogError)?; 37 38 38 39 let mut api = ApiDescription::new(); 39 40 api.register(index).unwrap(); ··· 56 57 .contact_name("part of @microcosm.blue") 57 58 .contact_url("https://microcosm.blue") 58 59 .json() 59 - .map_err(|e| e.to_string())?, 60 + .map_err(ServerError::OpenApiJsonFail)?, 60 61 ); 61 62 62 63 let sub_shutdown = shutdown.clone(); ··· 67 68 bind_address: "0.0.0.0:9998".parse().unwrap(), 68 69 ..Default::default() 69 70 }) 70 - .start() 71 - .map_err(|error| format!("failed to create server: {}", error))?; 71 + .start()?; 72 72 73 73 tokio::select! { 74 74 s = server.wait_for_shutdown() => { 75 - log::error!("dropshot server ended: {s:?}"); 76 - s 75 + s.map_err(ServerError::ServerExited)?; 76 + log::info!("server shut down normally."); 77 77 }, 78 78 _ = shutdown.cancelled() => { 79 - log::info!("shutting down server"); 80 - server.close().await?; 81 - Err("shutdown requested".to_string()) 82 - } 79 + log::info!("shutting down: closing server"); 80 + server.close().await.map_err(ServerError::BadClose)?; 81 + }, 83 82 } 83 + Ok(()) 84 84 } 85 85 86 86 #[derive(Debug, Clone)]
+1 -1
ufos/src/storage_fjall.rs
··· 40 40 /// 41 41 /// new data format, roughly: 42 42 /// 43 - /// Partion: 'global' 43 + /// Partition: 'global' 44 44 /// 45 45 /// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps) 46 46 /// - key: "js_cursor" (literal)