forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use askama::Template;
2use axum::{
3 extract::{Query, Request},
4 http::{self, header},
5 middleware::{self, Next},
6 response::{IntoResponse, Response},
7 routing::get,
8 Router,
9};
10use axum_metrics::{ExtraMetricLabels, MetricLayer};
11use bincode::Options;
12use serde::{Deserialize, Serialize};
13use serde_with::serde_as;
14use std::collections::{HashMap, HashSet};
15use std::time::{Duration, UNIX_EPOCH};
16use tokio::net::{TcpListener, ToSocketAddrs};
17use tokio::task::spawn_blocking;
18use tokio_util::sync::CancellationToken;
19
20use crate::storage::{LinkReader, Order, StorageStats};
21use crate::{CountsByCount, Did, RecordId};
22
23mod acceptable;
24mod filters;
25
26use acceptable::{acceptable, ExtractAccept};
27
/// Page size used when a request does not supply a `limit`.
const DEFAULT_CURSOR_LIMIT: u64 = 100;
/// Upper bound on `limit`; the paged handlers in this file answer 400 for anything larger.
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1_000;
30
31fn get_default_cursor_limit() -> u64 {
32 DEFAULT_CURSOR_LIMIT
33}
34
35fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36 eprintln!("handler error: {e}");
37 http::StatusCode::INTERNAL_SERVER_ERROR
38}
39
40pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
41where
42 S: LinkReader,
43 A: ToSocketAddrs,
44{
45 let app = Router::new()
46 .route("/robots.txt", get(robots))
47 .route(
48 "/",
49 get({
50 let store = store.clone();
51 move |accept| async {
52 spawn_blocking(|| hello(accept, store))
53 .await
54 .map_err(to500)?
55 }
56 }),
57 )
58 .route(
59 "/xrpc/blue.microcosm.links.getManyToManyCounts",
60 get({
61 let store = store.clone();
62 move |accept, query| async {
63 spawn_blocking(|| get_many_to_many_counts(accept, query, store))
64 .await
65 .map_err(to500)?
66 }
67 }),
68 )
69 // deprecated
70 .route(
71 "/links/count",
72 get({
73 let store = store.clone();
74 move |accept, query| async {
75 spawn_blocking(|| count_links(accept, query, store))
76 .await
77 .map_err(to500)?
78 }
79 }),
80 )
81 .route(
82 "/xrpc/blue.microcosm.links.getBacklinksCount",
83 get({
84 let store = store.clone();
85 move |accept, query| async {
86 spawn_blocking(|| get_backlink_counts(accept, query, store))
87 .await
88 .map_err(to500)?
89 }
90 }),
91 )
92 .route(
93 "/links/count/distinct-dids",
94 get({
95 let store = store.clone();
96 move |accept, query| async {
97 spawn_blocking(|| count_distinct_dids(accept, query, store))
98 .await
99 .map_err(to500)?
100 }
101 }),
102 )
103 .route(
104 "/xrpc/blue.microcosm.links.getBacklinks",
105 get({
106 let store = store.clone();
107 move |accept, query| async {
108 spawn_blocking(|| get_backlinks(accept, query, store))
109 .await
110 .map_err(to500)?
111 }
112 }),
113 )
114 .route(
115 "/links",
116 get({
117 let store = store.clone();
118 move |accept, query| async {
119 spawn_blocking(|| get_links(accept, query, store))
120 .await
121 .map_err(to500)?
122 }
123 }),
124 )
125 .route(
126 "/links/distinct-dids",
127 get({
128 let store = store.clone();
129 move |accept, query| async {
130 spawn_blocking(|| get_distinct_dids(accept, query, store))
131 .await
132 .map_err(to500)?
133 }
134 }),
135 )
136 .route(
137 // deprecated
138 "/links/all/count",
139 get({
140 let store = store.clone();
141 move |accept, query| async {
142 spawn_blocking(|| count_all_links(accept, query, store))
143 .await
144 .map_err(to500)?
145 }
146 }),
147 )
148 .route(
149 "/links/all",
150 get({
151 let store = store.clone();
152 move |accept, query| async {
153 spawn_blocking(|| explore_links(accept, query, store))
154 .await
155 .map_err(to500)?
156 }
157 }),
158 )
159 .layer(tower_http::cors::CorsLayer::permissive())
160 .layer(middleware::from_fn(add_lables))
161 .layer(MetricLayer::default());
162
163 let listener = TcpListener::bind(addr).await?;
164 println!("api: listening at http://{:?}", listener.local_addr()?);
165 axum::serve(listener, app)
166 .with_graceful_shutdown(async move { stay_alive.cancelled().await })
167 .await?;
168
169 Ok(())
170}
171
172async fn add_lables(request: Request, next: Next) -> Response {
173 let origin = request
174 .headers()
175 .get(header::ORIGIN)
176 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok());
177 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| {
178 ua.to_str()
179 .map(|v| {
180 if v.starts_with("Mozilla/") {
181 "Mozilla/...".into()
182 } else {
183 v.to_owned()
184 }
185 })
186 .ok()
187 });
188
189 let mut res = next.run(request).await;
190
191 let mut labels = Vec::new();
192 if let Some(o) = origin {
193 labels.push(metrics::Label::new("origin", o));
194 }
195 if let Some(ua) = user_agent {
196 labels.push(metrics::Label::new("user_agent", ua));
197 }
198 res.extensions_mut().insert(ExtraMetricLabels(labels));
199 res
200}
201
/// Serves the static `robots.txt` body.
///
/// All crawlers are asked to stay out of `/links` and everything beneath it.
/// The body is written with explicit `\n` escapes so that it ends with a
/// single newline and does not pick up source-file indentation as trailing
/// whitespace.
async fn robots() -> &'static str {
    "User-agent: *\nDisallow: /links\nDisallow: /links/\n"
}
209
210#[derive(Template, Serialize, Deserialize)]
211#[template(path = "hello.html.j2")]
212struct HelloReponse {
213 help: &'static str,
214 days_indexed: Option<u64>,
215 stats: StorageStats,
216}
217fn hello(
218 accept: ExtractAccept,
219 store: impl LinkReader,
220) -> Result<impl IntoResponse, http::StatusCode> {
221 let stats = store
222 .get_stats()
223 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
224 let days_indexed = stats
225 .started_at
226 .map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
227 .transpose()
228 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
229 .map(|d| d.as_secs() / 86_400);
230 Ok(acceptable(accept, HelloReponse {
231 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
232 days_indexed,
233 stats,
234 }))
235}
236
237#[derive(Clone, Deserialize)]
238#[serde(rename_all = "camelCase")]
239struct GetManyToManyCountsQuery {
240 subject: String,
241 source: String,
242 /// path to the secondary link in the linking record
243 path_to_other: String,
244 /// filter to linking records (join of the m2m) by these DIDs
245 #[serde(default)]
246 did: Vec<String>,
247 /// filter to specific secondary records
248 #[serde(default)]
249 other_subject: Vec<String>,
250 cursor: Option<OpaqueApiCursor>,
251 /// Set the max number of links to return per page of results
252 #[serde(default = "get_default_cursor_limit")]
253 limit: u64,
254}
255#[derive(Serialize)]
256struct OtherSubjectCount {
257 subject: String,
258 total: u64,
259 distinct: u64,
260}
261#[derive(Template, Serialize)]
262#[template(path = "get-many-to-many-counts.html.j2")]
263struct GetManyToManyCountsResponse {
264 counts_by_other_subject: Vec<OtherSubjectCount>,
265 cursor: Option<OpaqueApiCursor>,
266 #[serde(skip_serializing)]
267 query: GetManyToManyCountsQuery,
268}
269fn get_many_to_many_counts(
270 accept: ExtractAccept,
271 query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
272 store: impl LinkReader,
273) -> Result<impl IntoResponse, http::StatusCode> {
274 let cursor_key = query
275 .cursor
276 .clone()
277 .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
278 .transpose()?
279 .map(|c| c.next);
280
281 let limit = query.limit;
282 if limit > DEFAULT_CURSOR_LIMIT_MAX {
283 return Err(http::StatusCode::BAD_REQUEST);
284 }
285
286 let filter_dids: HashSet<Did> = HashSet::from_iter(
287 query
288 .did
289 .iter()
290 .map(|d| d.trim())
291 .filter(|d| !d.is_empty())
292 .map(|d| Did(d.to_string())),
293 );
294
295 let filter_other_subjects: HashSet<String> = HashSet::from_iter(
296 query
297 .other_subject
298 .iter()
299 .map(|s| s.trim().to_string())
300 .filter(|s| !s.is_empty()),
301 );
302
303 let Some((collection, path)) = query.source.split_once(':') else {
304 return Err(http::StatusCode::BAD_REQUEST);
305 };
306 let path = format!(".{path}");
307
308 let path_to_other = format!(".{}", query.path_to_other);
309
310 let paged = store
311 .get_many_to_many_counts(
312 &query.subject,
313 collection,
314 &path,
315 &path_to_other,
316 limit,
317 cursor_key,
318 &filter_dids,
319 &filter_other_subjects,
320 )
321 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
322
323 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
324
325 let items = paged
326 .items
327 .into_iter()
328 .map(|(subject, total, distinct)| OtherSubjectCount {
329 subject,
330 total,
331 distinct,
332 })
333 .collect();
334
335 Ok(acceptable(
336 accept,
337 GetManyToManyCountsResponse {
338 counts_by_other_subject: items,
339 cursor,
340 query: (*query).clone(),
341 },
342 ))
343}
344
345#[derive(Clone, Deserialize)]
346struct GetLinksCountQuery {
347 target: String,
348 collection: String,
349 path: String,
350}
351#[derive(Template, Serialize)]
352#[template(path = "links-count.html.j2")]
353struct GetLinksCountResponse {
354 total: u64,
355 #[serde(skip_serializing)]
356 query: GetLinksCountQuery,
357}
358fn count_links(
359 accept: ExtractAccept,
360 query: Query<GetLinksCountQuery>,
361 store: impl LinkReader,
362) -> Result<impl IntoResponse, http::StatusCode> {
363 let total = store
364 .get_count(&query.target, &query.collection, &query.path)
365 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
366 Ok(acceptable(
367 accept,
368 GetLinksCountResponse {
369 total,
370 query: (*query).clone(),
371 },
372 ))
373}
374
375#[derive(Clone, Deserialize)]
376struct GetItemsCountQuery {
377 subject: String,
378 source: String,
379}
380#[derive(Template, Serialize)]
381#[template(path = "get-backlinks-count.html.j2")]
382struct GetItemsCountResponse {
383 total: u64,
384 #[serde(skip_serializing)]
385 query: GetItemsCountQuery,
386}
387fn get_backlink_counts(
388 accept: ExtractAccept,
389 query: axum_extra::extract::Query<GetItemsCountQuery>,
390 store: impl LinkReader,
391) -> Result<impl IntoResponse, http::StatusCode> {
392 let Some((collection, path)) = query.source.split_once(':') else {
393 return Err(http::StatusCode::BAD_REQUEST);
394 };
395 let path = format!(".{path}");
396 let total = store
397 .get_count(&query.subject, collection, &path)
398 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
399
400 Ok(acceptable(
401 accept,
402 GetItemsCountResponse {
403 total,
404 query: (*query).clone(),
405 },
406 ))
407}
408
409#[derive(Clone, Deserialize)]
410struct GetDidsCountQuery {
411 target: String,
412 collection: String,
413 path: String,
414}
415#[derive(Template, Serialize)]
416#[template(path = "dids-count.html.j2")]
417struct GetDidsCountResponse {
418 total: u64,
419 #[serde(skip_serializing)]
420 query: GetDidsCountQuery,
421}
422fn count_distinct_dids(
423 accept: ExtractAccept,
424 query: Query<GetDidsCountQuery>,
425 store: impl LinkReader,
426) -> Result<impl IntoResponse, http::StatusCode> {
427 let total = store
428 .get_distinct_did_count(&query.target, &query.collection, &query.path)
429 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
430 Ok(acceptable(
431 accept,
432 GetDidsCountResponse {
433 total,
434 query: (*query).clone(),
435 },
436 ))
437}
438
439#[derive(Clone, Deserialize)]
440struct GetBacklinksQuery {
441 /// The link target
442 ///
443 /// can be an AT-URI, plain DID, or regular URI
444 subject: String,
445 /// Filter links only from this link source
446 ///
447 /// eg.: `app.bsky.feed.like:subject.uri`
448 source: String,
449 cursor: Option<OpaqueApiCursor>,
450 /// Filter links only from these DIDs
451 ///
452 /// include multiple times to filter by multiple source DIDs
453 #[serde(default)]
454 did: Vec<String>,
455 /// Set the max number of links to return per page of results
456 #[serde(default = "get_default_cursor_limit")]
457 limit: u64,
458 /// Allow returning links in reverse order (default: false)
459 #[serde(default)]
460 reverse: bool,
461}
462#[derive(Template, Serialize)]
463#[template(path = "get-backlinks.html.j2")]
464struct GetBacklinksResponse {
465 total: u64,
466 records: Vec<RecordId>,
467 cursor: Option<OpaqueApiCursor>,
468 #[serde(skip_serializing)]
469 query: GetBacklinksQuery,
470 #[serde(skip_serializing)]
471 collection: String,
472 #[serde(skip_serializing)]
473 path: String,
474}
475fn get_backlinks(
476 accept: ExtractAccept,
477 query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences
478 store: impl LinkReader,
479) -> Result<impl IntoResponse, http::StatusCode> {
480 let until = query
481 .cursor
482 .clone()
483 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
484 .transpose()?
485 .map(|c| c.next);
486
487 let limit = query.limit;
488 if limit > DEFAULT_CURSOR_LIMIT_MAX {
489 return Err(http::StatusCode::BAD_REQUEST);
490 }
491
492 let filter_dids: HashSet<Did> = HashSet::from_iter(
493 query
494 .did
495 .iter()
496 .map(|d| d.trim())
497 .filter(|d| !d.is_empty())
498 .map(|d| Did(d.to_string())),
499 );
500
501 let Some((collection, path)) = query.source.split_once(':') else {
502 return Err(http::StatusCode::BAD_REQUEST);
503 };
504 let path = format!(".{path}");
505
506 let order = if query.reverse {
507 Order::OldestToNewest
508 } else {
509 Order::NewestToOldest
510 };
511
512 let paged = store
513 .get_links(
514 &query.subject,
515 collection,
516 &path,
517 order,
518 limit,
519 until,
520 &filter_dids,
521 )
522 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
523
524 let cursor = paged.next.map(|next| {
525 ApiCursor {
526 version: paged.version,
527 next,
528 }
529 .into()
530 });
531
532 Ok(acceptable(
533 accept,
534 GetBacklinksResponse {
535 total: paged.total,
536 records: paged.items,
537 cursor,
538 query: (*query).clone(),
539 collection: collection.to_string(),
540 path,
541 },
542 ))
543}
544
545#[derive(Clone, Deserialize)]
546struct GetLinkItemsQuery {
547 target: String,
548 collection: String,
549 path: String,
550 cursor: Option<OpaqueApiCursor>,
551 /// Filter links only from these DIDs
552 ///
553 /// include multiple times to filter by multiple source DIDs
554 #[serde(default)]
555 did: Vec<String>,
556 /// [deprecated] Filter links only from these DIDs
557 ///
558 /// format: comma-separated sequence of DIDs
559 ///
560 /// errors: if `did` parameter is also present
561 ///
562 /// deprecated: use `did`, which can be repeated multiple times
563 from_dids: Option<String>, // comma separated: gross
564 #[serde(default = "get_default_cursor_limit")]
565 limit: u64,
566}
567#[derive(Template, Serialize)]
568#[template(path = "links.html.j2")]
569struct GetLinkItemsResponse {
570 // what does staleness mean?
571 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
572 // - links have been deleted. hmm.
573 total: u64,
574 linking_records: Vec<RecordId>,
575 cursor: Option<OpaqueApiCursor>,
576 #[serde(skip_serializing)]
577 query: GetLinkItemsQuery,
578}
579fn get_links(
580 accept: ExtractAccept,
581 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences
582 store: impl LinkReader,
583) -> Result<impl IntoResponse, http::StatusCode> {
584 let until = query
585 .cursor
586 .clone()
587 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
588 .transpose()?
589 .map(|c| c.next);
590
591 let limit = query.limit;
592 if limit > DEFAULT_CURSOR_LIMIT_MAX {
593 return Err(http::StatusCode::BAD_REQUEST);
594 }
595
596 let mut filter_dids: HashSet<Did> = HashSet::from_iter(
597 query
598 .did
599 .iter()
600 .map(|d| d.trim())
601 .filter(|d| !d.is_empty())
602 .map(|d| Did(d.to_string())),
603 );
604
605 if let Some(comma_joined) = &query.from_dids {
606 if !filter_dids.is_empty() {
607 return Err(http::StatusCode::BAD_REQUEST);
608 }
609 for did in comma_joined.split(',') {
610 filter_dids.insert(Did(did.to_string()));
611 }
612 }
613
614 let paged = store
615 .get_links(
616 &query.target,
617 &query.collection,
618 &query.path,
619 Order::NewestToOldest,
620 limit,
621 until,
622 &filter_dids,
623 )
624 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
625
626 let cursor = paged.next.map(|next| {
627 ApiCursor {
628 version: paged.version,
629 next,
630 }
631 .into()
632 });
633
634 Ok(acceptable(
635 accept,
636 GetLinkItemsResponse {
637 total: paged.total,
638 linking_records: paged.items,
639 cursor,
640 query: (*query).clone(),
641 },
642 ))
643}
644
645#[derive(Clone, Deserialize)]
646struct GetDidItemsQuery {
647 target: String,
648 collection: String,
649 path: String,
650 cursor: Option<OpaqueApiCursor>,
651 limit: Option<u64>,
652 // TODO: allow reverse (er, forward) order as well
653}
654#[derive(Template, Serialize)]
655#[template(path = "dids.html.j2")]
656struct GetDidItemsResponse {
657 // what does staleness mean?
658 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
659 // - links have been deleted. hmm.
660 total: u64,
661 linking_dids: Vec<Did>,
662 cursor: Option<OpaqueApiCursor>,
663 #[serde(skip_serializing)]
664 query: GetDidItemsQuery,
665}
666fn get_distinct_dids(
667 accept: ExtractAccept,
668 query: Query<GetDidItemsQuery>,
669 store: impl LinkReader,
670) -> Result<impl IntoResponse, http::StatusCode> {
671 let until = query
672 .cursor
673 .clone()
674 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
675 .transpose()?
676 .map(|c| c.next);
677
678 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT);
679 if limit > DEFAULT_CURSOR_LIMIT_MAX {
680 return Err(http::StatusCode::BAD_REQUEST);
681 }
682
683 let paged = store
684 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until)
685 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
686
687 let cursor = paged.next.map(|next| {
688 ApiCursor {
689 version: paged.version,
690 next,
691 }
692 .into()
693 });
694
695 Ok(acceptable(
696 accept,
697 GetDidItemsResponse {
698 total: paged.total,
699 linking_dids: paged.items,
700 cursor,
701 query: (*query).clone(),
702 },
703 ))
704}
705
706#[derive(Clone, Deserialize)]
707struct GetAllLinksQuery {
708 target: String,
709}
710#[derive(Template, Serialize)]
711#[template(path = "links-all-count.html.j2")]
712struct GetAllLinksResponse {
713 links: HashMap<String, HashMap<String, u64>>,
714 #[serde(skip_serializing)]
715 query: GetAllLinksQuery,
716}
717fn count_all_links(
718 accept: ExtractAccept,
719 query: Query<GetAllLinksQuery>,
720 store: impl LinkReader,
721) -> Result<impl IntoResponse, http::StatusCode> {
722 let links = store
723 .get_all_record_counts(&query.target)
724 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
725 Ok(acceptable(
726 accept,
727 GetAllLinksResponse {
728 links,
729 query: (*query).clone(),
730 },
731 ))
732}
733
734#[derive(Clone, Deserialize)]
735struct ExploreLinksQuery {
736 target: String,
737}
738#[derive(Template, Serialize)]
739#[template(path = "explore-links.html.j2")]
740struct ExploreLinksResponse {
741 links: HashMap<String, HashMap<String, CountsByCount>>,
742 #[serde(skip_serializing)]
743 query: ExploreLinksQuery,
744}
745fn explore_links(
746 accept: ExtractAccept,
747 query: Query<ExploreLinksQuery>,
748 store: impl LinkReader,
749) -> Result<impl IntoResponse, http::StatusCode> {
750 let links = store
751 .get_all_counts(&query.target)
752 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
753 Ok(acceptable(
754 accept,
755 ExploreLinksResponse {
756 links,
757 query: (*query).clone(),
758 },
759 ))
760}
761
762#[serde_as]
763#[derive(Clone, Serialize, Deserialize)] // for json
764struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>);
765
766#[derive(Serialize, Deserialize)] // for bincode
767struct ApiCursor {
768 version: (u64, u64), // (collection length, deleted item count)
769 next: u64,
770}
771
772impl TryFrom<OpaqueApiCursor> for ApiCursor {
773 type Error = bincode::Error;
774
775 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
776 bincode::DefaultOptions::new().deserialize(&item.0)
777 }
778}
779
780impl From<ApiCursor> for OpaqueApiCursor {
781 fn from(item: ApiCursor) -> Self {
782 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
783 }
784}
785
786#[derive(Serialize, Deserialize)] // for bincode
787struct ApiKeyedCursor {
788 next: String, // the key
789}
790
791impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
792 type Error = bincode::Error;
793
794 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
795 bincode::DefaultOptions::new().deserialize(&item.0)
796 }
797}
798
799impl From<ApiKeyedCursor> for OpaqueApiCursor {
800 fn from(item: ApiKeyedCursor) -> Self {
801 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
802 }
803}