Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 349 lines 11 kB view raw
1use crate::ClientMessage; 2use crate::error::ServerError; 3use crate::subscriber::Subscriber; 4use dropshot::{ 5 ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging, 6 ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext, 7 ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint, 8}; 9use http::{ 10 Response, StatusCode, 11 header::{ORIGIN, USER_AGENT}, 12}; 13use metrics::{counter, histogram}; 14use std::sync::Arc; 15 16use async_trait::async_trait; 17use schemars::JsonSchema; 18use serde::{Deserialize, Serialize}; 19use std::collections::HashSet; 20use tokio::sync::broadcast; 21use tokio::time::Instant; 22use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig}; 23use tokio_util::sync::CancellationToken; 24 25const INDEX_HTML: &str = include_str!("../static/index.html"); 26const FAVICON: &[u8] = include_bytes!("../static/favicon.ico"); 27 28pub async fn serve( 29 b: broadcast::Sender<Arc<ClientMessage>>, 30 d: broadcast::Sender<Arc<ClientMessage>>, 31 shutdown: CancellationToken, 32 bind: std::net::SocketAddr, 33) -> Result<(), ServerError> { 34 let config_logging = ConfigLogging::StderrTerminal { 35 level: ConfigLoggingLevel::Info, 36 }; 37 38 let log = config_logging 39 .to_logger("example-basic") 40 .map_err(ServerError::ConfigLogError)?; 41 42 let mut api = ApiDescription::new(); 43 api.register(index).unwrap(); 44 api.register(favicon).unwrap(); 45 api.register(openapi).unwrap(); 46 api.register(subscribe).unwrap(); 47 48 // TODO: put spec in a once cell / lazy lock thing? 49 let spec = Arc::new( 50 api.openapi( 51 "Spacedust", 52 env!("CARGO_PKG_VERSION") 53 .parse() 54 .inspect_err(|e| { 55 eprintln!("failed to parse cargo package version for openapi: {e:?}") 56 }) 57 .unwrap_or(semver::Version::new(0, 0, 1)), 58 ) 59 .description("A configurable ATProto notifications firehose.") 60 .contact_name("part of @microcosm.blue") 61 .contact_url("https://microcosm.blue") 62 .json() 63 .map_err(ServerError::OpenApiJsonFail)?, 64 ); 65 66 let sub_shutdown = shutdown.clone(); 67 let ctx = Context { 68 spec, 69 b, 70 d, 71 shutdown: sub_shutdown, 72 }; 73 74 let server = ServerBuilder::new(api, ctx, log) 75 .config(ConfigDropshot { 76 bind_address: bind, 77 ..Default::default() 78 }) 79 .start()?; 80 81 tokio::select! { 82 s = server.wait_for_shutdown() => { 83 s.map_err(ServerError::ServerExited)?; 84 log::info!("server shut down normally."); 85 }, 86 _ = shutdown.cancelled() => { 87 log::info!("shutting down: closing server"); 88 server.close().await.map_err(ServerError::BadClose)?; 89 }, 90 } 91 Ok(()) 92} 93 94#[derive(Debug, Clone)] 95struct Context { 96 pub spec: Arc<serde_json::Value>, 97 pub b: broadcast::Sender<Arc<ClientMessage>>, 98 pub d: broadcast::Sender<Arc<ClientMessage>>, 99 pub shutdown: CancellationToken, 100} 101 102async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError> 103where 104 R: HttpResponse, 105 H: Future<Output = Result<R, HttpError>>, 106 T: ServerContext, 107{ 108 let start = Instant::now(); 109 let result = handler.await; 110 let latency = start.elapsed(); 111 let status_code = match &result { 112 Ok(response) => response.status_code(), 113 Err(e) => e.status_code.as_status(), 114 } 115 .as_str() // just the number (.to_string()'s Display does eg `200 OK`) 116 .to_string(); 117 let endpoint = ctx.endpoint.operation_id.clone(); 118 let headers = ctx.request.headers(); 119 let origin = headers 120 .get(ORIGIN) 121 .and_then(|v| v.to_str().ok()) 122 .unwrap_or("") 123 .to_string(); 124 let ua = headers 125 .get(USER_AGENT) 126 .and_then(|v| v.to_str().ok()) 127 .map(|ua| { 128 if ua.starts_with("Mozilla/5.0 ") { 129 "browser" 130 } else { 131 ua 132 } 133 }) 134 .unwrap_or("") 135 .to_string(); 136 counter!("server_requests_total", 137 "endpoint" => endpoint.clone(), 138 "origin" => origin, 139 "ua" => ua, 140 "status_code" => status_code, 141 ) 142 .increment(1); 143 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64); 144 result 145} 146 147use dropshot::{HttpResponseHeaders, HttpResponseOk}; 148 149pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>; 150 151/// Helper for constructing Ok responses: return OkCors(T).into() 152/// (not happy with this yet) 153pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T); 154 155impl<T> From<OkCors<T>> for OkCorsResponse<T> 156where 157 T: Serialize + JsonSchema + Send + Sync, 158{ 159 fn from(ok: OkCors<T>) -> OkCorsResponse<T> { 160 let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0)); 161 res.headers_mut() 162 .insert("access-control-allow-origin", "*".parse().unwrap()); 163 Ok(res) 164 } 165} 166 167// TODO: cors for HttpError 168 169/// Serve index page as html 170#[endpoint { 171 method = GET, 172 path = "/", 173 /* 174 * not useful to have this in openapi 175 */ 176 unpublished = true, 177}] 178async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 179 instrument_handler(&ctx, async { 180 Ok(Response::builder() 181 .status(StatusCode::OK) 182 .header(http::header::CONTENT_TYPE, "text/html") 183 .body(INDEX_HTML.into())?) 184 }) 185 .await 186} 187 188/// Serve index page as html 189#[endpoint { 190 method = GET, 191 path = "/favicon.ico", 192 /* 193 * not useful to have this in openapi 194 */ 195 unpublished = true, 196}] 197async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 198 instrument_handler(&ctx, async { 199 Ok(Response::builder() 200 .status(StatusCode::OK) 201 .header(http::header::CONTENT_TYPE, "image/x-icon") 202 .body(FAVICON.to_vec().into())?) 203 }) 204 .await 205} 206 207/// Meta: get the openapi spec for this api 208#[endpoint { 209 method = GET, 210 path = "/openapi", 211 /* 212 * not useful to have this in openapi 213 */ 214 unpublished = true, 215}] 216async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> { 217 instrument_handler(&ctx, async { 218 let spec = (*ctx.context().spec).clone(); 219 OkCors(spec).into() 220 }) 221 .await 222} 223 224/// The real type that gets deserialized 225#[derive(Debug, Deserialize, JsonSchema)] 226#[serde(rename_all = "camelCase")] 227pub struct MultiSubscribeQuery { 228 #[serde(default)] 229 pub wanted_subjects: HashSet<String>, 230 #[serde(default)] 231 pub wanted_subject_prefixes: HashSet<String>, 232 #[serde(default)] 233 pub wanted_subject_dids: HashSet<String>, 234 #[serde(default)] 235 pub wanted_sources: HashSet<String>, 236} 237/// The fake corresponding type for docs that dropshot won't freak out about a 238/// vec for 239#[derive(Deserialize, JsonSchema)] 240#[allow(dead_code)] 241#[serde(rename_all = "camelCase")] 242struct MultiSubscribeQueryForDocs { 243 /// One or more at-uris to receive links about 244 /// 245 /// The at-uri must be url-encoded 246 /// 247 /// Pass this parameter multiple times to specify multiple subjects, like 248 /// `wantedSubjects=[...]&wantedSubjects=[...]` 249 pub wanted_subjects: String, 250 /// One or more at-uri, URI, or DID prefixes to receive links about 251 /// 252 /// The uri must be url-encoded 253 /// 254 /// Pass this parameter multiple times to specify multiple prefixes, like 255 /// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]` 256 pub wanted_subject_prefixes: String, 257 /// One or more DIDs to receive links about 258 /// 259 /// Pass this parameter multiple times to specify multiple subjects 260 pub wanted_subject_dids: String, 261 /// One or more link sources to receive links about 262 /// 263 /// TODO: docs about link sources 264 /// 265 /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri` 266 /// 267 /// Pass this parameter multiple times to specify multiple sources 268 pub wanted_sources: String, 269} 270 271// The `SharedExtractor` implementation for Query<QueryType> describes how to 272// construct an instance of `Query<QueryType>` from an HTTP request: namely, by 273// parsing the query string to an instance of `QueryType`. 274#[async_trait] 275impl SharedExtractor for MultiSubscribeQuery { 276 async fn from_request<Context: ServerContext>( 277 ctx: &RequestContext<Context>, 278 ) -> Result<MultiSubscribeQuery, HttpError> { 279 let raw_query = ctx.request.uri().query().unwrap_or(""); 280 let q = serde_qs::from_str(raw_query).map_err(|e| { 281 HttpError::for_bad_request(None, format!("unable to parse query string: {e}")) 282 })?; 283 Ok(q) 284 } 285 286 fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata { 287 // HACK: query type switcheroo: passing MultiSubscribeQuery to 288 // `metadata` would "helpfully" panic because dropshot believes we can 289 // only have scalar types in a query. 290 // 291 // so instead we have a fake second type whose only job is to look the 292 // same as MultiSubscribeQuery exept that it has `String` instead of 293 // `Vec<String>`, which dropshot will accept, and generate ~close-enough 294 // docs for. 295 <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type) 296 } 297} 298 299#[derive(Deserialize, JsonSchema)] 300#[serde(rename_all = "camelCase")] 301struct ScalarSubscribeQuery { 302 /// Bypass the 21-sec delay buffer 303 /// 304 /// By default, spacedust holds all firehose links for 21 seconds before 305 /// emitting them, to prevent quickly- undone interactions from generating 306 /// notifications. 307 /// 308 /// Setting `instant` to true bypasses this buffer, allowing faster (and 309 /// noisier) notification delivery. 310 /// 311 /// Typically [a little less than 1%](https://bsky.app/profile/bad-example.com/post/3ls32wctsrs2l) 312 /// of links links get deleted within 21s of being created. 313 #[serde(default)] 314 pub instant: bool, 315} 316 317#[channel { 318 protocol = WEBSOCKETS, 319 path = "/subscribe", 320}] 321async fn subscribe( 322 reqctx: RequestContext<Context>, 323 query: MultiSubscribeQuery, 324 scalar_query: Query<ScalarSubscribeQuery>, 325 upgraded: WebsocketConnection, 326) -> dropshot::WebsocketChannelResult { 327 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( 328 upgraded.into_inner(), 329 Role::Server, 330 Some(WebSocketConfig::default().max_message_size( 331 Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream 332 )), 333 ) 334 .await; 335 336 let Context { b, d, shutdown, .. } = reqctx.context(); 337 let sub_token = shutdown.child_token(); 338 339 let q = scalar_query.into_inner(); 340 let subscription = if q.instant { b } else { d }.subscribe(); 341 log::info!("starting subscriber with broadcast: instant={}", q.instant); 342 343 Subscriber::new(query, sub_token) 344 .start(ws, subscription) 345 .await 346 .map_err(|e| format!("boo: {e:?}"))?; 347 348 Ok(()) 349}