// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
// (page header from the source listing: at order_query, 757 lines, 21 kB, view raw)
1use askama::Template; 2use axum::{ 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 response::{IntoResponse, Response}, 7 routing::get, 8 Router, 9}; 10use axum_metrics::{ExtraMetricLabels, MetricLayer}; 11use bincode::Options; 12use serde::{Deserialize, Serialize}; 13use serde_with::serde_as; 14use std::collections::{HashMap, HashSet}; 15use std::time::{Duration, UNIX_EPOCH}; 16use tokio::net::{TcpListener, ToSocketAddrs}; 17use tokio::task::spawn_blocking; 18use tokio_util::sync::CancellationToken; 19 20use crate::storage::{LinkReader, Order, StorageStats}; 21use crate::{CountsByCount, Did, RecordId}; 22 23mod acceptable; 24mod filters; 25 26use acceptable::{acceptable, ExtractAccept}; 27 28const DEFAULT_CURSOR_LIMIT: u64 = 100; 29const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000; 30 31fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT 33} 34 35fn to500(e: tokio::task::JoinError) -> http::StatusCode { 36 eprintln!("handler error: {e}"); 37 http::StatusCode::INTERNAL_SERVER_ERROR 38} 39 40pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 41where 42 S: LinkReader, 43 A: ToSocketAddrs, 44{ 45 let app = Router::new() 46 .route("/robots.txt", get(robots)) 47 .route( 48 "/", 49 get({ 50 let store = store.clone(); 51 move |accept| async { 52 spawn_blocking(|| hello(accept, store)) 53 .await 54 .map_err(to500)? 55 } 56 }), 57 ) 58 .route( 59 "/xrpc/blue.microcosm.links.getManyToManyCounts", 60 get({ 61 let store = store.clone(); 62 move |accept, query| async { 63 spawn_blocking(|| get_many_to_many_counts(accept, query, store)) 64 .await 65 .map_err(to500)? 66 } 67 }), 68 ) 69 .route( 70 "/links/count", 71 get({ 72 let store = store.clone(); 73 move |accept, query| async { 74 spawn_blocking(|| count_links(accept, query, store)) 75 .await 76 .map_err(to500)? 
77 } 78 }), 79 ) 80 .route( 81 "/links/count/distinct-dids", 82 get({ 83 let store = store.clone(); 84 move |accept, query| async { 85 spawn_blocking(|| count_distinct_dids(accept, query, store)) 86 .await 87 .map_err(to500)? 88 } 89 }), 90 ) 91 .route( 92 "/xrpc/blue.microcosm.links.getBacklinks", 93 get({ 94 let store = store.clone(); 95 move |accept, query| async { 96 spawn_blocking(|| get_backlinks(accept, query, store)) 97 .await 98 .map_err(to500)? 99 } 100 }), 101 ) 102 .route( 103 "/links", 104 get({ 105 let store = store.clone(); 106 move |accept, query| async { 107 spawn_blocking(|| get_links(accept, query, store)) 108 .await 109 .map_err(to500)? 110 } 111 }), 112 ) 113 .route( 114 "/links/distinct-dids", 115 get({ 116 let store = store.clone(); 117 move |accept, query| async { 118 spawn_blocking(|| get_distinct_dids(accept, query, store)) 119 .await 120 .map_err(to500)? 121 } 122 }), 123 ) 124 .route( 125 // deprecated 126 "/links/all/count", 127 get({ 128 let store = store.clone(); 129 move |accept, query| async { 130 spawn_blocking(|| count_all_links(accept, query, store)) 131 .await 132 .map_err(to500)? 133 } 134 }), 135 ) 136 .route( 137 "/links/all", 138 get({ 139 let store = store.clone(); 140 move |accept, query| async { 141 spawn_blocking(|| explore_links(accept, query, store)) 142 .await 143 .map_err(to500)? 
144 } 145 }), 146 ) 147 .layer(tower_http::cors::CorsLayer::permissive()) 148 .layer(middleware::from_fn(add_lables)) 149 .layer(MetricLayer::default()); 150 151 let listener = TcpListener::bind(addr).await?; 152 println!("api: listening at http://{:?}", listener.local_addr()?); 153 axum::serve(listener, app) 154 .with_graceful_shutdown(async move { stay_alive.cancelled().await }) 155 .await?; 156 157 Ok(()) 158} 159 160async fn add_lables(request: Request, next: Next) -> Response { 161 let origin = request 162 .headers() 163 .get(header::ORIGIN) 164 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok()); 165 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| { 166 ua.to_str() 167 .map(|v| { 168 if v.starts_with("Mozilla/") { 169 "Mozilla/...".into() 170 } else { 171 v.to_owned() 172 } 173 }) 174 .ok() 175 }); 176 177 let mut res = next.run(request).await; 178 179 let mut labels = Vec::new(); 180 if let Some(o) = origin { 181 labels.push(metrics::Label::new("origin", o)); 182 } 183 if let Some(ua) = user_agent { 184 labels.push(metrics::Label::new("user_agent", ua)); 185 } 186 res.extensions_mut().insert(ExtraMetricLabels(labels)); 187 res 188} 189 190async fn robots() -> &'static str { 191 "\ 192User-agent: * 193Disallow: /links 194Disallow: /links/ 195 " 196} 197 198#[derive(Template, Serialize, Deserialize)] 199#[template(path = "hello.html.j2")] 200struct HelloReponse { 201 help: &'static str, 202 days_indexed: Option<u64>, 203 stats: StorageStats, 204} 205fn hello( 206 accept: ExtractAccept, 207 store: impl LinkReader, 208) -> Result<impl IntoResponse, http::StatusCode> { 209 let stats = store 210 .get_stats() 211 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 212 let days_indexed = stats 213 .started_at 214 .map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed()) 215 .transpose() 216 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)? 
217 .map(|d| d.as_secs() / 86_400); 218 Ok(acceptable(accept, HelloReponse { 219 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.", 220 days_indexed, 221 stats, 222 })) 223} 224 225#[derive(Clone, Deserialize)] 226#[serde(rename_all = "camelCase")] 227struct GetManyToManyCountsQuery { 228 subject: String, 229 source: String, 230 /// path to the secondary link in the linking record 231 path_to_other: String, 232 /// filter to linking records (join of the m2m) by these DIDs 233 #[serde(default)] 234 did: Vec<String>, 235 /// filter to specific secondary records 236 #[serde(default)] 237 other_subject: Vec<String>, 238 cursor: Option<OpaqueApiCursor>, 239 /// Set the max number of links to return per page of results 240 #[serde(default = "get_default_cursor_limit")] 241 limit: u64, 242} 243#[derive(Serialize)] 244struct OtherSubjectCount { 245 subject: String, 246 total: u64, 247 distinct: u64, 248} 249#[derive(Template, Serialize)] 250#[template(path = "get-many-to-many-counts.html.j2")] 251struct GetManyToManyCountsResponse { 252 counts_by_other_subject: Vec<OtherSubjectCount>, 253 cursor: Option<OpaqueApiCursor>, 254 #[serde(skip_serializing)] 255 query: GetManyToManyCountsQuery, 256} 257fn get_many_to_many_counts( 258 accept: ExtractAccept, 259 query: axum_extra::extract::Query<GetManyToManyCountsQuery>, 260 store: impl LinkReader, 261) -> Result<impl IntoResponse, http::StatusCode> { 262 let cursor_key = query 263 .cursor 264 .clone() 265 .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 266 .transpose()? 
267 .map(|c| c.next); 268 269 let limit = query.limit; 270 if limit > DEFAULT_CURSOR_LIMIT_MAX { 271 return Err(http::StatusCode::BAD_REQUEST); 272 } 273 274 let filter_dids: HashSet<Did> = HashSet::from_iter( 275 query 276 .did 277 .iter() 278 .map(|d| d.trim()) 279 .filter(|d| !d.is_empty()) 280 .map(|d| Did(d.to_string())), 281 ); 282 283 let filter_other_subjects: HashSet<String> = HashSet::from_iter( 284 query 285 .other_subject 286 .iter() 287 .map(|s| s.trim().to_string()) 288 .filter(|s| !s.is_empty()), 289 ); 290 291 let Some((collection, path)) = query.source.split_once(':') else { 292 return Err(http::StatusCode::BAD_REQUEST); 293 }; 294 let path = format!(".{path}"); 295 296 let path_to_other = format!(".{}", query.path_to_other); 297 298 let paged = store 299 .get_many_to_many_counts( 300 &query.subject, 301 collection, 302 &path, 303 &path_to_other, 304 limit, 305 cursor_key, 306 &filter_dids, 307 &filter_other_subjects, 308 ) 309 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 310 311 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 312 313 let items = paged 314 .items 315 .into_iter() 316 .map(|(subject, total, distinct)| OtherSubjectCount { 317 subject, 318 total, 319 distinct, 320 }) 321 .collect(); 322 323 Ok(acceptable( 324 accept, 325 GetManyToManyCountsResponse { 326 counts_by_other_subject: items, 327 cursor, 328 query: (*query).clone(), 329 }, 330 )) 331} 332 333#[derive(Clone, Deserialize)] 334struct GetLinksCountQuery { 335 target: String, 336 collection: String, 337 path: String, 338} 339#[derive(Template, Serialize)] 340#[template(path = "links-count.html.j2")] 341struct GetLinksCountResponse { 342 total: u64, 343 #[serde(skip_serializing)] 344 query: GetLinksCountQuery, 345} 346fn count_links( 347 accept: ExtractAccept, 348 query: Query<GetLinksCountQuery>, 349 store: impl LinkReader, 350) -> Result<impl IntoResponse, http::StatusCode> { 351 let total = store 352 .get_count(&query.target, &query.collection, 
&query.path) 353 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 354 Ok(acceptable( 355 accept, 356 GetLinksCountResponse { 357 total, 358 query: (*query).clone(), 359 }, 360 )) 361} 362 363#[derive(Clone, Deserialize)] 364struct GetDidsCountQuery { 365 target: String, 366 collection: String, 367 path: String, 368} 369#[derive(Template, Serialize)] 370#[template(path = "dids-count.html.j2")] 371struct GetDidsCountResponse { 372 total: u64, 373 #[serde(skip_serializing)] 374 query: GetDidsCountQuery, 375} 376fn count_distinct_dids( 377 accept: ExtractAccept, 378 query: Query<GetDidsCountQuery>, 379 store: impl LinkReader, 380) -> Result<impl IntoResponse, http::StatusCode> { 381 let total = store 382 .get_distinct_did_count(&query.target, &query.collection, &query.path) 383 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 384 Ok(acceptable( 385 accept, 386 GetDidsCountResponse { 387 total, 388 query: (*query).clone(), 389 }, 390 )) 391} 392 393#[derive(Clone, Deserialize)] 394struct GetBacklinksQuery { 395 /// The link target 396 /// 397 /// can be an AT-URI, plain DID, or regular URI 398 subject: String, 399 /// Filter links only from this link source 400 /// 401 /// eg.: `app.bsky.feed.like:subject.uri` 402 source: String, 403 cursor: Option<OpaqueApiCursor>, 404 /// Filter links only from these DIDs 405 /// 406 /// include multiple times to filter by multiple source DIDs 407 #[serde(default)] 408 did: Vec<String>, 409 /// Set the max number of links to return per page of results 410 #[serde(default = "get_default_cursor_limit")] 411 limit: u64, 412 /// Allow returning links in reverse order (default: false) 413 #[serde(default)] 414 reverse: bool, 415} 416#[derive(Template, Serialize)] 417#[template(path = "get-backlinks.html.j2")] 418struct GetBacklinksResponse { 419 total: u64, 420 records: Vec<RecordId>, 421 cursor: Option<OpaqueApiCursor>, 422 #[serde(skip_serializing)] 423 query: GetBacklinksQuery, 424 #[serde(skip_serializing)] 425 
collection: String, 426 #[serde(skip_serializing)] 427 path: String, 428} 429fn get_backlinks( 430 accept: ExtractAccept, 431 query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences 432 store: impl LinkReader, 433) -> Result<impl IntoResponse, http::StatusCode> { 434 let until = query 435 .cursor 436 .clone() 437 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 438 .transpose()? 439 .map(|c| c.next); 440 441 let limit = query.limit; 442 if limit > DEFAULT_CURSOR_LIMIT_MAX { 443 return Err(http::StatusCode::BAD_REQUEST); 444 } 445 446 let filter_dids: HashSet<Did> = HashSet::from_iter( 447 query 448 .did 449 .iter() 450 .map(|d| d.trim()) 451 .filter(|d| !d.is_empty()) 452 .map(|d| Did(d.to_string())), 453 ); 454 455 let Some((collection, path)) = query.source.split_once(':') else { 456 return Err(http::StatusCode::BAD_REQUEST); 457 }; 458 let path = format!(".{path}"); 459 460 let order = if query.reverse { 461 Order::OldestToNewest 462 } else { 463 Order::NewestToOldest 464 }; 465 466 let paged = store 467 .get_links( 468 &query.subject, 469 collection, 470 &path, 471 order, 472 limit, 473 until, 474 &filter_dids, 475 ) 476 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 477 478 let cursor = paged.next.map(|next| { 479 ApiCursor { 480 version: paged.version, 481 next, 482 } 483 .into() 484 }); 485 486 Ok(acceptable( 487 accept, 488 GetBacklinksResponse { 489 total: paged.total, 490 records: paged.items, 491 cursor, 492 query: (*query).clone(), 493 collection: collection.to_string(), 494 path, 495 }, 496 )) 497} 498 499#[derive(Clone, Deserialize)] 500struct GetLinkItemsQuery { 501 target: String, 502 collection: String, 503 path: String, 504 cursor: Option<OpaqueApiCursor>, 505 /// Filter links only from these DIDs 506 /// 507 /// include multiple times to filter by multiple source DIDs 508 #[serde(default)] 509 did: Vec<String>, 510 /// [deprecated] Filter links only from these DIDs 511 /// 
512 /// format: comma-separated sequence of DIDs 513 /// 514 /// errors: if `did` parameter is also present 515 /// 516 /// deprecated: use `did`, which can be repeated multiple times 517 from_dids: Option<String>, // comma separated: gross 518 #[serde(default = "get_default_cursor_limit")] 519 limit: u64, 520} 521#[derive(Template, Serialize)] 522#[template(path = "links.html.j2")] 523struct GetLinkItemsResponse { 524 // what does staleness mean? 525 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 526 // - links have been deleted. hmm. 527 total: u64, 528 linking_records: Vec<RecordId>, 529 cursor: Option<OpaqueApiCursor>, 530 #[serde(skip_serializing)] 531 query: GetLinkItemsQuery, 532} 533fn get_links( 534 accept: ExtractAccept, 535 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences 536 store: impl LinkReader, 537) -> Result<impl IntoResponse, http::StatusCode> { 538 let until = query 539 .cursor 540 .clone() 541 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 542 .transpose()? 
543 .map(|c| c.next); 544 545 let limit = query.limit; 546 if limit > DEFAULT_CURSOR_LIMIT_MAX { 547 return Err(http::StatusCode::BAD_REQUEST); 548 } 549 550 let mut filter_dids: HashSet<Did> = HashSet::from_iter( 551 query 552 .did 553 .iter() 554 .map(|d| d.trim()) 555 .filter(|d| !d.is_empty()) 556 .map(|d| Did(d.to_string())), 557 ); 558 559 if let Some(comma_joined) = &query.from_dids { 560 if !filter_dids.is_empty() { 561 return Err(http::StatusCode::BAD_REQUEST); 562 } 563 for did in comma_joined.split(',') { 564 filter_dids.insert(Did(did.to_string())); 565 } 566 } 567 568 let paged = store 569 .get_links( 570 &query.target, 571 &query.collection, 572 &query.path, 573 Order::NewestToOldest, 574 limit, 575 until, 576 &filter_dids, 577 ) 578 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 579 580 let cursor = paged.next.map(|next| { 581 ApiCursor { 582 version: paged.version, 583 next, 584 } 585 .into() 586 }); 587 588 Ok(acceptable( 589 accept, 590 GetLinkItemsResponse { 591 total: paged.total, 592 linking_records: paged.items, 593 cursor, 594 query: (*query).clone(), 595 }, 596 )) 597} 598 599#[derive(Clone, Deserialize)] 600struct GetDidItemsQuery { 601 target: String, 602 collection: String, 603 path: String, 604 cursor: Option<OpaqueApiCursor>, 605 limit: Option<u64>, 606 // TODO: allow reverse (er, forward) order as well 607} 608#[derive(Template, Serialize)] 609#[template(path = "dids.html.j2")] 610struct GetDidItemsResponse { 611 // what does staleness mean? 612 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 613 // - links have been deleted. hmm. 
614 total: u64, 615 linking_dids: Vec<Did>, 616 cursor: Option<OpaqueApiCursor>, 617 #[serde(skip_serializing)] 618 query: GetDidItemsQuery, 619} 620fn get_distinct_dids( 621 accept: ExtractAccept, 622 query: Query<GetDidItemsQuery>, 623 store: impl LinkReader, 624) -> Result<impl IntoResponse, http::StatusCode> { 625 let until = query 626 .cursor 627 .clone() 628 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 629 .transpose()? 630 .map(|c| c.next); 631 632 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT); 633 if limit > DEFAULT_CURSOR_LIMIT_MAX { 634 return Err(http::StatusCode::BAD_REQUEST); 635 } 636 637 let paged = store 638 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until) 639 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 640 641 let cursor = paged.next.map(|next| { 642 ApiCursor { 643 version: paged.version, 644 next, 645 } 646 .into() 647 }); 648 649 Ok(acceptable( 650 accept, 651 GetDidItemsResponse { 652 total: paged.total, 653 linking_dids: paged.items, 654 cursor, 655 query: (*query).clone(), 656 }, 657 )) 658} 659 660#[derive(Clone, Deserialize)] 661struct GetAllLinksQuery { 662 target: String, 663} 664#[derive(Template, Serialize)] 665#[template(path = "links-all-count.html.j2")] 666struct GetAllLinksResponse { 667 links: HashMap<String, HashMap<String, u64>>, 668 #[serde(skip_serializing)] 669 query: GetAllLinksQuery, 670} 671fn count_all_links( 672 accept: ExtractAccept, 673 query: Query<GetAllLinksQuery>, 674 store: impl LinkReader, 675) -> Result<impl IntoResponse, http::StatusCode> { 676 let links = store 677 .get_all_record_counts(&query.target) 678 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 679 Ok(acceptable( 680 accept, 681 GetAllLinksResponse { 682 links, 683 query: (*query).clone(), 684 }, 685 )) 686} 687 688#[derive(Clone, Deserialize)] 689struct ExploreLinksQuery { 690 target: String, 691} 692#[derive(Template, Serialize)] 693#[template(path = 
"explore-links.html.j2")] 694struct ExploreLinksResponse { 695 links: HashMap<String, HashMap<String, CountsByCount>>, 696 #[serde(skip_serializing)] 697 query: ExploreLinksQuery, 698} 699fn explore_links( 700 accept: ExtractAccept, 701 query: Query<ExploreLinksQuery>, 702 store: impl LinkReader, 703) -> Result<impl IntoResponse, http::StatusCode> { 704 let links = store 705 .get_all_counts(&query.target) 706 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 707 Ok(acceptable( 708 accept, 709 ExploreLinksResponse { 710 links, 711 query: (*query).clone(), 712 }, 713 )) 714} 715 716#[serde_as] 717#[derive(Clone, Serialize, Deserialize)] // for json 718struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>); 719 720#[derive(Serialize, Deserialize)] // for bincode 721struct ApiCursor { 722 version: (u64, u64), // (collection length, deleted item count) 723 next: u64, 724} 725 726impl TryFrom<OpaqueApiCursor> for ApiCursor { 727 type Error = bincode::Error; 728 729 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 730 bincode::DefaultOptions::new().deserialize(&item.0) 731 } 732} 733 734impl From<ApiCursor> for OpaqueApiCursor { 735 fn from(item: ApiCursor) -> Self { 736 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 737 } 738} 739 740#[derive(Serialize, Deserialize)] // for bincode 741struct ApiKeyedCursor { 742 next: String, // the key 743} 744 745impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor { 746 type Error = bincode::Error; 747 748 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 749 bincode::DefaultOptions::new().deserialize(&item.0) 750 } 751} 752 753impl From<ApiKeyedCursor> for OpaqueApiCursor { 754 fn from(item: ApiKeyedCursor) -> Self { 755 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 756 } 757}