Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

timeseries endpoint

+200 -11
+6
ufos/src/lib.rs
··· 241 241 dids_estimate: u64, 242 242 } 243 243 244 + #[derive(Debug, Serialize, JsonSchema)] 245 + pub struct JustCount { 246 + records: u64, 247 + dids_estimate: u64, 248 + } 249 + 244 250 #[derive(Debug)] 245 251 pub enum OrderCollectionsBy { 246 252 Lexi { cursor: Option<Vec<u8>> },
+84 -5
ufos/src/server.rs
··· 1 1 use crate::index_html::INDEX_HTML; 2 2 use crate::storage::StoreReader; 3 3 use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor}; 4 - use crate::{ConsumerInfo, Cursor, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord}; 4 + use crate::{ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord}; 5 5 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; 6 6 use chrono::{DateTime, Utc}; 7 7 use dropshot::endpoint; ··· 280 280 /// Mutually exclusive with `cursor` -- sorted results cannot be paged. 281 281 order: Option<CollectionsQueryOrder>, 282 282 } 283 - #[endpoint { 284 - method = GET, 285 - path = "/collections" 286 - }] 283 + 287 284 /// Get collection with statistics 288 285 /// 289 286 /// ## To fetch a full list: ··· 305 302 /// Specify the `order` parameter (must be either `records-created` or `did-estimate`). Note that ordered results cannot be paged. 306 303 /// 307 304 /// All statistics are bucketed hourly, so the most granular effective time boundary for `since` and `until` is one hour. 
305 + #[endpoint { 306 + method = GET, 307 + path = "/collections" 308 + }] 308 309 async fn get_collections( 309 310 ctx: RequestContext<Context>, 310 311 query: Query<CollectionsQuery>, ··· 356 357 }) 357 358 } 358 359 360 + #[derive(Debug, Deserialize, JsonSchema)] 361 + struct CollectionTimeseriesQuery { 362 + collection: String, // JsonSchema not implemented for Nsid :( 363 + /// Limit collections and statistics to those seen after this UTC datetime 364 + /// 365 + /// default: 1 week ago 366 + since: Option<DateTime<Utc>>, 367 + /// Limit collections and statistics to those seen before this UTC datetime 368 + /// 369 + /// default: now 370 + until: Option<DateTime<Utc>>, 371 + /// time steps between data, in seconds 372 + /// 373 + /// the step will be rounded down to the nearest hour 374 + /// 375 + /// default: 86400 (24hrs) 376 + #[schemars(range(min = 3600))] 377 + step: Option<u64>, 378 + // todo: rolling averages 379 + } 380 + #[derive(Debug, Serialize, JsonSchema)] 381 + struct CollectionTimeseriesResponse { 382 + range: Vec<DateTime<Utc>>, 383 + series: HashMap<String, Vec<JustCount>>, 384 + } 385 + /// Get timeseries data 386 + #[endpoint { 387 + method = GET, 388 + path = "/timeseries" 389 + }] 390 + async fn get_timeseries( 391 + ctx: RequestContext<Context>, 392 + query: Query<CollectionTimeseriesQuery>, 393 + ) -> OkCorsResponse<CollectionTimeseriesResponse> { 394 + let Context { storage, .. 
} = ctx.context(); 395 + let q = query.into_inner(); 396 + 397 + let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| { 398 + let week_ago_secs = 7 * 86_400; 399 + let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs); 400 + Cursor::at(week_ago).into() 401 + }); 402 + 403 + let until = q.until.map(dt_to_cursor).transpose()?; 404 + 405 + let step = if let Some(secs) = q.step { 406 + if secs < 3600 { 407 + let msg = format!("step is too small: {}", secs); 408 + return Err(HttpError::for_bad_request(None, msg)); 409 + } 410 + (secs / 3600) * 3600 // truncate to hour 411 + } else { 412 + 86_400 413 + }; 414 + 415 + let nsid = Nsid::new(q.collection).map_err(|e| { 416 + HttpError::for_bad_request(None, format!("collection was not a valid NSID: {:?}", e)) 417 + })?; 418 + 419 + let (range_cursors, series) = storage 420 + .get_timeseries(vec![nsid], since, until, step) 421 + .await 422 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 423 + 424 + let range = range_cursors 425 + .into_iter() 426 + .map(|c| DateTime::<Utc>::from_timestamp_micros(c.to_raw_u64() as i64).unwrap()) 427 + .collect(); 428 + 429 + let series = series 430 + .into_iter() 431 + .map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect())) 432 + .collect(); 433 + 434 + ok_cors(CollectionTimeseriesResponse { range, series }) 435 + } 436 + 359 437 pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 360 438 let log = ConfigLogging::StderrTerminal { 361 439 level: ConfigLoggingLevel::Info, ··· 371 449 api.register(get_records_by_collections).unwrap(); 372 450 api.register(get_records_total_seen).unwrap(); 373 451 api.register(get_collections).unwrap(); 452 + api.register(get_timeseries).unwrap(); 374 453 375 454 let context = Context { 376 455 spec: Arc::new(
+10 -2
ufos/src/storage.rs
··· 1 - use crate::store_types::{HourTruncatedCursor, SketchSecretPrefix}; 1 + use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix}; 2 2 use crate::{ 3 3 error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, OrderCollectionsBy, 4 4 UFOsRecord, 5 5 }; 6 6 use async_trait::async_trait; 7 7 use jetstream::exports::{Did, Nsid}; 8 - use std::collections::HashSet; 8 + use std::collections::{HashMap, HashSet}; 9 9 use std::path::Path; 10 10 use tokio::sync::mpsc::Receiver; 11 11 ··· 83 83 since: Option<HourTruncatedCursor>, 84 84 until: Option<HourTruncatedCursor>, 85 85 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>; 86 + 87 + async fn get_timeseries( 88 + &self, 89 + collections: Vec<Nsid>, 90 + since: HourTruncatedCursor, 91 + until: Option<HourTruncatedCursor>, 92 + step: u64, 93 + ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>; 86 94 87 95 async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>; 88 96
+77 -1
ufos/src/storage_fjall.rs
··· 9 9 NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, 10 10 RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey, 11 11 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, 12 - WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, 12 + WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS, 13 + WEEK_IN_MICROS, 13 14 }; 14 15 use crate::{ 15 16 CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord, ··· 374 375 ))) 375 376 } 376 377 378 + type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>; 379 + 377 380 impl FjallReader { 378 381 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 379 382 let rollup_cursor = ··· 652 655 } 653 656 } 654 657 658 + /// - step: output series time step, in seconds 659 + fn get_timeseries( 660 + &self, 661 + collections: Vec<Nsid>, 662 + since: HourTruncatedCursor, 663 + until: Option<HourTruncatedCursor>, 664 + step: u64, 665 + ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> { 666 + if step > WEEK_IN_MICROS { 667 + panic!("week-stepping is todo"); 668 + } 669 + let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 670 + let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else { 671 + return Ok(( 672 + // empty: until < since 673 + vec![], 674 + collections.into_iter().map(|c| (c, vec![])).collect(), 675 + )); 676 + }; 677 + let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS; 678 + let mut counts_by_hour = Vec::with_capacity(n_hours as usize); 679 + let snapshot = self.rollups.snapshot(); 680 + for hour in (0..n_hours).map(|i| since.nth_next(i)) { 681 + let mut counts = Vec::with_capacity(collections.len()); 682 + for nsid in &collections { 683 + let count = snapshot 684 + .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)? 
685 + .as_deref() 686 + .map(db_complete::<CountsValue>) 687 + .transpose()? 688 + .unwrap_or_default(); 689 + counts.push(count); 690 + } 691 + counts_by_hour.push((hour, counts)); 692 + } 693 + 694 + let step_hours = step / (HOUR_IN_MICROS / 1_000_000); 695 + let mut output_hours = Vec::with_capacity(step_hours as usize); 696 + let mut output_series: CollectionSerieses = collections 697 + .iter() 698 + .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize))) 699 + .collect(); 700 + 701 + for chunk in counts_by_hour.chunks(step_hours as usize) { 702 + output_hours.push(chunk[0].0); // always guaranteed to have at least one element in a chunks chunk 703 + for (i, collection) in collections.iter().enumerate() { 704 + let mut c = CountsValue::default(); 705 + for (_, counts) in chunk { 706 + c.merge(&counts[i]); 707 + } 708 + output_series 709 + .get_mut(collection) 710 + .expect("output series is initialized with all collections") 711 + .push(c); 712 + } 713 + } 714 + 715 + Ok((output_hours, output_series)) 716 + } 717 + 655 718 fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 656 719 // 0. grab a snapshot in case rollups happen while we're working 657 720 let instant = self.keyspace.instant(); ··· 761 824 let s = self.clone(); 762 825 tokio::task::spawn_blocking(move || { 763 826 FjallReader::get_collections(&s, limit, order, since, until) 827 + }) 828 + .await? 829 + } 830 + async fn get_timeseries( 831 + &self, 832 + collections: Vec<Nsid>, 833 + since: HourTruncatedCursor, 834 + until: Option<HourTruncatedCursor>, 835 + step: u64, 836 + ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> { 837 + let s = self.clone(); 838 + tokio::task::spawn_blocking(move || { 839 + FjallReader::get_timeseries(&s, collections, since, until, step) 764 840 }) 765 841 .await? 766 842 }
+9
ufos/src/storage_mem.rs
··· 556 556 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 557 557 todo!() 558 558 } 559 + async fn get_timeseries( 560 + &self, 561 + _: Vec<Nsid>, 562 + _: HourTruncatedCursor, 563 + _: Option<HourTruncatedCursor>, 564 + _: u64, 565 + ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)> { 566 + todo!() 567 + } 559 568 async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 560 569 let s = self.clone(); 561 570 let collection = collection.clone();
+14 -3
ufos/src/store_types.rs
··· 2 2 DbBytes, DbConcat, DbStaticStr, EncodingError, EncodingResult, SerdeBytes, StaticStr, 3 3 UseBincodePlz, 4 4 }; 5 - use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit}; 5 + use crate::{Cursor, Did, JustCount, Nsid, PutAction, RecordKey, UFOsCommit}; 6 6 use bincode::{Decode, Encode}; 7 7 use cardinality_estimator_safe::Sketch; 8 8 use std::ops::{Bound, Range}; ··· 263 263 } 264 264 } 265 265 } 266 + impl From<&CountsValue> for JustCount { 267 + fn from(cv: &CountsValue) -> Self { 268 + Self { 269 + records: cv.records(), 270 + dids_estimate: cv.dids().estimate() as u64, 271 + } 272 + } 273 + } 266 274 267 275 static_str!("delete_acount", _DeleteAccountStaticStr); 268 276 pub type DeleteAccountStaticPrefix = DbStaticStr<_DeleteAccountStaticStr>; ··· 527 535 pub fn next(&self) -> Self { 528 536 Self(self.0 + MOD) 529 537 } 538 + pub fn nth_next(&self, n: u64) -> Self { 539 + Self(self.0 + (n * MOD)) 540 + } 530 541 pub fn prev(&self) -> Self { 531 542 if self.0 < MOD { 532 543 panic!("underflow: previous truncation start would be less than zero"); ··· 556 567 } 557 568 } 558 569 559 - const HOUR_IN_MICROS: u64 = 1_000_000 * 3600; 570 + pub const HOUR_IN_MICROS: u64 = 1_000_000 * 3600; 560 571 pub type HourTruncatedCursor = TruncatedCursor<HOUR_IN_MICROS>; 561 572 562 - const WEEK_IN_MICROS: u64 = HOUR_IN_MICROS * 24 * 7; 573 + pub const WEEK_IN_MICROS: u64 = HOUR_IN_MICROS * 24 * 7; 563 574 pub type WeekTruncatedCursor = TruncatedCursor<WEEK_IN_MICROS>; 564 575 565 576 #[derive(Debug, PartialEq)]