Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at proxy-blobby 803 lines 22 kB view raw
1use askama::Template; 2use axum::{ 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 response::{IntoResponse, Response}, 7 routing::get, 8 Router, 9}; 10use axum_metrics::{ExtraMetricLabels, MetricLayer}; 11use bincode::Options; 12use serde::{Deserialize, Serialize}; 13use serde_with::serde_as; 14use std::collections::{HashMap, HashSet}; 15use std::time::{Duration, UNIX_EPOCH}; 16use tokio::net::{TcpListener, ToSocketAddrs}; 17use tokio::task::spawn_blocking; 18use tokio_util::sync::CancellationToken; 19 20use crate::storage::{LinkReader, Order, StorageStats}; 21use crate::{CountsByCount, Did, RecordId}; 22 23mod acceptable; 24mod filters; 25 26use acceptable::{acceptable, ExtractAccept}; 27 28const DEFAULT_CURSOR_LIMIT: u64 = 100; 29const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000; 30 31fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT 33} 34 35fn to500(e: tokio::task::JoinError) -> http::StatusCode { 36 eprintln!("handler error: {e}"); 37 http::StatusCode::INTERNAL_SERVER_ERROR 38} 39 40pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 41where 42 S: LinkReader, 43 A: ToSocketAddrs, 44{ 45 let app = Router::new() 46 .route("/robots.txt", get(robots)) 47 .route( 48 "/", 49 get({ 50 let store = store.clone(); 51 move |accept| async { 52 spawn_blocking(|| hello(accept, store)) 53 .await 54 .map_err(to500)? 55 } 56 }), 57 ) 58 .route( 59 "/xrpc/blue.microcosm.links.getManyToManyCounts", 60 get({ 61 let store = store.clone(); 62 move |accept, query| async { 63 spawn_blocking(|| get_many_to_many_counts(accept, query, store)) 64 .await 65 .map_err(to500)? 66 } 67 }), 68 ) 69 // deprecated 70 .route( 71 "/links/count", 72 get({ 73 let store = store.clone(); 74 move |accept, query| async { 75 spawn_blocking(|| count_links(accept, query, store)) 76 .await 77 .map_err(to500)? 78 } 79 }), 80 ) 81 .route( 82 "/xrpc/blue.microcosm.links.getBacklinksCount", 83 get({ 84 let store = store.clone(); 85 move |accept, query| async { 86 spawn_blocking(|| get_backlink_counts(accept, query, store)) 87 .await 88 .map_err(to500)? 89 } 90 }), 91 ) 92 .route( 93 "/links/count/distinct-dids", 94 get({ 95 let store = store.clone(); 96 move |accept, query| async { 97 spawn_blocking(|| count_distinct_dids(accept, query, store)) 98 .await 99 .map_err(to500)? 100 } 101 }), 102 ) 103 .route( 104 "/xrpc/blue.microcosm.links.getBacklinks", 105 get({ 106 let store = store.clone(); 107 move |accept, query| async { 108 spawn_blocking(|| get_backlinks(accept, query, store)) 109 .await 110 .map_err(to500)? 111 } 112 }), 113 ) 114 .route( 115 "/links", 116 get({ 117 let store = store.clone(); 118 move |accept, query| async { 119 spawn_blocking(|| get_links(accept, query, store)) 120 .await 121 .map_err(to500)? 122 } 123 }), 124 ) 125 .route( 126 "/links/distinct-dids", 127 get({ 128 let store = store.clone(); 129 move |accept, query| async { 130 spawn_blocking(|| get_distinct_dids(accept, query, store)) 131 .await 132 .map_err(to500)? 133 } 134 }), 135 ) 136 .route( 137 // deprecated 138 "/links/all/count", 139 get({ 140 let store = store.clone(); 141 move |accept, query| async { 142 spawn_blocking(|| count_all_links(accept, query, store)) 143 .await 144 .map_err(to500)? 145 } 146 }), 147 ) 148 .route( 149 "/links/all", 150 get({ 151 let store = store.clone(); 152 move |accept, query| async { 153 spawn_blocking(|| explore_links(accept, query, store)) 154 .await 155 .map_err(to500)? 156 } 157 }), 158 ) 159 .layer(tower_http::cors::CorsLayer::permissive()) 160 .layer(middleware::from_fn(add_lables)) 161 .layer(MetricLayer::default()); 162 163 let listener = TcpListener::bind(addr).await?; 164 println!("api: listening at http://{:?}", listener.local_addr()?); 165 axum::serve(listener, app) 166 .with_graceful_shutdown(async move { stay_alive.cancelled().await }) 167 .await?; 168 169 Ok(()) 170} 171 172async fn add_lables(request: Request, next: Next) -> Response { 173 let origin = request 174 .headers() 175 .get(header::ORIGIN) 176 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok()); 177 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| { 178 ua.to_str() 179 .map(|v| { 180 if v.starts_with("Mozilla/") { 181 "Mozilla/...".into() 182 } else { 183 v.to_owned() 184 } 185 }) 186 .ok() 187 }); 188 189 let mut res = next.run(request).await; 190 191 let mut labels = Vec::new(); 192 if let Some(o) = origin { 193 labels.push(metrics::Label::new("origin", o)); 194 } 195 if let Some(ua) = user_agent { 196 labels.push(metrics::Label::new("user_agent", ua)); 197 } 198 res.extensions_mut().insert(ExtraMetricLabels(labels)); 199 res 200} 201 202async fn robots() -> &'static str { 203 "\ 204User-agent: * 205Disallow: /links 206Disallow: /links/ 207 " 208} 209 210#[derive(Template, Serialize, Deserialize)] 211#[template(path = "hello.html.j2")] 212struct HelloReponse { 213 help: &'static str, 214 days_indexed: Option<u64>, 215 stats: StorageStats, 216} 217fn hello( 218 accept: ExtractAccept, 219 store: impl LinkReader, 220) -> Result<impl IntoResponse, http::StatusCode> { 221 let stats = store 222 .get_stats() 223 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 224 let days_indexed = stats 225 .started_at 226 .map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed()) 227 .transpose() 228 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)? 229 .map(|d| d.as_secs() / 86_400); 230 Ok(acceptable(accept, HelloReponse { 231 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.", 232 days_indexed, 233 stats, 234 })) 235} 236 237#[derive(Clone, Deserialize)] 238#[serde(rename_all = "camelCase")] 239struct GetManyToManyCountsQuery { 240 subject: String, 241 source: String, 242 /// path to the secondary link in the linking record 243 path_to_other: String, 244 /// filter to linking records (join of the m2m) by these DIDs 245 #[serde(default)] 246 did: Vec<String>, 247 /// filter to specific secondary records 248 #[serde(default)] 249 other_subject: Vec<String>, 250 cursor: Option<OpaqueApiCursor>, 251 /// Set the max number of links to return per page of results 252 #[serde(default = "get_default_cursor_limit")] 253 limit: u64, 254} 255#[derive(Serialize)] 256struct OtherSubjectCount { 257 subject: String, 258 total: u64, 259 distinct: u64, 260} 261#[derive(Template, Serialize)] 262#[template(path = "get-many-to-many-counts.html.j2")] 263struct GetManyToManyCountsResponse { 264 counts_by_other_subject: Vec<OtherSubjectCount>, 265 cursor: Option<OpaqueApiCursor>, 266 #[serde(skip_serializing)] 267 query: GetManyToManyCountsQuery, 268} 269fn get_many_to_many_counts( 270 accept: ExtractAccept, 271 query: axum_extra::extract::Query<GetManyToManyCountsQuery>, 272 store: impl LinkReader, 273) -> Result<impl IntoResponse, http::StatusCode> { 274 let cursor_key = query 275 .cursor 276 .clone() 277 .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 278 .transpose()? 279 .map(|c| c.next); 280 281 let limit = query.limit; 282 if limit > DEFAULT_CURSOR_LIMIT_MAX { 283 return Err(http::StatusCode::BAD_REQUEST); 284 } 285 286 let filter_dids: HashSet<Did> = HashSet::from_iter( 287 query 288 .did 289 .iter() 290 .map(|d| d.trim()) 291 .filter(|d| !d.is_empty()) 292 .map(|d| Did(d.to_string())), 293 ); 294 295 let filter_other_subjects: HashSet<String> = HashSet::from_iter( 296 query 297 .other_subject 298 .iter() 299 .map(|s| s.trim().to_string()) 300 .filter(|s| !s.is_empty()), 301 ); 302 303 let Some((collection, path)) = query.source.split_once(':') else { 304 return Err(http::StatusCode::BAD_REQUEST); 305 }; 306 let path = format!(".{path}"); 307 308 let path_to_other = format!(".{}", query.path_to_other); 309 310 let paged = store 311 .get_many_to_many_counts( 312 &query.subject, 313 collection, 314 &path, 315 &path_to_other, 316 limit, 317 cursor_key, 318 &filter_dids, 319 &filter_other_subjects, 320 ) 321 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 322 323 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 324 325 let items = paged 326 .items 327 .into_iter() 328 .map(|(subject, total, distinct)| OtherSubjectCount { 329 subject, 330 total, 331 distinct, 332 }) 333 .collect(); 334 335 Ok(acceptable( 336 accept, 337 GetManyToManyCountsResponse { 338 counts_by_other_subject: items, 339 cursor, 340 query: (*query).clone(), 341 }, 342 )) 343} 344 345#[derive(Clone, Deserialize)] 346struct GetLinksCountQuery { 347 target: String, 348 collection: String, 349 path: String, 350} 351#[derive(Template, Serialize)] 352#[template(path = "links-count.html.j2")] 353struct GetLinksCountResponse { 354 total: u64, 355 #[serde(skip_serializing)] 356 query: GetLinksCountQuery, 357} 358fn count_links( 359 accept: ExtractAccept, 360 query: Query<GetLinksCountQuery>, 361 store: impl LinkReader, 362) -> Result<impl IntoResponse, http::StatusCode> { 363 let total = store 364 .get_count(&query.target, &query.collection, &query.path) 365 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 366 Ok(acceptable( 367 accept, 368 GetLinksCountResponse { 369 total, 370 query: (*query).clone(), 371 }, 372 )) 373} 374 375#[derive(Clone, Deserialize)] 376struct GetItemsCountQuery { 377 subject: String, 378 source: String, 379} 380#[derive(Template, Serialize)] 381#[template(path = "get-backlinks-count.html.j2")] 382struct GetItemsCountResponse { 383 total: u64, 384 #[serde(skip_serializing)] 385 query: GetItemsCountQuery, 386} 387fn get_backlink_counts( 388 accept: ExtractAccept, 389 query: axum_extra::extract::Query<GetItemsCountQuery>, 390 store: impl LinkReader, 391) -> Result<impl IntoResponse, http::StatusCode> { 392 let Some((collection, path)) = query.source.split_once(':') else { 393 return Err(http::StatusCode::BAD_REQUEST); 394 }; 395 let path = format!(".{path}"); 396 let total = store 397 .get_count(&query.subject, collection, &path) 398 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 399 400 Ok(acceptable( 401 accept, 402 GetItemsCountResponse { 403 total, 404 query: (*query).clone(), 405 }, 406 )) 407} 408 409#[derive(Clone, Deserialize)] 410struct GetDidsCountQuery { 411 target: String, 412 collection: String, 413 path: String, 414} 415#[derive(Template, Serialize)] 416#[template(path = "dids-count.html.j2")] 417struct GetDidsCountResponse { 418 total: u64, 419 #[serde(skip_serializing)] 420 query: GetDidsCountQuery, 421} 422fn count_distinct_dids( 423 accept: ExtractAccept, 424 query: Query<GetDidsCountQuery>, 425 store: impl LinkReader, 426) -> Result<impl IntoResponse, http::StatusCode> { 427 let total = store 428 .get_distinct_did_count(&query.target, &query.collection, &query.path) 429 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 430 Ok(acceptable( 431 accept, 432 GetDidsCountResponse { 433 total, 434 query: (*query).clone(), 435 }, 436 )) 437} 438 439#[derive(Clone, Deserialize)] 440struct GetBacklinksQuery { 441 /// The link target 442 /// 443 /// can be an AT-URI, plain DID, or regular URI 444 subject: String, 445 /// Filter links only from this link source 446 /// 447 /// eg.: `app.bsky.feed.like:subject.uri` 448 source: String, 449 cursor: Option<OpaqueApiCursor>, 450 /// Filter links only from these DIDs 451 /// 452 /// include multiple times to filter by multiple source DIDs 453 #[serde(default)] 454 did: Vec<String>, 455 /// Set the max number of links to return per page of results 456 #[serde(default = "get_default_cursor_limit")] 457 limit: u64, 458 /// Allow returning links in reverse order (default: false) 459 #[serde(default)] 460 reverse: bool, 461} 462#[derive(Template, Serialize)] 463#[template(path = "get-backlinks.html.j2")] 464struct GetBacklinksResponse { 465 total: u64, 466 records: Vec<RecordId>, 467 cursor: Option<OpaqueApiCursor>, 468 #[serde(skip_serializing)] 469 query: GetBacklinksQuery, 470 #[serde(skip_serializing)] 471 collection: String, 472 #[serde(skip_serializing)] 473 path: String, 474} 475fn get_backlinks( 476 accept: ExtractAccept, 477 query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences 478 store: impl LinkReader, 479) -> Result<impl IntoResponse, http::StatusCode> { 480 let until = query 481 .cursor 482 .clone() 483 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 484 .transpose()? 485 .map(|c| c.next); 486 487 let limit = query.limit; 488 if limit > DEFAULT_CURSOR_LIMIT_MAX { 489 return Err(http::StatusCode::BAD_REQUEST); 490 } 491 492 let filter_dids: HashSet<Did> = HashSet::from_iter( 493 query 494 .did 495 .iter() 496 .map(|d| d.trim()) 497 .filter(|d| !d.is_empty()) 498 .map(|d| Did(d.to_string())), 499 ); 500 501 let Some((collection, path)) = query.source.split_once(':') else { 502 return Err(http::StatusCode::BAD_REQUEST); 503 }; 504 let path = format!(".{path}"); 505 506 let order = if query.reverse { 507 Order::OldestToNewest 508 } else { 509 Order::NewestToOldest 510 }; 511 512 let paged = store 513 .get_links( 514 &query.subject, 515 collection, 516 &path, 517 order, 518 limit, 519 until, 520 &filter_dids, 521 ) 522 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 523 524 let cursor = paged.next.map(|next| { 525 ApiCursor { 526 version: paged.version, 527 next, 528 } 529 .into() 530 }); 531 532 Ok(acceptable( 533 accept, 534 GetBacklinksResponse { 535 total: paged.total, 536 records: paged.items, 537 cursor, 538 query: (*query).clone(), 539 collection: collection.to_string(), 540 path, 541 }, 542 )) 543} 544 545#[derive(Clone, Deserialize)] 546struct GetLinkItemsQuery { 547 target: String, 548 collection: String, 549 path: String, 550 cursor: Option<OpaqueApiCursor>, 551 /// Filter links only from these DIDs 552 /// 553 /// include multiple times to filter by multiple source DIDs 554 #[serde(default)] 555 did: Vec<String>, 556 /// [deprecated] Filter links only from these DIDs 557 /// 558 /// format: comma-separated sequence of DIDs 559 /// 560 /// errors: if `did` parameter is also present 561 /// 562 /// deprecated: use `did`, which can be repeated multiple times 563 from_dids: Option<String>, // comma separated: gross 564 #[serde(default = "get_default_cursor_limit")] 565 limit: u64, 566} 567#[derive(Template, Serialize)] 568#[template(path = "links.html.j2")] 569struct GetLinkItemsResponse { 570 // what does staleness mean? 571 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 572 // - links have been deleted. hmm. 573 total: u64, 574 linking_records: Vec<RecordId>, 575 cursor: Option<OpaqueApiCursor>, 576 #[serde(skip_serializing)] 577 query: GetLinkItemsQuery, 578} 579fn get_links( 580 accept: ExtractAccept, 581 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences 582 store: impl LinkReader, 583) -> Result<impl IntoResponse, http::StatusCode> { 584 let until = query 585 .cursor 586 .clone() 587 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 588 .transpose()? 589 .map(|c| c.next); 590 591 let limit = query.limit; 592 if limit > DEFAULT_CURSOR_LIMIT_MAX { 593 return Err(http::StatusCode::BAD_REQUEST); 594 } 595 596 let mut filter_dids: HashSet<Did> = HashSet::from_iter( 597 query 598 .did 599 .iter() 600 .map(|d| d.trim()) 601 .filter(|d| !d.is_empty()) 602 .map(|d| Did(d.to_string())), 603 ); 604 605 if let Some(comma_joined) = &query.from_dids { 606 if !filter_dids.is_empty() { 607 return Err(http::StatusCode::BAD_REQUEST); 608 } 609 for did in comma_joined.split(',') { 610 filter_dids.insert(Did(did.to_string())); 611 } 612 } 613 614 let paged = store 615 .get_links( 616 &query.target, 617 &query.collection, 618 &query.path, 619 Order::NewestToOldest, 620 limit, 621 until, 622 &filter_dids, 623 ) 624 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 625 626 let cursor = paged.next.map(|next| { 627 ApiCursor { 628 version: paged.version, 629 next, 630 } 631 .into() 632 }); 633 634 Ok(acceptable( 635 accept, 636 GetLinkItemsResponse { 637 total: paged.total, 638 linking_records: paged.items, 639 cursor, 640 query: (*query).clone(), 641 }, 642 )) 643} 644 645#[derive(Clone, Deserialize)] 646struct GetDidItemsQuery { 647 target: String, 648 collection: String, 649 path: String, 650 cursor: Option<OpaqueApiCursor>, 651 limit: Option<u64>, 652 // TODO: allow reverse (er, forward) order as well 653} 654#[derive(Template, Serialize)] 655#[template(path = "dids.html.j2")] 656struct GetDidItemsResponse { 657 // what does staleness mean? 658 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 659 // - links have been deleted. hmm. 660 total: u64, 661 linking_dids: Vec<Did>, 662 cursor: Option<OpaqueApiCursor>, 663 #[serde(skip_serializing)] 664 query: GetDidItemsQuery, 665} 666fn get_distinct_dids( 667 accept: ExtractAccept, 668 query: Query<GetDidItemsQuery>, 669 store: impl LinkReader, 670) -> Result<impl IntoResponse, http::StatusCode> { 671 let until = query 672 .cursor 673 .clone() 674 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 675 .transpose()? 676 .map(|c| c.next); 677 678 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT); 679 if limit > DEFAULT_CURSOR_LIMIT_MAX { 680 return Err(http::StatusCode::BAD_REQUEST); 681 } 682 683 let paged = store 684 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until) 685 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 686 687 let cursor = paged.next.map(|next| { 688 ApiCursor { 689 version: paged.version, 690 next, 691 } 692 .into() 693 }); 694 695 Ok(acceptable( 696 accept, 697 GetDidItemsResponse { 698 total: paged.total, 699 linking_dids: paged.items, 700 cursor, 701 query: (*query).clone(), 702 }, 703 )) 704} 705 706#[derive(Clone, Deserialize)] 707struct GetAllLinksQuery { 708 target: String, 709} 710#[derive(Template, Serialize)] 711#[template(path = "links-all-count.html.j2")] 712struct GetAllLinksResponse { 713 links: HashMap<String, HashMap<String, u64>>, 714 #[serde(skip_serializing)] 715 query: GetAllLinksQuery, 716} 717fn count_all_links( 718 accept: ExtractAccept, 719 query: Query<GetAllLinksQuery>, 720 store: impl LinkReader, 721) -> Result<impl IntoResponse, http::StatusCode> { 722 let links = store 723 .get_all_record_counts(&query.target) 724 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 725 Ok(acceptable( 726 accept, 727 GetAllLinksResponse { 728 links, 729 query: (*query).clone(), 730 }, 731 )) 732} 733 734#[derive(Clone, Deserialize)] 735struct ExploreLinksQuery { 736 target: String, 737} 738#[derive(Template, Serialize)] 739#[template(path = "explore-links.html.j2")] 740struct ExploreLinksResponse { 741 links: HashMap<String, HashMap<String, CountsByCount>>, 742 #[serde(skip_serializing)] 743 query: ExploreLinksQuery, 744} 745fn explore_links( 746 accept: ExtractAccept, 747 query: Query<ExploreLinksQuery>, 748 store: impl LinkReader, 749) -> Result<impl IntoResponse, http::StatusCode> { 750 let links = store 751 .get_all_counts(&query.target) 752 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 753 Ok(acceptable( 754 accept, 755 ExploreLinksResponse { 756 links, 757 query: (*query).clone(), 758 }, 759 )) 760} 761 762#[serde_as] 763#[derive(Clone, Serialize, Deserialize)] // for json 764struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>); 765 766#[derive(Serialize, Deserialize)] // for bincode 767struct ApiCursor { 768 version: (u64, u64), // (collection length, deleted item count) 769 next: u64, 770} 771 772impl TryFrom<OpaqueApiCursor> for ApiCursor { 773 type Error = bincode::Error; 774 775 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 776 bincode::DefaultOptions::new().deserialize(&item.0) 777 } 778} 779 780impl From<ApiCursor> for OpaqueApiCursor { 781 fn from(item: ApiCursor) -> Self { 782 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 783 } 784} 785 786#[derive(Serialize, Deserialize)] // for bincode 787struct ApiKeyedCursor { 788 next: String, // the key 789} 790 791impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor { 792 type Error = bincode::Error; 793 794 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 795 bincode::DefaultOptions::new().deserialize(&item.0) 796 } 797} 798 799impl From<ApiKeyedCursor> for OpaqueApiCursor { 800 fn from(item: ApiKeyedCursor) -> Self { 801 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 802 } 803}