Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Misc small fixups #13

merged — opened by bad-example.com, targeting `main` from branch `metrics`
  • throw in some metrics including a basic request counter
  • normalize handles to lowercase (should probably be fixed in atrium)
  • refresh handles and DIDs on firehose identity events
  • add aliases for blue.microcosm xrpc nsids to eventually replace the com.bad-example ones
Labels

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mdvukaalbx22
+133 -38
Diff #0
+41 -25
slingshot/src/consumer.rs
··· 1 - use crate::CachedRecord; 2 use crate::error::ConsumerError; 3 use foyer::HybridCache; 4 use jetstream::{ 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 11 jetstream_endpoint: String, 12 cursor: Option<Cursor>, 13 no_zstd: bool, 14 shutdown: CancellationToken, 15 cache: HybridCache<String, CachedRecord>, 16 ) -> Result<(), ConsumerError> { ··· 46 break; 47 }; 48 49 - if event.kind != EventKind::Commit { 50 - continue; 51 - } 52 - let Some(ref mut commit) = event.commit else { 53 - log::warn!("consumer: commit event missing commit data, ignoring"); 54 - continue; 55 - }; 56 57 - // TODO: something a bit more robust 58 - let at_uri = format!( 59 - "at://{}/{}/{}", 60 - &*event.did, &*commit.collection, &*commit.rkey 61 - ); 62 63 - if commit.operation == CommitOp::Delete { 64 - cache.insert(at_uri, CachedRecord::Deleted); 65 - } else { 66 - let Some(record) = commit.record.take() else { 67 - log::warn!("consumer: commit insert or update missing record, ignoring"); 68 - continue; 69 - }; 70 - let Some(cid) = commit.cid.take() else { 71 - log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 - continue; 73 - }; 74 75 - cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 } 77 } 78
··· 1 use crate::error::ConsumerError; 2 + use crate::{CachedRecord, Identity, IdentityKey}; 3 use foyer::HybridCache; 4 use jetstream::{ 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 11 jetstream_endpoint: String, 12 cursor: Option<Cursor>, 13 no_zstd: bool, 14 + identity: Identity, 15 shutdown: CancellationToken, 16 cache: HybridCache<String, CachedRecord>, 17 ) -> Result<(), ConsumerError> { ··· 47 break; 48 }; 49 50 + match event.kind { 51 + EventKind::Commit => { 52 + let Some(ref mut commit) = event.commit else { 53 + log::warn!("consumer: commit event missing commit data, ignoring"); 54 + continue; 55 + }; 56 57 + // TODO: something a bit more robust 58 + let at_uri = format!( 59 + "at://{}/{}/{}", 60 + &*event.did, &*commit.collection, &*commit.rkey 61 + ); 62 63 + if commit.operation == CommitOp::Delete { 64 + cache.insert(at_uri, CachedRecord::Deleted); 65 + } else { 66 + let Some(record) = commit.record.take() else { 67 + log::warn!("consumer: commit insert or update missing record, ignoring"); 68 + continue; 69 + }; 70 + let Some(cid) = commit.cid.take() else { 71 + log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 + continue; 73 + }; 74 75 + cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 + } 77 + } 78 + EventKind::Identity => { 79 + let Some(ident) = event.identity else { 80 + log::warn!("consumer: identity event missing identity data, ignoring"); 81 + continue; 82 + }; 83 + if let Some(handle) = ident.handle { 84 + metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1); 85 + identity.queue_refresh(IdentityKey::Handle(handle)).await; 86 + } 87 + metrics::counter!("identity_did_refresh_queued", "reason" => "identity event") 88 + .increment(1); 89 + identity.queue_refresh(IdentityKey::Did(ident.did)).await; 90 + } 91 + EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on 
delete) 92 } 93 } 94
+20 -6
slingshot/src/identity.rs
··· 38 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 39 40 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 41 - enum IdentityKey { 42 Handle(Handle), 43 Did(Did), 44 } ··· 115 let Some(maybe_handle) = aka.strip_prefix("at://") else { 116 continue; 117 }; 118 - let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else { 119 continue; 120 }; 121 unverified_handle = Some(valid_handle); ··· 186 /// multi-producer *single consumer* queue 187 refresh_queue: Arc<Mutex<RefreshQueue>>, 188 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 189 - refresher: Arc<Mutex<()>>, 190 } 191 192 impl Identity { ··· 225 did_resolver: Arc::new(did_resolver), 226 cache, 227 refresh_queue: Default::default(), 228 - refresher: Default::default(), 229 }) 230 } 231 ··· 293 } 294 IdentityData::NotFound => { 295 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 296 self.queue_refresh(key).await; 297 } 298 Ok(None) 299 } 300 IdentityData::Did(did) => { 301 if (now - *last_fetch) >= MIN_TTL { 302 self.queue_refresh(key).await; 303 } 304 Ok(Some(did.clone())) ··· 347 } 348 IdentityData::NotFound => { 349 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 350 self.queue_refresh(key).await; 351 } 352 Ok(None) 353 } 354 IdentityData::Doc(mini_did) => { 355 if (now - *last_fetch) >= MIN_TTL { 356 self.queue_refresh(key).await; 357 } 358 Ok(Some(mini_did.clone())) ··· 363 /// put a refresh task on the queue 364 /// 365 /// this can be safely called from multiple concurrent tasks 366 - async fn queue_refresh(&self, key: IdentityKey) { 367 // todo: max queue size 368 let mut q = self.refresh_queue.lock().await; 369 if !q.items.contains(&key) { ··· 440 /// run the refresh queue consumer 441 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 442 let _guard = self 443 - .refresher 444 .try_lock() 445 .expect("there to only be one refresher running"); 446 loop { ··· 
462 log::trace!("refreshing handle {handle:?}"); 463 match self.handle_resolver.resolve(handle).await { 464 Ok(did) => { 465 self.cache.insert( 466 task_key.clone(), 467 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 468 ); 469 } 470 Err(atrium_identity::Error::NotFound) => { 471 self.cache.insert( 472 task_key.clone(), 473 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 474 ); 475 } 476 Err(err) => { 477 log::warn!( 478 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 479 ); ··· 488 Ok(did_doc) => { 489 // TODO: fix in atrium: should verify id is did 490 if did_doc.id != did.to_string() { 491 log::warn!( 492 "refreshed did doc failed: wrong did doc id. dropping refresh." 493 ); ··· 496 let mini_doc = match did_doc.try_into() { 497 Ok(md) => md, 498 Err(e) => { 499 log::warn!( 500 "converting mini doc failed: {e:?}. dropping refresh." 501 ); 502 continue; 503 } 504 }; 505 self.cache.insert( 506 task_key.clone(), 507 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 508 ); 509 } 510 Err(atrium_identity::Error::NotFound) => { 511 self.cache.insert( 512 task_key.clone(), 513 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 514 ); 515 } 516 Err(err) => { 517 log::warn!( 518 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 519 );
··· 38 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 39 40 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 41 + pub enum IdentityKey { 42 Handle(Handle), 43 Did(Did), 44 } ··· 115 let Some(maybe_handle) = aka.strip_prefix("at://") else { 116 continue; 117 }; 118 + let Ok(valid_handle) = Handle::new(maybe_handle.to_lowercase()) else { 119 continue; 120 }; 121 unverified_handle = Some(valid_handle); ··· 186 /// multi-producer *single consumer* queue 187 refresh_queue: Arc<Mutex<RefreshQueue>>, 188 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 189 + refresher_task: Arc<Mutex<()>>, 190 } 191 192 impl Identity { ··· 225 did_resolver: Arc::new(did_resolver), 226 cache, 227 refresh_queue: Default::default(), 228 + refresher_task: Default::default(), 229 }) 230 } 231 ··· 293 } 294 IdentityData::NotFound => { 295 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 296 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 297 self.queue_refresh(key).await; 298 } 299 Ok(None) 300 } 301 IdentityData::Did(did) => { 302 if (now - *last_fetch) >= MIN_TTL { 303 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 304 self.queue_refresh(key).await; 305 } 306 Ok(Some(did.clone())) ··· 349 } 350 IdentityData::NotFound => { 351 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 352 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 353 self.queue_refresh(key).await; 354 } 355 Ok(None) 356 } 357 IdentityData::Doc(mini_did) => { 358 if (now - *last_fetch) >= MIN_TTL { 359 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 360 self.queue_refresh(key).await; 361 } 362 Ok(Some(mini_did.clone())) ··· 367 /// put a refresh task on the queue 368 /// 369 /// this can be safely called from 
multiple concurrent tasks 370 + pub async fn queue_refresh(&self, key: IdentityKey) { 371 // todo: max queue size 372 let mut q = self.refresh_queue.lock().await; 373 if !q.items.contains(&key) { ··· 444 /// run the refresh queue consumer 445 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 446 let _guard = self 447 + .refresher_task 448 .try_lock() 449 .expect("there to only be one refresher running"); 450 loop { ··· 466 log::trace!("refreshing handle {handle:?}"); 467 match self.handle_resolver.resolve(handle).await { 468 Ok(did) => { 469 + metrics::counter!("identity_handle_refresh", "success" => "true") 470 + .increment(1); 471 self.cache.insert( 472 task_key.clone(), 473 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 474 ); 475 } 476 Err(atrium_identity::Error::NotFound) => { 477 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1); 478 self.cache.insert( 479 task_key.clone(), 480 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 481 ); 482 } 483 Err(err) => { 484 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1); 485 log::warn!( 486 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 487 ); ··· 496 Ok(did_doc) => { 497 // TODO: fix in atrium: should verify id is did 498 if did_doc.id != did.to_string() { 499 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1); 500 log::warn!( 501 "refreshed did doc failed: wrong did doc id. dropping refresh." 502 ); ··· 505 let mini_doc = match did_doc.try_into() { 506 Ok(md) => md, 507 Err(e) => { 508 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 509 log::warn!( 510 "converting mini doc failed: {e:?}. dropping refresh." 
511 ); 512 continue; 513 } 514 }; 515 + metrics::counter!("identity_did_refresh", "success" => "true") 516 + .increment(1); 517 self.cache.insert( 518 task_key.clone(), 519 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 520 ); 521 } 522 Err(atrium_identity::Error::NotFound) => { 523 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1); 524 self.cache.insert( 525 task_key.clone(), 526 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 527 ); 528 } 529 Err(err) => { 530 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1); 531 log::warn!( 532 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 533 );
+1 -1
slingshot/src/lib.rs
··· 9 pub use consumer::consume; 10 pub use firehose_cache::firehose_cache; 11 pub use healthcheck::healthcheck; 12 - pub use identity::Identity; 13 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 pub use server::serve;
··· 9 pub use consumer::consume; 10 pub use firehose_cache::firehose_cache; 11 pub use healthcheck::healthcheck; 12 + pub use identity::{Identity, IdentityKey}; 13 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 pub use server::serve;
+4 -1
slingshot/src/main.rs
··· 154 155 let repo = Repo::new(identity.clone()); 156 157 let server_shutdown = shutdown.clone(); 158 let server_cache_handle = cache.clone(); 159 let bind = args.bind; 160 tasks.spawn(async move { 161 serve( 162 server_cache_handle, 163 - identity, 164 repo, 165 args.acme_domain, 166 args.acme_contact, ··· 173 Ok(()) 174 }); 175 176 let consumer_shutdown = shutdown.clone(); 177 let consumer_cache = cache.clone(); 178 tasks.spawn(async move { ··· 180 args.jetstream, 181 None, 182 args.jetstream_no_zstd, 183 consumer_shutdown, 184 consumer_cache, 185 )
··· 154 155 let repo = Repo::new(identity.clone()); 156 157 + let identity_for_server = identity.clone(); 158 let server_shutdown = shutdown.clone(); 159 let server_cache_handle = cache.clone(); 160 let bind = args.bind; 161 tasks.spawn(async move { 162 serve( 163 server_cache_handle, 164 + identity_for_server, 165 repo, 166 args.acme_domain, 167 args.acme_contact, ··· 174 Ok(()) 175 }); 176 177 + let identity_refreshable = identity.clone(); 178 let consumer_shutdown = shutdown.clone(); 179 let consumer_cache = cache.clone(); 180 tasks.spawn(async move { ··· 182 args.jetstream, 183 None, 184 args.jetstream_no_zstd, 185 + identity_refreshable, 186 consumer_shutdown, 187 consumer_cache, 188 )
+1 -1
slingshot/src/record.rs
··· 83 pub fn new(identity: Identity) -> Self { 84 let client = Client::builder() 85 .user_agent(format!( 86 - "microcosm slingshot v{} (dev: @bad-example.com)", 87 env!("CARGO_PKG_VERSION") 88 )) 89 .no_proxy()
··· 83 pub fn new(identity: Identity) -> Self { 84 let client = Client::builder() 85 .user_agent(format!( 86 + "microcosm slingshot v{} (contact: @bad-example.com)", 87 env!("CARGO_PKG_VERSION") 88 )) 89 .no_proxy()
+66 -4
slingshot/src/server.rs
··· 12 use tokio_util::sync::CancellationToken; 13 14 use poem::{ 15 - Endpoint, EndpointExt, Route, Server, 16 endpoint::{StaticFileEndpoint, make_sync}, 17 http::Method, 18 listener::{ ··· 288 self.get_record_impl(repo, collection, rkey, cid).await 289 } 290 291 /// com.bad-example.repo.getUriRecord 292 /// 293 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) ··· 375 #[oai(example = "example_handle")] 376 Query(handle): Query<String>, 377 ) -> JustDidResponse { 378 - let Ok(handle) = Handle::new(handle) else { 379 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle")); 380 }; 381 ··· 413 })) 414 } 415 416 /// com.bad-example.identity.resolveMiniDoc 417 /// 418 /// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity) ··· 436 let did = match Did::new(identifier.clone()) { 437 Ok(did) => did, 438 Err(_) => { 439 - let Ok(alleged_handle) = Handle::new(identifier) else { 440 return invalid("Identifier was not a valid DID or handle"); 441 }; 442 ··· 513 let did = match Did::new(repo.clone()) { 514 Ok(did) => did, 515 Err(_) => { 516 - let Ok(handle) = Handle::new(repo) else { 517 return GetRecordResponse::BadRequest(xrpc_error( 518 "InvalidRequest", 519 "Repo was not a valid DID or handle", ··· 772 .allow_credentials(false), 773 ) 774 .with(CatchPanic::new()) 775 .with(Tracing); 776 Server::new(listener) 777 .name("slingshot") 778 .run_with_graceful_shutdown(app, shutdown.cancelled(), None) ··· 780 .map_err(ServerError::ServerExited) 781 .inspect(|()| log::info!("server ended. goodbye.")) 782 }
··· 12 use tokio_util::sync::CancellationToken; 13 14 use poem::{ 15 + Endpoint, EndpointExt, IntoResponse, Route, Server, 16 endpoint::{StaticFileEndpoint, make_sync}, 17 http::Method, 18 listener::{ ··· 288 self.get_record_impl(repo, collection, rkey, cid).await 289 } 290 291 + /// blue.microcosm.repo.getRecordByUri 292 + /// 293 + /// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name 294 + #[oai( 295 + path = "/blue.microcosm.repo.getRecordByUri", 296 + method = "get", 297 + tag = "ApiTags::Custom" 298 + )] 299 + async fn get_record_by_uri( 300 + &self, 301 + /// The at-uri of the record 302 + /// 303 + /// The identifier can be a DID or an atproto handle, and the collection 304 + /// and rkey segments must be present. 305 + #[oai(example = "example_uri")] 306 + Query(at_uri): Query<String>, 307 + /// Optional: the CID of the version of the record. 308 + /// 309 + /// If not specified, then return the most recent version. 310 + /// 311 + /// > [!tip] 312 + /// > If specified and a newer version of the record exists, returns 404 not 313 + /// > found. That is: slingshot only retains the most recent version of a 314 + /// > record. 
315 + Query(cid): Query<Option<String>>, 316 + ) -> GetRecordResponse { 317 + self.get_uri_record(Query(at_uri), Query(cid)).await 318 + } 319 + 320 /// com.bad-example.repo.getUriRecord 321 /// 322 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) ··· 404 #[oai(example = "example_handle")] 405 Query(handle): Query<String>, 406 ) -> JustDidResponse { 407 + let Ok(handle) = Handle::new(handle.to_lowercase()) else { 408 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle")); 409 }; 410 ··· 442 })) 443 } 444 445 + /// blue.microcosm.identity.resolveMiniDoc 446 + /// 447 + /// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name 448 + #[oai( 449 + path = "/blue.microcosm.identity.resolveMiniDoc", 450 + method = "get", 451 + tag = "ApiTags::Custom" 452 + )] 453 + async fn resolve_mini_doc( 454 + &self, 455 + /// Handle or DID to resolve 456 + #[oai(example = "example_handle")] 457 + Query(identifier): Query<String>, 458 + ) -> ResolveMiniIDResponse { 459 + self.resolve_mini_id(Query(identifier)).await 460 + } 461 + 462 /// com.bad-example.identity.resolveMiniDoc 463 /// 464 /// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity) ··· 482 let did = match Did::new(identifier.clone()) { 483 Ok(did) => did, 484 Err(_) => { 485 + let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 486 return invalid("Identifier was not a valid DID or handle"); 487 }; 488 ··· 559 let did = match Did::new(repo.clone()) { 560 Ok(did) => did, 561 Err(_) => { 562 + let Ok(handle) = Handle::new(repo.to_lowercase()) else { 563 return GetRecordResponse::BadRequest(xrpc_error( 564 "InvalidRequest", 565 "Repo was not a valid DID or handle", ··· 818 .allow_credentials(false), 819 ) 820 .with(CatchPanic::new()) 821 + .around(request_counter) 822 .with(Tracing); 823 + 824 
Server::new(listener) 825 .name("slingshot") 826 .run_with_graceful_shutdown(app, shutdown.cancelled(), None) ··· 828 .map_err(ServerError::ServerExited) 829 .inspect(|()| log::info!("server ended. goodbye.")) 830 } 831 + 832 + async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> { 833 + let t0 = std::time::Instant::now(); 834 + let method = req.method().to_string(); 835 + let path = req.uri().path().to_string(); 836 + let res = next.call(req).await?.into_response(); 837 + metrics::histogram!( 838 + "server_request", 839 + "endpoint" => format!("{method} {path}"), 840 + "status" => res.status().to_string(), 841 + ) 842 + .record(t0.elapsed()); 843 + Ok(res) 844 + }

History

1 round · 0 comments
sign up or login to add to the discussion
bad-example.com submitted #0
3 commits
expand
add some metrics
add blue.microcosm xrpc aliases
normalize handles to lowercase
expand 0 comments
pull request successfully merged