Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Misc small fixups #13

merged opened by bad-example.com targeting main from metrics
  • throw in some metrics including a basic request counter
  • normalize handles to lowercase (should probably be fixed in atrium)
  • refresh handles and DIDs on firehose identity events
  • add aliases for blue.microcosm xrpc nsids to eventually replace the com.bad-example ones
Labels

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mdvukaalbx22
+133 -38
Diff #0
+41 -25
slingshot/src/consumer.rs
··· 1 - use crate::CachedRecord; 2 1 use crate::error::ConsumerError; 2 + use crate::{CachedRecord, Identity, IdentityKey}; 3 3 use foyer::HybridCache; 4 4 use jetstream::{ 5 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 11 11 jetstream_endpoint: String, 12 12 cursor: Option<Cursor>, 13 13 no_zstd: bool, 14 + identity: Identity, 14 15 shutdown: CancellationToken, 15 16 cache: HybridCache<String, CachedRecord>, 16 17 ) -> Result<(), ConsumerError> { ··· 46 47 break; 47 48 }; 48 49 49 - if event.kind != EventKind::Commit { 50 - continue; 51 - } 52 - let Some(ref mut commit) = event.commit else { 53 - log::warn!("consumer: commit event missing commit data, ignoring"); 54 - continue; 55 - }; 50 + match event.kind { 51 + EventKind::Commit => { 52 + let Some(ref mut commit) = event.commit else { 53 + log::warn!("consumer: commit event missing commit data, ignoring"); 54 + continue; 55 + }; 56 56 57 - // TODO: something a bit more robust 58 - let at_uri = format!( 59 - "at://{}/{}/{}", 60 - &*event.did, &*commit.collection, &*commit.rkey 61 - ); 57 + // TODO: something a bit more robust 58 + let at_uri = format!( 59 + "at://{}/{}/{}", 60 + &*event.did, &*commit.collection, &*commit.rkey 61 + ); 62 62 63 - if commit.operation == CommitOp::Delete { 64 - cache.insert(at_uri, CachedRecord::Deleted); 65 - } else { 66 - let Some(record) = commit.record.take() else { 67 - log::warn!("consumer: commit insert or update missing record, ignoring"); 68 - continue; 69 - }; 70 - let Some(cid) = commit.cid.take() else { 71 - log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 - continue; 73 - }; 63 + if commit.operation == CommitOp::Delete { 64 + cache.insert(at_uri, CachedRecord::Deleted); 65 + } else { 66 + let Some(record) = commit.record.take() else { 67 + log::warn!("consumer: commit insert or update missing record, ignoring"); 68 + continue; 69 + }; 70 + let Some(cid) = commit.cid.take() else { 71 + log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 + continue; 73 + }; 74 74 75 - cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 75 + cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 + } 77 + } 78 + EventKind::Identity => { 79 + let Some(ident) = event.identity else { 80 + log::warn!("consumer: identity event missing identity data, ignoring"); 81 + continue; 82 + }; 83 + if let Some(handle) = ident.handle { 84 + metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1); 85 + identity.queue_refresh(IdentityKey::Handle(handle)).await; 86 + } 87 + metrics::counter!("identity_did_refresh_queued", "reason" => "identity event") 88 + .increment(1); 89 + identity.queue_refresh(IdentityKey::Did(ident.did)).await; 90 + } 91 + EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete) 76 92 } 77 93 } 78 94
+20 -6
slingshot/src/identity.rs
··· 38 38 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 39 39 40 40 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 41 - enum IdentityKey { 41 + pub enum IdentityKey { 42 42 Handle(Handle), 43 43 Did(Did), 44 44 } ··· 115 115 let Some(maybe_handle) = aka.strip_prefix("at://") else { 116 116 continue; 117 117 }; 118 - let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else { 118 + let Ok(valid_handle) = Handle::new(maybe_handle.to_lowercase()) else { 119 119 continue; 120 120 }; 121 121 unverified_handle = Some(valid_handle); ··· 186 186 /// multi-producer *single consumer* queue 187 187 refresh_queue: Arc<Mutex<RefreshQueue>>, 188 188 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 189 - refresher: Arc<Mutex<()>>, 189 + refresher_task: Arc<Mutex<()>>, 190 190 } 191 191 192 192 impl Identity { ··· 225 225 did_resolver: Arc::new(did_resolver), 226 226 cache, 227 227 refresh_queue: Default::default(), 228 - refresher: Default::default(), 228 + refresher_task: Default::default(), 229 229 }) 230 230 } 231 231 ··· 293 293 } 294 294 IdentityData::NotFound => { 295 295 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 296 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 296 297 self.queue_refresh(key).await; 297 298 } 298 299 Ok(None) 299 300 } 300 301 IdentityData::Did(did) => { 301 302 if (now - *last_fetch) >= MIN_TTL { 303 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 302 304 self.queue_refresh(key).await; 303 305 } 304 306 Ok(Some(did.clone())) ··· 347 349 } 348 350 IdentityData::NotFound => { 349 351 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 352 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 350 353 self.queue_refresh(key).await; 351 354 } 352 355 Ok(None) 353 356 } 354 357 IdentityData::Doc(mini_did) => { 355 358 if (now - *last_fetch) >= MIN_TTL { 359 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 356 360 self.queue_refresh(key).await; 357 361 } 358 362 Ok(Some(mini_did.clone())) ··· 363 367 /// put a refresh task on the queue 364 368 /// 365 369 /// this can be safely called from multiple concurrent tasks 366 - async fn queue_refresh(&self, key: IdentityKey) { 370 + pub async fn queue_refresh(&self, key: IdentityKey) { 367 371 // todo: max queue size 368 372 let mut q = self.refresh_queue.lock().await; 369 373 if !q.items.contains(&key) { ··· 440 444 /// run the refresh queue consumer 441 445 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 442 446 let _guard = self 443 - .refresher 447 + .refresher_task 444 448 .try_lock() 445 449 .expect("there to only be one refresher running"); 446 450 loop { ··· 462 466 log::trace!("refreshing handle {handle:?}"); 463 467 match self.handle_resolver.resolve(handle).await { 464 468 Ok(did) => { 469 + metrics::counter!("identity_handle_refresh", "success" => "true") 470 + .increment(1); 465 471 self.cache.insert( 466 472 task_key.clone(), 467 473 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 468 474 ); 469 475 } 470 476 Err(atrium_identity::Error::NotFound) => { 477 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1); 471 478 self.cache.insert( 472 479 task_key.clone(), 473 480 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 474 481 ); 475 482 } 476 483 Err(err) => { 484 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1); 477 485 log::warn!( 478 486 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 479 487 ); ··· 488 496 Ok(did_doc) => { 489 497 // TODO: fix in atrium: should verify id is did 490 498 if did_doc.id != did.to_string() { 499 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1); 491 500 log::warn!( 492 501 "refreshed did doc failed: wrong did doc id. dropping refresh." 493 502 ); ··· 496 505 let mini_doc = match did_doc.try_into() { 497 506 Ok(md) => md, 498 507 Err(e) => { 508 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 499 509 log::warn!( 500 510 "converting mini doc failed: {e:?}. dropping refresh." 501 511 ); 502 512 continue; 503 513 } 504 514 }; 515 + metrics::counter!("identity_did_refresh", "success" => "true") 516 + .increment(1); 505 517 self.cache.insert( 506 518 task_key.clone(), 507 519 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 508 520 ); 509 521 } 510 522 Err(atrium_identity::Error::NotFound) => { 523 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1); 511 524 self.cache.insert( 512 525 task_key.clone(), 513 526 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 514 527 ); 515 528 } 516 529 Err(err) => { 530 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1); 517 531 log::warn!( 518 532 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 519 533 );
+1 -1
slingshot/src/lib.rs
··· 9 9 pub use consumer::consume; 10 10 pub use firehose_cache::firehose_cache; 11 11 pub use healthcheck::healthcheck; 12 - pub use identity::Identity; 12 + pub use identity::{Identity, IdentityKey}; 13 13 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 14 pub use server::serve;
+4 -1
slingshot/src/main.rs
··· 154 154 155 155 let repo = Repo::new(identity.clone()); 156 156 157 + let identity_for_server = identity.clone(); 157 158 let server_shutdown = shutdown.clone(); 158 159 let server_cache_handle = cache.clone(); 159 160 let bind = args.bind; 160 161 tasks.spawn(async move { 161 162 serve( 162 163 server_cache_handle, 163 - identity, 164 + identity_for_server, 164 165 repo, 165 166 args.acme_domain, 166 167 args.acme_contact, ··· 173 174 Ok(()) 174 175 }); 175 176 177 + let identity_refreshable = identity.clone(); 176 178 let consumer_shutdown = shutdown.clone(); 177 179 let consumer_cache = cache.clone(); 178 180 tasks.spawn(async move { ··· 180 182 args.jetstream, 181 183 None, 182 184 args.jetstream_no_zstd, 185 + identity_refreshable, 183 186 consumer_shutdown, 184 187 consumer_cache, 185 188 )
+1 -1
slingshot/src/record.rs
··· 83 83 pub fn new(identity: Identity) -> Self { 84 84 let client = Client::builder() 85 85 .user_agent(format!( 86 - "microcosm slingshot v{} (dev: @bad-example.com)", 86 + "microcosm slingshot v{} (contact: @bad-example.com)", 87 87 env!("CARGO_PKG_VERSION") 88 88 )) 89 89 .no_proxy()
+66 -4
slingshot/src/server.rs
··· 12 12 use tokio_util::sync::CancellationToken; 13 13 14 14 use poem::{ 15 - Endpoint, EndpointExt, Route, Server, 15 + Endpoint, EndpointExt, IntoResponse, Route, Server, 16 16 endpoint::{StaticFileEndpoint, make_sync}, 17 17 http::Method, 18 18 listener::{ ··· 288 288 self.get_record_impl(repo, collection, rkey, cid).await 289 289 } 290 290 291 + /// blue.microcosm.repo.getRecordByUri 292 + /// 293 + /// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name 294 + #[oai( 295 + path = "/blue.microcosm.repo.getRecordByUri", 296 + method = "get", 297 + tag = "ApiTags::Custom" 298 + )] 299 + async fn get_record_by_uri( 300 + &self, 301 + /// The at-uri of the record 302 + /// 303 + /// The identifier can be a DID or an atproto handle, and the collection 304 + /// and rkey segments must be present. 305 + #[oai(example = "example_uri")] 306 + Query(at_uri): Query<String>, 307 + /// Optional: the CID of the version of the record. 308 + /// 309 + /// If not specified, then return the most recent version. 310 + /// 311 + /// > [!tip] 312 + /// > If specified and a newer version of the record exists, returns 404 not 313 + /// > found. That is: slingshot only retains the most recent version of a 314 + /// > record. 315 + Query(cid): Query<Option<String>>, 316 + ) -> GetRecordResponse { 317 + self.get_uri_record(Query(at_uri), Query(cid)).await 318 + } 319 + 291 320 /// com.bad-example.repo.getUriRecord 292 321 /// 293 322 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) ··· 375 404 #[oai(example = "example_handle")] 376 405 Query(handle): Query<String>, 377 406 ) -> JustDidResponse { 378 - let Ok(handle) = Handle::new(handle) else { 407 + let Ok(handle) = Handle::new(handle.to_lowercase()) else { 379 408 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle")); 380 409 }; 381 410 ··· 413 442 })) 414 443 } 415 444 445 + /// blue.microcosm.identity.resolveMiniDoc 446 + /// 447 + /// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name 448 + #[oai( 449 + path = "/blue.microcosm.identity.resolveMiniDoc", 450 + method = "get", 451 + tag = "ApiTags::Custom" 452 + )] 453 + async fn resolve_mini_doc( 454 + &self, 455 + /// Handle or DID to resolve 456 + #[oai(example = "example_handle")] 457 + Query(identifier): Query<String>, 458 + ) -> ResolveMiniIDResponse { 459 + self.resolve_mini_id(Query(identifier)).await 460 + } 461 + 416 462 /// com.bad-example.identity.resolveMiniDoc 417 463 /// 418 464 /// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity) ··· 436 482 let did = match Did::new(identifier.clone()) { 437 483 Ok(did) => did, 438 484 Err(_) => { 439 - let Ok(alleged_handle) = Handle::new(identifier) else { 485 + let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 440 486 return invalid("Identifier was not a valid DID or handle"); 441 487 }; 442 488 ··· 513 559 let did = match Did::new(repo.clone()) { 514 560 Ok(did) => did, 515 561 Err(_) => { 516 - let Ok(handle) = Handle::new(repo) else { 562 + let Ok(handle) = Handle::new(repo.to_lowercase()) else { 517 563 return GetRecordResponse::BadRequest(xrpc_error( 518 564 "InvalidRequest", 519 565 "Repo was not a valid DID or handle", ··· 772 818 .allow_credentials(false), 773 819 ) 774 820 .with(CatchPanic::new()) 821 + .around(request_counter) 775 822 .with(Tracing); 823 + 776 824 Server::new(listener) 777 825 .name("slingshot") 778 826 .run_with_graceful_shutdown(app, shutdown.cancelled(), None) ··· 780 828 .map_err(ServerError::ServerExited) 781 829 .inspect(|()| log::info!("server ended. goodbye.")) 782 830 } 831 + 832 + async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> { 833 + let t0 = std::time::Instant::now(); 834 + let method = req.method().to_string(); 835 + let path = req.uri().path().to_string(); 836 + let res = next.call(req).await?.into_response(); 837 + metrics::histogram!( 838 + "server_request", 839 + "endpoint" => format!("{method} {path}"), 840 + "status" => res.status().to_string(), 841 + ) 842 + .record(t0.elapsed()); 843 + Ok(res) 844 + }

History

1 round 0 comments
sign up or login to add to the discussion
bad-example.com submitted #0
3 commits
expand
add some metrics
add blue.microcosm xrpc aliases
normalize handles to lowercase
expand 0 comments
pull request successfully merged