Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use askama::Template;
2use axum::{
3 extract::{Query, Request},
4 http::{self, header},
5 middleware::{self, Next},
6 response::{IntoResponse, Response},
7 routing::get,
8 Router,
9};
10use axum_metrics::{ExtraMetricLabels, MetricLayer};
11use bincode::Options;
12use serde::{Deserialize, Serialize};
13use serde_with::serde_as;
14use std::collections::{HashMap, HashSet};
15use std::time::{Duration, UNIX_EPOCH};
16use tokio::net::{TcpListener, ToSocketAddrs};
17use tokio::task::spawn_blocking;
18use tokio_util::sync::CancellationToken;
19
20use crate::storage::{LinkReader, Order, StorageStats};
21use crate::{CountsByCount, Did, RecordId};
22
23mod acceptable;
24mod filters;
25
26use acceptable::{acceptable, ExtractAccept};
27
/// Page size used when a request does not say how many items it wants.
const DEFAULT_CURSOR_LIMIT: u64 = 100;
/// Largest page size the paged endpoints accept; bigger requests get a 400.
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1_000;

/// Serde `default = "..."` hook supplying [`DEFAULT_CURSOR_LIMIT`] for `limit` params.
fn get_default_cursor_limit() -> u64 {
    let limit = DEFAULT_CURSOR_LIMIT;
    limit
}
34
35fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36 eprintln!("handler error: {e}");
37 http::StatusCode::INTERNAL_SERVER_ERROR
38}
39
/// Run the HTTP API over `store`, listening on `addr`, until `stay_alive` is cancelled.
///
/// Every data route runs its synchronous handler through `spawn_blocking`, so
/// calls into the store are kept off the async worker threads. If the blocking
/// task fails to join (`JoinError`), `to500` logs it and the route answers 500.
///
/// # Errors
/// Returns an error if binding `addr`, reading the bound local address, or
/// running the server fails.
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
where
    S: LinkReader,
    A: ToSocketAddrs,
{
    // Each route captures its own clone of `store` in its handler closure and
    // hands it to the blocking handler function.
    let app = Router::new()
        .route("/robots.txt", get(robots))
        .route(
            "/",
            get({
                let store = store.clone();
                move |accept| async {
                    spawn_blocking(|| hello(accept, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/xrpc/blue.microcosm.links.getManyToManyCounts",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| get_many_to_many_counts(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/links/count",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| count_links(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/links/count/distinct-dids",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| count_distinct_dids(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/xrpc/blue.microcosm.links.getBacklinks",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| get_backlinks(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/links",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| get_links(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/links/distinct-dids",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| get_distinct_dids(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            // deprecated
            "/links/all/count",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| count_all_links(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        .route(
            "/links/all",
            get({
                let store = store.clone();
                move |accept, query| async {
                    spawn_blocking(|| explore_links(accept, query, store))
                        .await
                        .map_err(to500)?
                }
            }),
        )
        // Each `.layer` wraps everything added before it, so `MetricLayer` is the
        // outermost layer — presumably so it can read the `ExtraMetricLabels`
        // that `add_lables` inserts into responses (confirm against axum_metrics).
        .layer(tower_http::cors::CorsLayer::permissive())
        .layer(middleware::from_fn(add_lables))
        .layer(MetricLayer::default());

    let listener = TcpListener::bind(addr).await?;
    println!("api: listening at http://{:?}", listener.local_addr()?);
    // Shut the server down gracefully once `stay_alive` is cancelled.
    axum::serve(listener, app)
        .with_graceful_shutdown(async move { stay_alive.cancelled().await })
        .await?;

    Ok(())
}
159
160async fn add_lables(request: Request, next: Next) -> Response {
161 let origin = request
162 .headers()
163 .get(header::ORIGIN)
164 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok());
165 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| {
166 ua.to_str()
167 .map(|v| {
168 if v.starts_with("Mozilla/") {
169 "Mozilla/...".into()
170 } else {
171 v.to_owned()
172 }
173 })
174 .ok()
175 });
176
177 let mut res = next.run(request).await;
178
179 let mut labels = Vec::new();
180 if let Some(o) = origin {
181 labels.push(metrics::Label::new("origin", o));
182 }
183 if let Some(ua) = user_agent {
184 labels.push(metrics::Label::new("user_agent", ua));
185 }
186 res.extensions_mut().insert(ExtraMetricLabels(labels));
187 res
188}
189
/// Body served at `/robots.txt`: asks all crawlers to stay out of `/links`.
///
/// Kept as a standalone constant so the text carries no whitespace-only
/// trailing line from source indentation.
const ROBOTS_TXT: &str = "\
User-agent: *
Disallow: /links
Disallow: /links/
";

/// Handler for `/robots.txt`.
async fn robots() -> &'static str {
    ROBOTS_TXT
}
197
198#[derive(Template, Serialize, Deserialize)]
199#[template(path = "hello.html.j2")]
200struct HelloReponse {
201 help: &'static str,
202 days_indexed: Option<u64>,
203 stats: StorageStats,
204}
205fn hello(
206 accept: ExtractAccept,
207 store: impl LinkReader,
208) -> Result<impl IntoResponse, http::StatusCode> {
209 let stats = store
210 .get_stats()
211 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
212 let days_indexed = stats
213 .started_at
214 .map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
215 .transpose()
216 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
217 .map(|d| d.as_secs() / 86_400);
218 Ok(acceptable(accept, HelloReponse {
219 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
220 days_indexed,
221 stats,
222 }))
223}
224
225#[derive(Clone, Deserialize)]
226#[serde(rename_all = "camelCase")]
227struct GetManyToManyCountsQuery {
228 subject: String,
229 source: String,
230 /// path to the secondary link in the linking record
231 path_to_other: String,
232 /// filter to linking records (join of the m2m) by these DIDs
233 #[serde(default)]
234 did: Vec<String>,
235 /// filter to specific secondary records
236 #[serde(default)]
237 other_subject: Vec<String>,
238 cursor: Option<OpaqueApiCursor>,
239 /// Set the max number of links to return per page of results
240 #[serde(default = "get_default_cursor_limit")]
241 limit: u64,
242}
243#[derive(Serialize)]
244struct OtherSubjectCount {
245 subject: String,
246 total: u64,
247 distinct: u64,
248}
249#[derive(Template, Serialize)]
250#[template(path = "get-many-to-many-counts.html.j2")]
251struct GetManyToManyCountsResponse {
252 counts_by_other_subject: Vec<OtherSubjectCount>,
253 cursor: Option<OpaqueApiCursor>,
254 #[serde(skip_serializing)]
255 query: GetManyToManyCountsQuery,
256}
257fn get_many_to_many_counts(
258 accept: ExtractAccept,
259 query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
260 store: impl LinkReader,
261) -> Result<impl IntoResponse, http::StatusCode> {
262 let cursor_key = query
263 .cursor
264 .clone()
265 .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
266 .transpose()?
267 .map(|c| c.next);
268
269 let limit = query.limit;
270 if limit > DEFAULT_CURSOR_LIMIT_MAX {
271 return Err(http::StatusCode::BAD_REQUEST);
272 }
273
274 let filter_dids: HashSet<Did> = HashSet::from_iter(
275 query
276 .did
277 .iter()
278 .map(|d| d.trim())
279 .filter(|d| !d.is_empty())
280 .map(|d| Did(d.to_string())),
281 );
282
283 let filter_other_subjects: HashSet<String> = HashSet::from_iter(
284 query
285 .other_subject
286 .iter()
287 .map(|s| s.trim().to_string())
288 .filter(|s| !s.is_empty()),
289 );
290
291 let Some((collection, path)) = query.source.split_once(':') else {
292 return Err(http::StatusCode::BAD_REQUEST);
293 };
294 let path = format!(".{path}");
295
296 let path_to_other = format!(".{}", query.path_to_other);
297
298 let paged = store
299 .get_many_to_many_counts(
300 &query.subject,
301 collection,
302 &path,
303 &path_to_other,
304 limit,
305 cursor_key,
306 &filter_dids,
307 &filter_other_subjects,
308 )
309 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
311 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
312
313 let items = paged
314 .items
315 .into_iter()
316 .map(|(subject, total, distinct)| OtherSubjectCount {
317 subject,
318 total,
319 distinct,
320 })
321 .collect();
322
323 Ok(acceptable(
324 accept,
325 GetManyToManyCountsResponse {
326 counts_by_other_subject: items,
327 cursor,
328 query: (*query).clone(),
329 },
330 ))
331}
332
333#[derive(Clone, Deserialize)]
334struct GetLinksCountQuery {
335 target: String,
336 collection: String,
337 path: String,
338}
339#[derive(Template, Serialize)]
340#[template(path = "links-count.html.j2")]
341struct GetLinksCountResponse {
342 total: u64,
343 #[serde(skip_serializing)]
344 query: GetLinksCountQuery,
345}
346fn count_links(
347 accept: ExtractAccept,
348 query: Query<GetLinksCountQuery>,
349 store: impl LinkReader,
350) -> Result<impl IntoResponse, http::StatusCode> {
351 let total = store
352 .get_count(&query.target, &query.collection, &query.path)
353 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
354 Ok(acceptable(
355 accept,
356 GetLinksCountResponse {
357 total,
358 query: (*query).clone(),
359 },
360 ))
361}
362
363#[derive(Clone, Deserialize)]
364struct GetDidsCountQuery {
365 target: String,
366 collection: String,
367 path: String,
368}
369#[derive(Template, Serialize)]
370#[template(path = "dids-count.html.j2")]
371struct GetDidsCountResponse {
372 total: u64,
373 #[serde(skip_serializing)]
374 query: GetDidsCountQuery,
375}
376fn count_distinct_dids(
377 accept: ExtractAccept,
378 query: Query<GetDidsCountQuery>,
379 store: impl LinkReader,
380) -> Result<impl IntoResponse, http::StatusCode> {
381 let total = store
382 .get_distinct_did_count(&query.target, &query.collection, &query.path)
383 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
384 Ok(acceptable(
385 accept,
386 GetDidsCountResponse {
387 total,
388 query: (*query).clone(),
389 },
390 ))
391}
392
393#[derive(Clone, Deserialize)]
394struct GetBacklinksQuery {
395 /// The link target
396 ///
397 /// can be an AT-URI, plain DID, or regular URI
398 subject: String,
399 /// Filter links only from this link source
400 ///
401 /// eg.: `app.bsky.feed.like:subject.uri`
402 source: String,
403 cursor: Option<OpaqueApiCursor>,
404 /// Filter links only from these DIDs
405 ///
406 /// include multiple times to filter by multiple source DIDs
407 #[serde(default)]
408 did: Vec<String>,
409 /// Set the max number of links to return per page of results
410 #[serde(default = "get_default_cursor_limit")]
411 limit: u64,
412 /// Allow returning links in reverse order (default: false)
413 #[serde(default)]
414 reverse: bool,
415}
416#[derive(Template, Serialize)]
417#[template(path = "get-backlinks.html.j2")]
418struct GetBacklinksResponse {
419 total: u64,
420 records: Vec<RecordId>,
421 cursor: Option<OpaqueApiCursor>,
422 #[serde(skip_serializing)]
423 query: GetBacklinksQuery,
424 #[serde(skip_serializing)]
425 collection: String,
426 #[serde(skip_serializing)]
427 path: String,
428}
429fn get_backlinks(
430 accept: ExtractAccept,
431 query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences
432 store: impl LinkReader,
433) -> Result<impl IntoResponse, http::StatusCode> {
434 let until = query
435 .cursor
436 .clone()
437 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
438 .transpose()?
439 .map(|c| c.next);
440
441 let limit = query.limit;
442 if limit > DEFAULT_CURSOR_LIMIT_MAX {
443 return Err(http::StatusCode::BAD_REQUEST);
444 }
445
446 let filter_dids: HashSet<Did> = HashSet::from_iter(
447 query
448 .did
449 .iter()
450 .map(|d| d.trim())
451 .filter(|d| !d.is_empty())
452 .map(|d| Did(d.to_string())),
453 );
454
455 let Some((collection, path)) = query.source.split_once(':') else {
456 return Err(http::StatusCode::BAD_REQUEST);
457 };
458 let path = format!(".{path}");
459
460 let order = if query.reverse {
461 Order::OldestToNewest
462 } else {
463 Order::NewestToOldest
464 };
465
466 let paged = store
467 .get_links(
468 &query.subject,
469 collection,
470 &path,
471 order,
472 limit,
473 until,
474 &filter_dids,
475 )
476 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
477
478 let cursor = paged.next.map(|next| {
479 ApiCursor {
480 version: paged.version,
481 next,
482 }
483 .into()
484 });
485
486 Ok(acceptable(
487 accept,
488 GetBacklinksResponse {
489 total: paged.total,
490 records: paged.items,
491 cursor,
492 query: (*query).clone(),
493 collection: collection.to_string(),
494 path,
495 },
496 ))
497}
498
499#[derive(Clone, Deserialize)]
500struct GetLinkItemsQuery {
501 target: String,
502 collection: String,
503 path: String,
504 cursor: Option<OpaqueApiCursor>,
505 /// Filter links only from these DIDs
506 ///
507 /// include multiple times to filter by multiple source DIDs
508 #[serde(default)]
509 did: Vec<String>,
510 /// [deprecated] Filter links only from these DIDs
511 ///
512 /// format: comma-separated sequence of DIDs
513 ///
514 /// errors: if `did` parameter is also present
515 ///
516 /// deprecated: use `did`, which can be repeated multiple times
517 from_dids: Option<String>, // comma separated: gross
518 #[serde(default = "get_default_cursor_limit")]
519 limit: u64,
520}
521#[derive(Template, Serialize)]
522#[template(path = "links.html.j2")]
523struct GetLinkItemsResponse {
524 // what does staleness mean?
525 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
526 // - links have been deleted. hmm.
527 total: u64,
528 linking_records: Vec<RecordId>,
529 cursor: Option<OpaqueApiCursor>,
530 #[serde(skip_serializing)]
531 query: GetLinkItemsQuery,
532}
533fn get_links(
534 accept: ExtractAccept,
535 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences
536 store: impl LinkReader,
537) -> Result<impl IntoResponse, http::StatusCode> {
538 let until = query
539 .cursor
540 .clone()
541 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
542 .transpose()?
543 .map(|c| c.next);
544
545 let limit = query.limit;
546 if limit > DEFAULT_CURSOR_LIMIT_MAX {
547 return Err(http::StatusCode::BAD_REQUEST);
548 }
549
550 let mut filter_dids: HashSet<Did> = HashSet::from_iter(
551 query
552 .did
553 .iter()
554 .map(|d| d.trim())
555 .filter(|d| !d.is_empty())
556 .map(|d| Did(d.to_string())),
557 );
558
559 if let Some(comma_joined) = &query.from_dids {
560 if !filter_dids.is_empty() {
561 return Err(http::StatusCode::BAD_REQUEST);
562 }
563 for did in comma_joined.split(',') {
564 filter_dids.insert(Did(did.to_string()));
565 }
566 }
567
568 let paged = store
569 .get_links(
570 &query.target,
571 &query.collection,
572 &query.path,
573 Order::NewestToOldest,
574 limit,
575 until,
576 &filter_dids,
577 )
578 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
579
580 let cursor = paged.next.map(|next| {
581 ApiCursor {
582 version: paged.version,
583 next,
584 }
585 .into()
586 });
587
588 Ok(acceptable(
589 accept,
590 GetLinkItemsResponse {
591 total: paged.total,
592 linking_records: paged.items,
593 cursor,
594 query: (*query).clone(),
595 },
596 ))
597}
598
599#[derive(Clone, Deserialize)]
600struct GetDidItemsQuery {
601 target: String,
602 collection: String,
603 path: String,
604 cursor: Option<OpaqueApiCursor>,
605 limit: Option<u64>,
606 // TODO: allow reverse (er, forward) order as well
607}
608#[derive(Template, Serialize)]
609#[template(path = "dids.html.j2")]
610struct GetDidItemsResponse {
611 // what does staleness mean?
612 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
613 // - links have been deleted. hmm.
614 total: u64,
615 linking_dids: Vec<Did>,
616 cursor: Option<OpaqueApiCursor>,
617 #[serde(skip_serializing)]
618 query: GetDidItemsQuery,
619}
620fn get_distinct_dids(
621 accept: ExtractAccept,
622 query: Query<GetDidItemsQuery>,
623 store: impl LinkReader,
624) -> Result<impl IntoResponse, http::StatusCode> {
625 let until = query
626 .cursor
627 .clone()
628 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
629 .transpose()?
630 .map(|c| c.next);
631
632 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT);
633 if limit > DEFAULT_CURSOR_LIMIT_MAX {
634 return Err(http::StatusCode::BAD_REQUEST);
635 }
636
637 let paged = store
638 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until)
639 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
640
641 let cursor = paged.next.map(|next| {
642 ApiCursor {
643 version: paged.version,
644 next,
645 }
646 .into()
647 });
648
649 Ok(acceptable(
650 accept,
651 GetDidItemsResponse {
652 total: paged.total,
653 linking_dids: paged.items,
654 cursor,
655 query: (*query).clone(),
656 },
657 ))
658}
659
660#[derive(Clone, Deserialize)]
661struct GetAllLinksQuery {
662 target: String,
663}
664#[derive(Template, Serialize)]
665#[template(path = "links-all-count.html.j2")]
666struct GetAllLinksResponse {
667 links: HashMap<String, HashMap<String, u64>>,
668 #[serde(skip_serializing)]
669 query: GetAllLinksQuery,
670}
671fn count_all_links(
672 accept: ExtractAccept,
673 query: Query<GetAllLinksQuery>,
674 store: impl LinkReader,
675) -> Result<impl IntoResponse, http::StatusCode> {
676 let links = store
677 .get_all_record_counts(&query.target)
678 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
679 Ok(acceptable(
680 accept,
681 GetAllLinksResponse {
682 links,
683 query: (*query).clone(),
684 },
685 ))
686}
687
688#[derive(Clone, Deserialize)]
689struct ExploreLinksQuery {
690 target: String,
691}
692#[derive(Template, Serialize)]
693#[template(path = "explore-links.html.j2")]
694struct ExploreLinksResponse {
695 links: HashMap<String, HashMap<String, CountsByCount>>,
696 #[serde(skip_serializing)]
697 query: ExploreLinksQuery,
698}
699fn explore_links(
700 accept: ExtractAccept,
701 query: Query<ExploreLinksQuery>,
702 store: impl LinkReader,
703) -> Result<impl IntoResponse, http::StatusCode> {
704 let links = store
705 .get_all_counts(&query.target)
706 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
707 Ok(acceptable(
708 accept,
709 ExploreLinksResponse {
710 links,
711 query: (*query).clone(),
712 },
713 ))
714}
715
/// Pagination cursor as it appears in API requests and responses: raw bytes
/// carried as a hex string (via `serde_with::hex::Hex`).
///
/// The bytes are a bincode encoding of either `ApiCursor` or `ApiKeyedCursor`,
/// depending on the endpoint (see their `TryFrom`/`From` impls); clients are
/// expected to pass them back unchanged.
#[serde_as]
#[derive(Clone, Serialize, Deserialize)] // for json
struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>);
719
720#[derive(Serialize, Deserialize)] // for bincode
721struct ApiCursor {
722 version: (u64, u64), // (collection length, deleted item count)
723 next: u64,
724}
725
726impl TryFrom<OpaqueApiCursor> for ApiCursor {
727 type Error = bincode::Error;
728
729 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
730 bincode::DefaultOptions::new().deserialize(&item.0)
731 }
732}
733
734impl From<ApiCursor> for OpaqueApiCursor {
735 fn from(item: ApiCursor) -> Self {
736 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
737 }
738}
739
740#[derive(Serialize, Deserialize)] // for bincode
741struct ApiKeyedCursor {
742 next: String, // the key
743}
744
745impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
746 type Error = bincode::Error;
747
748 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
749 bincode::DefaultOptions::new().deserialize(&item.0)
750 }
751}
752
753impl From<ApiKeyedCursor> for OpaqueApiCursor {
754 fn from(item: ApiKeyedCursor) -> Self {
755 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
756 }
757}