···7272 &*commit.collection,
7373 &*commit.rkey,
7474 ),
7575+ rev: commit.rev.to_string(),
7576 target: link.target.into_string(),
7677 };
7778 let _ = b.send(link_ev); // only errors if no subscribers are connected, which is just fine.
+11
spacedust/src/lib.rs
···11pub mod consumer;
22pub mod server;
33+pub mod subscriber;
3445use serde::Serialize;
56···910 path: String,
1011 origin: String,
1112 target: String,
1313+ rev: String,
1414+}
1515+1616+#[derive(Debug, Serialize)]
1717+#[serde(rename_all="snake_case")]
1818+pub struct ClientEvent {
1919+ kind: String,
2020+ link: ClientLinkEvent,
1221}
13221423#[derive(Debug, Serialize)]
···1625 operation: String,
1726 source: String,
1827 source_record: String,
2828+ source_rev: String,
1929 subject: String,
2030 // TODO: include the record too? would save clients a level of hydration
2131}
···3040 operation: "create".to_string(),
3141 source: format!("{}:{undotted}", link.collection),
3242 source_record: link.origin,
4343+ source_rev: link.rev,
3344 subject: link.target,
3445 }
3546 }
+244-41
spacedust/src/server.rs
···11-use crate::{ClientLinkEvent, LinkEvent};
11+use crate::subscriber;
22+use metrics::{histogram, counter};
33+use std::sync::Arc;
44+use crate::LinkEvent;
55+use http::{
66+ header::{ORIGIN, USER_AGENT},
77+ Response, StatusCode,
88+};
29use dropshot::{
1010+ Body,
311 ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, Query, RequestContext,
44- ServerBuilder, WebsocketConnection, channel,
1212+ ServerBuilder, WebsocketConnection, channel, endpoint, HttpResponse,
1313+ ApiEndpointBodyContentType, ExtractorMetadata, HttpError, ServerContext,
1414+ SharedExtractor,
515};
66-use futures::SinkExt;
1616+717use schemars::JsonSchema;
818use serde::{Deserialize, Serialize};
919use tokio::sync::broadcast;
1010-use tokio_tungstenite::tungstenite::Message;
2020+use tokio::time::Instant;
1121use tokio_tungstenite::tungstenite::protocol::Role;
2222+use async_trait::async_trait;
2323+use std::collections::HashSet;
2424+2525+const INDEX_HTML: &str = include_str!("../static/index.html");
2626+const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
12271328pub async fn serve(b: broadcast::Sender<LinkEvent>) -> Result<(), String> {
1429 let config_logging = ConfigLogging::StderrTerminal {
···2035 .map_err(|error| format!("failed to create logger: {}", error))?;
21362237 let mut api = ApiDescription::new();
3838+ api.register(index).unwrap();
3939+ api.register(favicon).unwrap();
4040+ api.register(openapi).unwrap();
2341 api.register(subscribe).unwrap();
24422525- let server = ServerBuilder::new(api, b, log)
4343+ // TODO: put spec in a once cell / lazy lock thing?
4444+ let spec = Arc::new(
4545+ api.openapi(
4646+ "Spacedust",
4747+ env!("CARGO_PKG_VERSION")
4848+ .parse()
4949+ .inspect_err(|e| {
5050+ eprintln!("failed to parse cargo package version for openapi: {e:?}")
5151+ })
5252+ .unwrap_or(semver::Version::new(0, 0, 1)),
5353+ )
5454+ .description("A configurable ATProto notifications firehose.")
5555+ .contact_name("part of @microcosm.blue")
5656+ .contact_url("https://microcosm.blue")
5757+ .json()
5858+ .map_err(|e| e.to_string())?,
5959+ );
6060+6161+ let ctx = Context { spec, b };
6262+6363+ let server = ServerBuilder::new(api, ctx, log)
2664 .config(ConfigDropshot {
2765 bind_address: "0.0.0.0:9998".parse().unwrap(),
2866 ..Default::default()
···3371 server.await
3472}
35733636-#[derive(Debug, Serialize)]
3737-#[serde(rename_all="snake_case")]
3838-struct ClientEvent {
3939- r#type: String,
4040- link: ClientLinkEvent,
7474+#[derive(Debug, Clone)]
7575+struct Context {
7676+ pub spec: Arc<serde_json::Value>,
7777+ pub b: broadcast::Sender<LinkEvent>,
7878+}
7979+8080+async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
8181+where
8282+ R: HttpResponse,
8383+ H: Future<Output = Result<R, HttpError>>,
8484+ T: ServerContext,
8585+{
8686+ let start = Instant::now();
8787+ let result = handler.await;
8888+ let latency = start.elapsed();
8989+ let status_code = match &result {
9090+ Ok(response) => response.status_code(),
9191+ Err(e) => e.status_code.as_status(),
9292+ }
9393+ .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
9494+ .to_string();
9595+ let endpoint = ctx.endpoint.operation_id.clone();
9696+ let headers = ctx.request.headers();
9797+ let origin = headers
9898+ .get(ORIGIN)
9999+ .and_then(|v| v.to_str().ok())
100100+ .unwrap_or("")
101101+ .to_string();
102102+ let ua = headers
103103+ .get(USER_AGENT)
104104+ .and_then(|v| v.to_str().ok())
105105+ .map(|ua| {
106106+ if ua.starts_with("Mozilla/5.0 ") {
107107+ "browser"
108108+ } else {
109109+ ua
110110+ }
111111+ })
112112+ .unwrap_or("")
113113+ .to_string();
114114+ counter!("server_requests_total",
115115+ "endpoint" => endpoint.clone(),
116116+ "origin" => origin,
117117+ "ua" => ua,
118118+ "status_code" => status_code,
119119+ )
120120+ .increment(1);
121121+ histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
122122+ result
123123+}
124124+125125+use dropshot::{HttpResponseHeaders, HttpResponseOk};
126126+127127+pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
128128+129129+/// Helper for constructing Ok responses: return OkCors(T).into()
130130+/// (not happy with this yet)
131131+pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
132132+133133+impl<T> From<OkCors<T>> for OkCorsResponse<T>
134134+where
135135+ T: Serialize + JsonSchema + Send + Sync,
136136+{
137137+ fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
138138+ let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
139139+ res.headers_mut()
140140+ .insert("access-control-allow-origin", "*".parse().unwrap());
141141+ Ok(res)
142142+ }
143143+}
144144+145145+// TODO: cors for HttpError
146146+147147+148148+/// Serve index page as html
149149+#[endpoint {
150150+ method = GET,
151151+ path = "/",
152152+ /*
153153+ * not useful to have this in openapi
154154+ */
155155+ unpublished = true,
156156+}]
157157+async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
158158+ instrument_handler(&ctx, async {
159159+ Ok(Response::builder()
160160+ .status(StatusCode::OK)
161161+ .header(http::header::CONTENT_TYPE, "text/html")
162162+ .body(INDEX_HTML.into())?)
163163+ })
164164+ .await
165165+}
167167+/// Serve the favicon as an `image/x-icon` response
168168+#[endpoint {
169169+ method = GET,
170170+ path = "/favicon.ico",
171171+ /*
172172+ * not useful to have this in openapi
173173+ */
174174+ unpublished = true,
175175+}]
176176+async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
177177+ instrument_handler(&ctx, async {
178178+ Ok(Response::builder()
179179+ .status(StatusCode::OK)
180180+ .header(http::header::CONTENT_TYPE, "image/x-icon")
181181+ .body(FAVICON.to_vec().into())?)
182182+ })
183183+ .await
184184+}
185185+186186+/// Meta: get the openapi spec for this api
187187+#[endpoint {
188188+ method = GET,
189189+ path = "/openapi",
190190+ /*
191191+ * not useful to have this in openapi
192192+ */
193193+ unpublished = true,
194194+}]
195195+async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
196196+ instrument_handler(&ctx, async {
197197+ let spec = (*ctx.context().spec).clone();
198198+ OkCors(spec).into()
199199+ })
200200+ .await
201201+}
202202+203203+/// The real type that gets deserialized
204204+#[derive(Debug, Deserialize, JsonSchema)]
205205+#[serde(rename_all = "camelCase")]
206206+pub struct MultiSubscribeQuery {
207207+ #[serde(default)]
208208+ pub wanted_subjects: HashSet<String>,
209209+ #[serde(default)]
210210+ pub wanted_subject_dids: HashSet<String>,
211211+ #[serde(default)]
212212+ pub wanted_sources: HashSet<String>,
213213+}
214214+/// A stand-in for `MultiSubscribeQuery`, used only to generate docs: every
215215+/// field is a plain `String` so that dropshot accepts it as a query type
216216+#[derive(Deserialize, JsonSchema)]
217217+#[allow(dead_code)]
218218+#[serde(rename_all = "camelCase")]
219219+struct MultiSubscribeQueryForDocs {
220220+ /// One or more at-uris to receive links about
221221+ ///
222222+ /// The at-uri must be url-encoded
223223+ ///
224224+ /// Pass this parameter multiple times to specify multiple subjects, like
225225+ /// `wantedSubjects=[...]&wantedSubjects=[...]`
226226+ pub wanted_subjects: String,
227227+ /// One or more DIDs to receive links about
228228+ ///
229229+ /// Pass this parameter multiple times to specify multiple DIDs
230230+ pub wanted_subject_dids: String,
231231+ /// One or more link sources to receive links about
232232+ ///
233233+ /// TODO: docs about link sources
234234+ ///
235235+ /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri`
236236+ ///
237237+ /// Pass this parameter multiple times to specify multiple sources
238238+ pub wanted_sources: String,
239239+}
241241+// The `SharedExtractor` implementation for `MultiSubscribeQuery` describes how
242242+// to construct an instance of `MultiSubscribeQuery` from an HTTP request:
243243+// namely, by parsing the raw query string with `serde_qs`.
244244+#[async_trait]
245245+impl SharedExtractor for MultiSubscribeQuery {
246246+ async fn from_request<Context: ServerContext>(
247247+ ctx: &RequestContext<Context>,
248248+ ) -> Result<MultiSubscribeQuery, HttpError> {
249249+ let raw_query = ctx.request.uri().query().unwrap_or("");
250250+ let q = serde_qs::from_str(raw_query).map_err(|e| {
251251+ HttpError::for_bad_request(None, format!("unable to parse query string: {}", e))
252252+ })?;
253253+ Ok(q)
254254+ }
255255+256256+ fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
257257+ // HACK: query type switcheroo: passing MultiSubscribeQuery to
258258+ // `metadata` would "helpfully" panic because dropshot believes we can
259259+ // only have scalar types in a query.
260260+ //
261261+ // so instead we have a fake second type whose only job is to look the
262262+ // same as MultiSubscribeQuery except that it has `String` instead of
263263+ // `HashSet<String>`, which dropshot will accept, and generate ~close-enough
264264+ // docs for.
265265+ <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type)
266266+ }
41267}
4226843269#[derive(Deserialize, JsonSchema)]
···50276 path = "/subscribe",
51277}]
52278async fn subscribe(
5353- ctx: RequestContext<broadcast::Sender<LinkEvent>>,
5454- _qp: Query<QueryParams>,
279279+ ctx: RequestContext<Context>,
280280+ query: MultiSubscribeQuery,
55281 upgraded: WebsocketConnection,
56282) -> dropshot::WebsocketChannelResult {
5757- let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
283283+ let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
58284 upgraded.into_inner(),
59285 Role::Server,
60286 None,
61287 )
62288 .await;
6363- let mut sub = ctx.context().subscribe();
642896565- // TODO: pingpong
6666- // TODO: filtering subscription
290290+ let b = ctx.context().b.subscribe();
672916868- loop {
6969- match sub.recv().await {
7070- Ok(link) => {
7171- let ev = ClientEvent {
7272- r#type: "link".to_string(),
7373- link: link.into(),
7474- };
7575- let json = serde_json::to_string(&ev)?;
7676- if let Err(e) = ws.send(Message::Text(json.into())).await {
7777- eprintln!("client: failed to send event: {e:?}");
7878- ws.close(None).await?; // TODO: do we need this one??
7979- break;
8080- }
8181- }
8282- Err(broadcast::error::RecvError::Closed) => {
8383- ws.close(None).await?; // TODO: send reason
8484- break;
8585- }
8686- Err(broadcast::error::RecvError::Lagged(_n_missed)) => {
8787- eprintln!("client lagged, closing");
8888- ws.close(None).await?; // TODO: send reason
8989- break;
9090- }
9191- }
9292- }
292292+ subscriber::subscribe(b, ws, query)
293293+ .await
294294+ .map_err(|e| format!("boo: {e:?}"))?;
295295+93296 Ok(())
94297}
+75
spacedust/src/subscriber.rs
···11+use crate::ClientEvent;
22+use crate::LinkEvent;
33+use crate::server::MultiSubscribeQuery;
44+use futures::SinkExt;
55+use std::error::Error;
66+use tokio::sync::broadcast;
77+use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
88+use dropshot::WebsocketConnectionRaw;
99+1010+pub async fn subscribe(
1111+ mut sub: broadcast::Receiver<LinkEvent>,
1212+ mut ws: WebSocketStream<WebsocketConnectionRaw>,
1313+ query: MultiSubscribeQuery,
1414+) -> Result<(), Box<dyn Error>> {
1515+ // TODO: pingpong
1616+1717+ loop {
1818+ match sub.recv().await {
1919+ Ok(link) => {
2020+2121+ // subject + subject DIDs are logical OR
2222+ let target_did = if link.target.starts_with("did:") {
2323+ link.target.clone()
2424+ } else {
2525+ let Some(rest) = link.target.strip_prefix("at://") else {
2626+ continue;
2727+ };
2828+ if let Some((did, _)) = rest.split_once("/") {
2929+ did
3030+ } else {
3131+ rest
3232+ }.to_string()
3333+ };
3434+ if !(query.wanted_subjects.contains(&link.target) || query.wanted_subject_dids.contains(&target_did) || query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()) {
3535+ // wowwww ^^ fix that
3636+ continue;
3737+ }
3838+3939+ // subjects together with sources are logical AND
4040+4141+ if !query.wanted_sources.is_empty() {
4242+ let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
4343+ eprintln!("link path did not have expected '.' prefix: {}", link.path);
4444+ ""
4545+ });
4646+ let source = format!("{}:{undotted}", link.collection);
4747+ if !query.wanted_sources.contains(&source) {
4848+ continue;
4949+ }
5050+ }
5151+5252+ let ev = ClientEvent {
5353+ kind: "link".to_string(),
5454+ link: link.into(),
5555+ };
5656+ let json = serde_json::to_string(&ev)?;
5757+ if let Err(e) = ws.send(Message::Text(json.into())).await {
5858+ eprintln!("client: failed to send event: {e:?}");
5959+ ws.close(None).await?; // TODO: do we need this one??
6060+ break;
6161+ }
6262+ }
6363+ Err(broadcast::error::RecvError::Closed) => {
6464+ ws.close(None).await?; // TODO: send reason
6565+ break;
6666+ }
6767+ Err(broadcast::error::RecvError::Lagged(_n_missed)) => {
6868+ eprintln!("client lagged, closing");
6969+ ws.close(None).await?; // TODO: send reason
7070+ break;
7171+ }
7272+ }
7373+ }
7474+ Ok(())
7575+}