Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

add some metrics

+82 -33 (commit total)
+41 -25
slingshot/src/consumer.rs
···
-use crate::CachedRecord;
 use crate::error::ConsumerError;
+use crate::{CachedRecord, Identity, IdentityKey};
 use foyer::HybridCache;
 use jetstream::{
     DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
···
     jetstream_endpoint: String,
     cursor: Option<Cursor>,
     no_zstd: bool,
+    identity: Identity,
     shutdown: CancellationToken,
     cache: HybridCache<String, CachedRecord>,
 ) -> Result<(), ConsumerError> {
···
             break;
         };

-        if event.kind != EventKind::Commit {
-            continue;
-        }
-        let Some(ref mut commit) = event.commit else {
-            log::warn!("consumer: commit event missing commit data, ignoring");
-            continue;
-        };
-
-        // TODO: something a bit more robust
-        let at_uri = format!(
-            "at://{}/{}/{}",
-            &*event.did, &*commit.collection, &*commit.rkey
-        );
-
-        if commit.operation == CommitOp::Delete {
-            cache.insert(at_uri, CachedRecord::Deleted);
-        } else {
-            let Some(record) = commit.record.take() else {
-                log::warn!("consumer: commit insert or update missing record, ignoring");
-                continue;
-            };
-            let Some(cid) = commit.cid.take() else {
-                log::warn!("consumer: commit insert or update missing CID, ignoring");
-                continue;
-            };
-
-            cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
+        match event.kind {
+            EventKind::Commit => {
+                let Some(ref mut commit) = event.commit else {
+                    log::warn!("consumer: commit event missing commit data, ignoring");
+                    continue;
+                };
+
+                // TODO: something a bit more robust
+                let at_uri = format!(
+                    "at://{}/{}/{}",
+                    &*event.did, &*commit.collection, &*commit.rkey
+                );
+
+                if commit.operation == CommitOp::Delete {
+                    cache.insert(at_uri, CachedRecord::Deleted);
+                } else {
+                    let Some(record) = commit.record.take() else {
+                        log::warn!("consumer: commit insert or update missing record, ignoring");
+                        continue;
+                    };
+                    let Some(cid) = commit.cid.take() else {
+                        log::warn!("consumer: commit insert or update missing CID, ignoring");
+                        continue;
+                    };
+
+                    cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
+                }
+            }
+            EventKind::Identity => {
+                let Some(ident) = event.identity else {
+                    log::warn!("consumer: identity event missing identity data, ignoring");
+                    continue;
+                };
+                if let Some(handle) = ident.handle {
+                    metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1);
+                    identity.queue_refresh(IdentityKey::Handle(handle)).await;
+                }
+                metrics::counter!("identity_did_refresh_queued", "reason" => "identity event")
+                    .increment(1);
+                identity.queue_refresh(IdentityKey::Did(ident.did)).await;
+            }
+            EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete)
         }
     }
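The early `continue` on non-commit events becomes a `match` over `EventKind`, so identity events can now queue handle and DID refreshes (account events remain a TODO). One detail worth noting about the new counters: with the `metrics` facade, every distinct label set is its own time series. Assuming a Prometheus-style exporter (not shown in this commit), the queue counters would scrape roughly like this, with invented sample values:

    identity_handle_refresh_queued{reason="identity event"} 12
    identity_did_refresh_queued{reason="identity event"} 15
    identity_did_refresh_queued{reason="ttl",found="true"} 203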
+19 -5
slingshot/src/identity.rs
···
 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);

 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
-enum IdentityKey {
+pub enum IdentityKey {
     Handle(Handle),
     Did(Did),
 }
···
     /// multi-producer *single consumer* queue
     refresh_queue: Arc<Mutex<RefreshQueue>>,
     /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
-    refresher: Arc<Mutex<()>>,
+    refresher_task: Arc<Mutex<()>>,
 }

 impl Identity {
···
             did_resolver: Arc::new(did_resolver),
             cache,
             refresh_queue: Default::default(),
-            refresher: Default::default(),
+            refresher_task: Default::default(),
         })
     }
···
             }
             IdentityData::NotFound => {
                 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
+                    metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
                     self.queue_refresh(key).await;
                 }
                 Ok(None)
             }
             IdentityData::Did(did) => {
                 if (now - *last_fetch) >= MIN_TTL {
+                    metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
                     self.queue_refresh(key).await;
                 }
                 Ok(Some(did.clone()))
···
             }
             IdentityData::NotFound => {
                 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
+                    metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
                     self.queue_refresh(key).await;
                 }
                 Ok(None)
             }
             IdentityData::Doc(mini_did) => {
                 if (now - *last_fetch) >= MIN_TTL {
+                    metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
                     self.queue_refresh(key).await;
                 }
                 Ok(Some(mini_did.clone()))
···
     /// put a refresh task on the queue
     ///
     /// this can be safely called from multiple concurrent tasks
-    async fn queue_refresh(&self, key: IdentityKey) {
+    pub async fn queue_refresh(&self, key: IdentityKey) {
         // todo: max queue size
         let mut q = self.refresh_queue.lock().await;
         if !q.items.contains(&key) {
···
     /// run the refresh queue consumer
     pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
         let _guard = self
-            .refresher
+            .refresher_task
             .try_lock()
             .expect("there to only be one refresher running");
         loop {
···
                 log::trace!("refreshing handle {handle:?}");
                 match self.handle_resolver.resolve(handle).await {
                     Ok(did) => {
+                        metrics::counter!("identity_handle_refresh", "success" => "true")
+                            .increment(1);
                         self.cache.insert(
                             task_key.clone(),
                             IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
                         );
                     }
                     Err(atrium_identity::Error::NotFound) => {
+                        metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1);
                         self.cache.insert(
                             task_key.clone(),
                             IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                         );
                     }
                     Err(err) => {
+                        metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1);
                         log::warn!(
                             "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
                         );
···
                     Ok(did_doc) => {
                         // TODO: fix in atrium: should verify id is did
                         if did_doc.id != did.to_string() {
+                            metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
                             log::warn!(
                                 "refreshed did doc failed: wrong did doc id. dropping refresh."
                             );
···
                         let mini_doc = match did_doc.try_into() {
                             Ok(md) => md,
                             Err(e) => {
+                                metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
                                 log::warn!(
                                     "converting mini doc failed: {e:?}. dropping refresh."
                                 );
                                 continue;
                             }
                         };
+                        metrics::counter!("identity_did_refresh", "success" => "true")
+                            .increment(1);
                         self.cache.insert(
                             task_key.clone(),
                             IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
                         );
                     }
                     Err(atrium_identity::Error::NotFound) => {
+                        metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1);
                         self.cache.insert(
                             task_key.clone(),
                             IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
                         );
                     }
                     Err(err) => {
+                        metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1);
                         log::warn!(
                             "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
                         );
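The `success`/`reason` label scheme repeats across every refresh outcome above. A hypothetical helper (not in this commit) could keep those strings consistent; a minimal sketch:

    // hypothetical helper, not part of this commit: record a refresh outcome
    // with the same label scheme as the counters above. &'static str keeps the
    // sketch simple; the metrics macros also accept owned strings.
    fn record_refresh_outcome(metric: &'static str, failure_reason: Option<&'static str>) {
        match failure_reason {
            // success carries no reason label
            None => metrics::counter!(metric, "success" => "true").increment(1),
            // failures are tagged with why ("not found", "wrong did", "bad doc", "other")
            Some(reason) => {
                metrics::counter!(metric, "success" => "false", "reason" => reason)
                    .increment(1)
            }
        }
    }

Keeping failure causes as label values rather than separate metric names makes it easy to compute a per-metric success ratio in a single query.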
+1 -1
slingshot/src/lib.rs
···
 pub use consumer::consume;
 pub use firehose_cache::firehose_cache;
 pub use healthcheck::healthcheck;
-pub use identity::Identity;
+pub use identity::{Identity, IdentityKey};
 pub use record::{CachedRecord, ErrorResponseObject, Repo};
 pub use server::serve;
+4 -1
slingshot/src/main.rs
···
     let repo = Repo::new(identity.clone());

+    let identity_for_server = identity.clone();
     let server_shutdown = shutdown.clone();
     let server_cache_handle = cache.clone();
     let bind = args.bind;
     tasks.spawn(async move {
         serve(
             server_cache_handle,
-            identity,
+            identity_for_server,
             repo,
             args.acme_domain,
             args.acme_contact,
···
         Ok(())
     });

+    let identity_refreshable = identity.clone();
     let consumer_shutdown = shutdown.clone();
     let consumer_cache = cache.clone();
     tasks.spawn(async move {
···
             args.jetstream,
             None,
             args.jetstream_no_zstd,
+            identity_refreshable,
             consumer_shutdown,
             consumer_cache,
         )
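One thing these diffs don't show: the `metrics` macros record into a global recorder and silently drop everything until one is installed. A minimal sketch of that wiring, assuming the `metrics-exporter-prometheus` crate and a made-up listen address (the exporter slingshot actually uses isn't visible in this commit):

    use metrics_exporter_prometheus::{BuildError, PrometheusBuilder};

    // install the global recorder and serve a prometheus scrape endpoint.
    // must be called from within a tokio runtime for the http listener.
    fn install_metrics() -> Result<(), BuildError> {
        PrometheusBuilder::new()
            .with_http_listener(([0, 0, 0, 0], 9090)) // address is illustrative
            .install()
    }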
+17 -1
slingshot/src/server.rs
···
 use tokio_util::sync::CancellationToken;

 use poem::{
-    Endpoint, EndpointExt, Route, Server,
+    Endpoint, EndpointExt, IntoResponse, Route, Server,
     endpoint::{StaticFileEndpoint, make_sync},
     http::Method,
     listener::{
···
                 .allow_credentials(false),
         )
         .with(CatchPanic::new())
+        .around(request_counter)
         .with(Tracing);
+
     Server::new(listener)
         .name("slingshot")
         .run_with_graceful_shutdown(app, shutdown.cancelled(), None)
···
         .map_err(ServerError::ServerExited)
         .inspect(|()| log::info!("server ended. goodbye."))
 }
+
+async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> {
+    let t0 = std::time::Instant::now();
+    let method = req.method().to_string();
+    let path = req.uri().path().to_string();
+    let res = next.call(req).await?.into_response();
+    metrics::histogram!(
+        "server_request",
+        "endpoint" => format!("{method} {path}"),
+        "status" => res.status().to_string(),
+    )
+    .record(t0.elapsed());
+    Ok(res)
+}
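Two notes on the new `request_counter` middleware: `Histogram::record` takes the `Duration` directly (the `metrics` crate converts it to fractional seconds), and labeling by the raw request path means every distinct URI becomes its own series. If path cardinality ever becomes a problem, a hypothetical normalization step (the route shapes here are made up) could bound it:

    // hypothetical, not part of this commit: collapse raw paths into a small
    // fixed set of endpoint labels so metric cardinality stays bounded.
    fn endpoint_label(path: &str) -> &'static str {
        match path {
            "/" => "/",
            // assumed route shape: group all xrpc method calls under one label
            p if p.starts_with("/xrpc/") => "/xrpc/*",
            _ => "other",
        }
    }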