Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::ClientMessage;
2use crate::error::ServerError;
3use crate::subscriber::Subscriber;
4use dropshot::{
5 ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging,
6 ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext,
7 ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint,
8};
9use http::{
10 Response, StatusCode,
11 header::{ORIGIN, USER_AGENT},
12};
13use metrics::{counter, histogram};
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use schemars::JsonSchema;
18use serde::{Deserialize, Serialize};
19use std::collections::HashSet;
20use tokio::sync::broadcast;
21use tokio::time::Instant;
22use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig};
23use tokio_util::sync::CancellationToken;
24
25const INDEX_HTML: &str = include_str!("../static/index.html");
26const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
27
28pub async fn serve(
29 b: broadcast::Sender<Arc<ClientMessage>>,
30 d: broadcast::Sender<Arc<ClientMessage>>,
31 shutdown: CancellationToken,
32 bind: std::net::SocketAddr,
33) -> Result<(), ServerError> {
34 let config_logging = ConfigLogging::StderrTerminal {
35 level: ConfigLoggingLevel::Info,
36 };
37
38 let log = config_logging
39 .to_logger("example-basic")
40 .map_err(ServerError::ConfigLogError)?;
41
42 let mut api = ApiDescription::new();
43 api.register(index).unwrap();
44 api.register(favicon).unwrap();
45 api.register(openapi).unwrap();
46 api.register(subscribe).unwrap();
47
48 // TODO: put spec in a once cell / lazy lock thing?
49 let spec = Arc::new(
50 api.openapi(
51 "Spacedust",
52 env!("CARGO_PKG_VERSION")
53 .parse()
54 .inspect_err(|e| {
55 eprintln!("failed to parse cargo package version for openapi: {e:?}")
56 })
57 .unwrap_or(semver::Version::new(0, 0, 1)),
58 )
59 .description("A configurable ATProto notifications firehose.")
60 .contact_name("part of @microcosm.blue")
61 .contact_url("https://microcosm.blue")
62 .json()
63 .map_err(ServerError::OpenApiJsonFail)?,
64 );
65
66 let sub_shutdown = shutdown.clone();
67 let ctx = Context {
68 spec,
69 b,
70 d,
71 shutdown: sub_shutdown,
72 };
73
74 let server = ServerBuilder::new(api, ctx, log)
75 .config(ConfigDropshot {
76 bind_address: bind,
77 ..Default::default()
78 })
79 .start()?;
80
81 tokio::select! {
82 s = server.wait_for_shutdown() => {
83 s.map_err(ServerError::ServerExited)?;
84 log::info!("server shut down normally.");
85 },
86 _ = shutdown.cancelled() => {
87 log::info!("shutting down: closing server");
88 server.close().await.map_err(ServerError::BadClose)?;
89 },
90 }
91 Ok(())
92}
93
94#[derive(Debug, Clone)]
95struct Context {
96 pub spec: Arc<serde_json::Value>,
97 pub b: broadcast::Sender<Arc<ClientMessage>>,
98 pub d: broadcast::Sender<Arc<ClientMessage>>,
99 pub shutdown: CancellationToken,
100}
101
102async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
103where
104 R: HttpResponse,
105 H: Future<Output = Result<R, HttpError>>,
106 T: ServerContext,
107{
108 let start = Instant::now();
109 let result = handler.await;
110 let latency = start.elapsed();
111 let status_code = match &result {
112 Ok(response) => response.status_code(),
113 Err(e) => e.status_code.as_status(),
114 }
115 .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
116 .to_string();
117 let endpoint = ctx.endpoint.operation_id.clone();
118 let headers = ctx.request.headers();
119 let origin = headers
120 .get(ORIGIN)
121 .and_then(|v| v.to_str().ok())
122 .unwrap_or("")
123 .to_string();
124 let ua = headers
125 .get(USER_AGENT)
126 .and_then(|v| v.to_str().ok())
127 .map(|ua| {
128 if ua.starts_with("Mozilla/5.0 ") {
129 "browser"
130 } else {
131 ua
132 }
133 })
134 .unwrap_or("")
135 .to_string();
136 counter!("server_requests_total",
137 "endpoint" => endpoint.clone(),
138 "origin" => origin,
139 "ua" => ua,
140 "status_code" => status_code,
141 )
142 .increment(1);
143 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
144 result
145}
146
147use dropshot::{HttpResponseHeaders, HttpResponseOk};
148
149pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
150
151/// Helper for constructing Ok responses: return OkCors(T).into()
152/// (not happy with this yet)
153pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
154
155impl<T> From<OkCors<T>> for OkCorsResponse<T>
156where
157 T: Serialize + JsonSchema + Send + Sync,
158{
159 fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
160 let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
161 res.headers_mut()
162 .insert("access-control-allow-origin", "*".parse().unwrap());
163 Ok(res)
164 }
165}
166
167// TODO: cors for HttpError
168
169/// Serve index page as html
170#[endpoint {
171 method = GET,
172 path = "/",
173 /*
174 * not useful to have this in openapi
175 */
176 unpublished = true,
177}]
178async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
179 instrument_handler(&ctx, async {
180 Ok(Response::builder()
181 .status(StatusCode::OK)
182 .header(http::header::CONTENT_TYPE, "text/html")
183 .body(INDEX_HTML.into())?)
184 })
185 .await
186}
187
188/// Serve index page as html
189#[endpoint {
190 method = GET,
191 path = "/favicon.ico",
192 /*
193 * not useful to have this in openapi
194 */
195 unpublished = true,
196}]
197async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
198 instrument_handler(&ctx, async {
199 Ok(Response::builder()
200 .status(StatusCode::OK)
201 .header(http::header::CONTENT_TYPE, "image/x-icon")
202 .body(FAVICON.to_vec().into())?)
203 })
204 .await
205}
206
207/// Meta: get the openapi spec for this api
208#[endpoint {
209 method = GET,
210 path = "/openapi",
211 /*
212 * not useful to have this in openapi
213 */
214 unpublished = true,
215}]
216async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
217 instrument_handler(&ctx, async {
218 let spec = (*ctx.context().spec).clone();
219 OkCors(spec).into()
220 })
221 .await
222}
223
224/// The real type that gets deserialized
225#[derive(Debug, Deserialize, JsonSchema)]
226#[serde(rename_all = "camelCase")]
227pub struct MultiSubscribeQuery {
228 #[serde(default)]
229 pub wanted_subjects: HashSet<String>,
230 #[serde(default)]
231 pub wanted_subject_prefixes: HashSet<String>,
232 #[serde(default)]
233 pub wanted_subject_dids: HashSet<String>,
234 #[serde(default)]
235 pub wanted_sources: HashSet<String>,
236}
237/// The fake corresponding type for docs that dropshot won't freak out about a
238/// vec for
239#[derive(Deserialize, JsonSchema)]
240#[allow(dead_code)]
241#[serde(rename_all = "camelCase")]
242struct MultiSubscribeQueryForDocs {
243 /// One or more at-uris to receive links about
244 ///
245 /// The at-uri must be url-encoded
246 ///
247 /// Pass this parameter multiple times to specify multiple subjects, like
248 /// `wantedSubjects=[...]&wantedSubjects=[...]`
249 pub wanted_subjects: String,
250 /// One or more at-uri, URI, or DID prefixes to receive links about
251 ///
252 /// The uri must be url-encoded
253 ///
254 /// Pass this parameter multiple times to specify multiple prefixes, like
255 /// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]`
256 pub wanted_subject_prefixes: String,
257 /// One or more DIDs to receive links about
258 ///
259 /// Pass this parameter multiple times to specify multiple subjects
260 pub wanted_subject_dids: String,
261 /// One or more link sources to receive links about
262 ///
263 /// TODO: docs about link sources
264 ///
265 /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri`
266 ///
267 /// Pass this parameter multiple times to specify multiple sources
268 pub wanted_sources: String,
269}
270
271// The `SharedExtractor` implementation for Query<QueryType> describes how to
272// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
273// parsing the query string to an instance of `QueryType`.
274#[async_trait]
275impl SharedExtractor for MultiSubscribeQuery {
276 async fn from_request<Context: ServerContext>(
277 ctx: &RequestContext<Context>,
278 ) -> Result<MultiSubscribeQuery, HttpError> {
279 let raw_query = ctx.request.uri().query().unwrap_or("");
280 let q = serde_qs::from_str(raw_query).map_err(|e| {
281 HttpError::for_bad_request(None, format!("unable to parse query string: {e}"))
282 })?;
283 Ok(q)
284 }
285
286 fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
287 // HACK: query type switcheroo: passing MultiSubscribeQuery to
288 // `metadata` would "helpfully" panic because dropshot believes we can
289 // only have scalar types in a query.
290 //
291 // so instead we have a fake second type whose only job is to look the
292 // same as MultiSubscribeQuery exept that it has `String` instead of
293 // `Vec<String>`, which dropshot will accept, and generate ~close-enough
294 // docs for.
295 <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type)
296 }
297}
298
299#[derive(Deserialize, JsonSchema)]
300#[serde(rename_all = "camelCase")]
301struct ScalarSubscribeQuery {
302 /// Bypass the 21-sec delay buffer
303 ///
304 /// By default, spacedust holds all firehose links for 21 seconds before
305 /// emitting them, to prevent quickly- undone interactions from generating
306 /// notifications.
307 ///
308 /// Setting `instant` to true bypasses this buffer, allowing faster (and
309 /// noisier) notification delivery.
310 ///
311 /// Typically [a little less than 1%](https://bsky.app/profile/bad-example.com/post/3ls32wctsrs2l)
312 /// of links links get deleted within 21s of being created.
313 #[serde(default)]
314 pub instant: bool,
315}
316
317#[channel {
318 protocol = WEBSOCKETS,
319 path = "/subscribe",
320}]
321async fn subscribe(
322 reqctx: RequestContext<Context>,
323 query: MultiSubscribeQuery,
324 scalar_query: Query<ScalarSubscribeQuery>,
325 upgraded: WebsocketConnection,
326) -> dropshot::WebsocketChannelResult {
327 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
328 upgraded.into_inner(),
329 Role::Server,
330 Some(WebSocketConfig::default().max_message_size(
331 Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream
332 )),
333 )
334 .await;
335
336 let Context { b, d, shutdown, .. } = reqctx.context();
337 let sub_token = shutdown.child_token();
338
339 let q = scalar_query.into_inner();
340 let subscription = if q.instant { b } else { d }.subscribe();
341 log::info!("starting subscriber with broadcast: instant={}", q.instant);
342
343 Subscriber::new(query, sub_token)
344 .start(ws, subscription)
345 .await
346 .map_err(|e| format!("boo: {e:?}"))?;
347
348 Ok(())
349}