APIs for links and references in the ATmosphere

handle shutdown in all tasks

+47 -12
+2
slingshot/src/error.rs
··· 54 ServerTaskError(#[from] ServerError), 55 #[error(transparent)] 56 IdentityTaskError(#[from] IdentityError), 57 } 58 59 #[derive(Debug, Error)]
··· 54 ServerTaskError(#[from] ServerError), 55 #[error(transparent)] 56 IdentityTaskError(#[from] IdentityError), 57 + #[error("firehose cache failed to close: {0}")] 58 + FirehoseCacheCloseError(foyer::Error), 59 } 60 61 #[derive(Debug, Error)]
+5 -1
slingshot/src/firehose_cache.rs
··· 10 .memory(64 * 2_usize.pow(20)) 11 .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 12 .storage(Engine::large()) 13 - .with_device_options(DirectFsDeviceOptions::new(cache_dir)) 14 .build() 15 .await 16 .map_err(|e| format!("foyer setup error: {e:?}"))?;
··· 10 .memory(64 * 2_usize.pow(20)) 11 .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 12 .storage(Engine::large()) 13 + .with_device_options( 14 + DirectFsDeviceOptions::new(cache_dir) 15 + .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 16 + .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 17 + ) 18 .build() 19 .await 20 .map_err(|e| format!("foyer setup error: {e:?}"))?;
+17 -3
slingshot/src/identity.rs
··· 13 /// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this 14 use std::time::Duration; 15 use tokio::sync::Mutex; 16 17 use crate::error::IdentityError; 18 use atrium_api::{ ··· 175 .with_name("identity") 176 .memory(16 * 2_usize.pow(20)) 177 .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v)) 178 - .storage(Engine::large()) 179 - .with_device_options(DirectFsDeviceOptions::new(cache_dir)) 180 .build() 181 .await?; 182 ··· 403 } 404 405 /// run the refresh queue consumer 406 - pub async fn run_refresher(&self) -> Result<(), IdentityError> { 407 let _guard = self 408 .refresher 409 .try_lock() 410 .expect("there to only be one refresher running"); 411 loop { 412 let Some(task_key) = self.peek_refresh().await else { 413 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 414 continue;
··· 13 /// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this 14 use std::time::Duration; 15 use tokio::sync::Mutex; 16 + use tokio_util::sync::CancellationToken; 17 18 use crate::error::IdentityError; 19 use atrium_api::{ ··· 176 .with_name("identity") 177 .memory(16 * 2_usize.pow(20)) 178 .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v)) 179 + .storage(Engine::small()) 180 + .with_device_options( 181 + DirectFsDeviceOptions::new(cache_dir) 182 + .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 183 + .with_file_size(2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 184 + ) 185 .build() 186 .await?; 187 ··· 408 } 409 410 /// run the refresh queue consumer 411 + pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 412 let _guard = self 413 .refresher 414 .try_lock() 415 .expect("there to only be one refresher running"); 416 loop { 417 + if shutdown.is_cancelled() { 418 + log::info!("identity refresher: exiting for shutdown: closing cache..."); 419 + if let Err(e) = self.cache.close().await { 420 + log::error!("cache close errored: {e}"); 421 + } else { 422 + log::info!("identity cache closed.") 423 + } 424 + return Ok(()); 425 + } 426 let Some(task_key) = self.peek_refresh().await else { 427 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 428 continue;
+12 -3
slingshot/src/main.rs
··· 89 .map_err(|e| format!("identity setup failed: {e:?}"))?; 90 log::info!("identity service ready."); 91 let identity_refresher = identity.clone(); 92 tasks.spawn(async move { 93 - identity_refresher.run_refresher().await?; 94 Ok(()) 95 }); 96 ··· 113 }); 114 115 let consumer_shutdown = shutdown.clone(); 116 tasks.spawn(async move { 117 consume( 118 args.jetstream, 119 None, 120 args.jetstream_no_zstd, 121 consumer_shutdown, 122 - cache, 123 ) 124 .await?; 125 Ok(()) ··· 133 } 134 } 135 136 tokio::select! { 137 _ = async { 138 while let Some(completed) = tasks.join_next().await { 139 log::info!("shutdown: task completed: {completed:?}"); 140 } 141 } => {}, 142 - _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { 143 log::info!("shutdown: not all tasks completed on time. aborting..."); 144 tasks.shutdown().await; 145 },
··· 89 .map_err(|e| format!("identity setup failed: {e:?}"))?; 90 log::info!("identity service ready."); 91 let identity_refresher = identity.clone(); 92 + let identity_shutdown = shutdown.clone(); 93 tasks.spawn(async move { 94 + identity_refresher.run_refresher(identity_shutdown).await?; 95 Ok(()) 96 }); 97 ··· 114 }); 115 116 let consumer_shutdown = shutdown.clone(); 117 + let consumer_cache = cache.clone(); 118 tasks.spawn(async move { 119 consume( 120 args.jetstream, 121 None, 122 args.jetstream_no_zstd, 123 consumer_shutdown, 124 + consumer_cache, 125 ) 126 .await?; 127 Ok(()) ··· 135 } 136 } 137 138 + tasks.spawn(async move { 139 + cache 140 + .close() 141 + .await 142 + .map_err(MainTaskError::FirehoseCacheCloseError) 143 + }); 144 + 145 tokio::select! { 146 _ = async { 147 while let Some(completed) = tasks.join_next().await { 148 log::info!("shutdown: task completed: {completed:?}"); 149 } 150 } => {}, 151 + _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => { 152 log::info!("shutdown: not all tasks completed on time. aborting..."); 153 tasks.shutdown().await; 154 },
+11 -5
slingshot/src/server.rs
··· 410 host: Option<String>, 411 acme_contact: Option<String>, 412 certs: Option<PathBuf>, 413 - _shutdown: CancellationToken, 414 ) -> Result<(), ServerError> { 415 let repo = Arc::new(repo); 416 let api_service = OpenApiService::new( ··· 452 } 453 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 454 455 - run(TcpListener::bind("0.0.0.0:443").acme(auto_cert), app).await 456 } else { 457 - run(TcpListener::bind("127.0.0.1:3000"), app).await 458 } 459 } 460 461 - async fn run<L>(listener: L, app: Route) -> Result<(), ServerError> 462 where 463 L: Listener + 'static, 464 { ··· 472 .with(Tracing); 473 Server::new(listener) 474 .name("slingshot") 475 - .run(app) 476 .await 477 .map_err(ServerError::ServerExited) 478 }
··· 410 host: Option<String>, 411 acme_contact: Option<String>, 412 certs: Option<PathBuf>, 413 + shutdown: CancellationToken, 414 ) -> Result<(), ServerError> { 415 let repo = Arc::new(repo); 416 let api_service = OpenApiService::new( ··· 452 } 453 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 454 455 + run( 456 + TcpListener::bind("0.0.0.0:443").acme(auto_cert), 457 + app, 458 + shutdown, 459 + ) 460 + .await 461 } else { 462 + run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await 463 } 464 } 465 466 + async fn run<L>(listener: L, app: Route, shutdown: CancellationToken) -> Result<(), ServerError> 467 where 468 L: Listener + 'static, 469 { ··· 477 .with(Tracing); 478 Server::new(listener) 479 .name("slingshot") 480 + .run_with_graceful_shutdown(app, shutdown.cancelled(), None) 481 .await 482 .map_err(ServerError::ServerExited) 483 + .inspect(|()| log::info!("server ended. goodbye.")) 484 }