Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

stats by prefix

with children by type (subsequent prefixes or full NSIDs)

+464 -17
+12 -5
ufos/src/db_types.rs
··· 18 18 pub enum EncodingError { 19 19 #[error("failed to parse Atrium string type: {0}")] 20 20 BadAtriumStringType(&'static str), 21 + #[error("Not enough NSID segments for a usable prefix")] 22 + NotEnoughNsidSegments, 21 23 #[error("failed to bincode-encode: {0}")] 22 24 BincodeEncodeFailed(#[from] EncodeError), 23 25 #[error("failed to bincode-decode: {0}")] ··· 62 64 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> 63 65 where 64 66 Self: Sized; 67 + fn as_prefix_range_end(&self) -> EncodingResult<Vec<u8>> { 68 + let bytes = self.to_db_bytes()?; 69 + let (_, Bound::Excluded(range_end)) = prefix_to_range(&bytes) else { 70 + return Err(EncodingError::BadRangeBound); 71 + }; 72 + Ok(range_end.to_vec()) 73 + } 65 74 } 66 75 67 76 pub trait SubPrefixBytes<T> { ··· 87 96 self.prefix.to_db_bytes() 88 97 } 89 98 pub fn prefix_range_end(prefix: &P) -> EncodingResult<Vec<u8>> { 90 - let prefix_bytes = prefix.to_db_bytes()?; 91 - let (_, Bound::Excluded(range_end)) = prefix_to_range(&prefix_bytes) else { 92 - return Err(EncodingError::BadRangeBound); 93 - }; 94 - Ok(range_end.to_vec()) 99 + prefix.as_prefix_range_end() 95 100 } 96 101 pub fn range_end(&self) -> EncodingResult<Vec<u8>> { 97 102 Self::prefix_range_end(&self.prefix) ··· 241 246 242 247 impl<const N: usize> UseBincodePlz for [u8; N] {} 243 248 249 + // bare bytes (NOT prefix-encoded!) 244 250 impl DbBytes for Vec<u8> { 245 251 fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> { 246 252 Ok(self.to_vec()) 247 253 } 254 + // greedy, consumes ALL remaining bytes 248 255 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 249 256 Ok((bytes.to_owned(), bytes.len())) 250 257 }
+41
ufos/src/lib.rs
··· 8 8 pub mod storage_fjall; 9 9 pub mod store_types; 10 10 11 + use crate::db_types::{EncodingError, EncodingResult}; 11 12 use crate::error::BatchInsertError; 12 13 use crate::store_types::SketchSecretPrefix; 13 14 use cardinality_estimator_safe::{Element, Sketch}; ··· 280 281 pub struct NsidCount { 281 282 nsid: String, 282 283 creates: u64, 284 + // TODO: add updates and deletes 283 285 dids_estimate: u64, 286 + } 287 + 288 + #[derive(Debug, Serialize, JsonSchema)] 289 + pub struct PrefixCount { 290 + prefix: String, 291 + creates: u64, 292 + // TODO: add updates and deletes 293 + dids_estimate: u64, 294 + } 295 + 296 + #[derive(Debug, Serialize, JsonSchema)] 297 + #[serde(tag = "type", rename_all = "camelCase")] 298 + pub enum PrefixChild { 299 + Collection(NsidCount), 300 + Prefix(PrefixCount), 301 + } 302 + 303 + #[derive(Debug, Serialize, JsonSchema)] 304 + pub struct NsidPrefix(String); 305 + impl NsidPrefix { 306 + pub fn new(pre: &str) -> EncodingResult<Self> { 307 + // it's a valid prefix if appending `.name` makes it a valid NSID 308 + Nsid::new(format!("{pre}.name")).map_err(EncodingError::BadAtriumStringType)?; 309 + // hack (shouldn't really be here): reject prefixes that aren't at least 2 segments long 310 + if !pre.contains('.') { 311 + return Err(EncodingError::NotEnoughNsidSegments); 312 + } 313 + Ok(Self(pre.to_string())) 314 + } 315 + pub fn is_group_of(&self, other: &Nsid) -> bool { 316 + assert!( 317 + other.as_str().starts_with(&self.0), 318 + "must be a prefix of other" 319 + ); 320 + self.0 == other.domain_authority() 321 + } 322 + pub fn as_str(&self) -> &str { 323 + self.0.as_str() 324 + } 284 325 } 285 326 286 327 #[derive(Debug, Serialize, JsonSchema)]
+135 -1
ufos/src/server/mod.rs
··· 4 4 use crate::index_html::INDEX_HTML; 5 5 use crate::storage::StoreReader; 6 6 use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor}; 7 - use crate::{ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord}; 7 + use crate::{ 8 + ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, 9 + UFOsRecord, 10 + }; 8 11 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; 9 12 use chrono::{DateTime, Utc}; 10 13 use collections_query::MultiCollectionQuery; ··· 379 382 .into() 380 383 } 381 384 385 + #[derive(Debug, Serialize, JsonSchema)] 386 + struct PrefixResponse { 387 + /// Note that total may not include counts beyond the current page (TODO) 388 + total: JustCount, 389 + children: Vec<PrefixChild>, 390 + /// Include in a follow-up request to get the next page of results, if more are available 391 + cursor: Option<String>, 392 + } 393 + #[derive(Debug, Deserialize, JsonSchema)] 394 + struct PrefixQuery { 395 + /// 396 + /// The final segment of a collection NSID is the `name`, and everything before it is called its `group`. eg: 397 + /// 398 + /// - `app.bsky.feed.post` and `app.bsky.feed.like` are both in the _lexicon group_ "`app.bsky.feed`". 399 + /// 400 + prefix: String, 401 + /// The maximum number of collections to return in one request. 402 + /// 403 + /// The number of items actually returned may be less than the limit. If paginating, this does **not** indicate that no 404 + /// more items are available! Check if the `cursor` in the response is `null` to determine the end of items. 405 + /// 406 + /// Default: `100` normally, `32` if `order` is specified. 407 + #[schemars(range(min = 1, max = 200))] 408 + limit: Option<usize>, 409 + /// Get a paginated response with more collections. 410 + /// 411 + /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request. 412 + /// 413 + /// `cursor` is mutually exclusive with `order`. 414 + cursor: Option<String>, 415 + /// Limit collections and statistics to those seen after this UTC datetime 416 + /// 417 + /// Default: all-time 418 + since: Option<DateTime<Utc>>, 419 + /// Limit collections and statistics to those seen before this UTC datetime 420 + /// 421 + /// Default: now 422 + until: Option<DateTime<Utc>>, 423 + /// Get a limited, sorted list 424 + /// 425 + /// Mutually exclusive with `cursor` -- sorted results cannot be paged. 426 + order: Option<CollectionsQueryOrder>, 427 + } 428 + /// Prefix-filter collections list 429 + /// 430 + /// This endpoint enumerates all collection NSIDs for a lexicon group. 431 + /// 432 + /// ## To fetch a full list: 433 + /// 434 + /// Omit the `order` parameter and page through the results using the `cursor`. There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests with `cursor`s to get them all. 435 + /// 436 + /// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot: 437 + /// 438 + /// - all collection NSIDs observed before the first request will be included in the results 439 + /// 440 + /// - *new* NSIDs observed in the firehose *while paging* might be included or excluded from the final set 441 + /// 442 + /// - no duplicate NSIDs will occur in the combined results 443 + /// 444 + /// In practice this is close enough for most use-cases to not worry about. 445 + /// 446 + /// ## To fetch the top collection NSIDs: 447 + /// 448 + /// Specify the `order` parameter (must be either `records-created` or `did-estimate`). Note that ordered results cannot be paged. 449 + /// 450 + /// All statistics are bucketed hourly, so the most granular effecitve time boundary for `since` and `until` is one hour. 451 + #[endpoint { 452 + method = GET, 453 + path = "/prefix" 454 + }] 455 + async fn get_prefix( 456 + ctx: RequestContext<Context>, 457 + query: Query<PrefixQuery>, 458 + ) -> OkCorsResponse<PrefixResponse> { 459 + let Context { storage, .. } = ctx.context(); 460 + let q = query.into_inner(); 461 + 462 + let prefix = NsidPrefix::new(&q.prefix).map_err(|e| { 463 + HttpError::for_bad_request( 464 + None, 465 + format!("{:?} was not a valid NSID prefix: {e:?}", q.prefix), 466 + ) 467 + })?; 468 + 469 + if q.cursor.is_some() && q.order.is_some() { 470 + let msg = "`cursor` is mutually exclusive with `order`. ordered results cannot be paged."; 471 + return Err(HttpError::for_bad_request(None, msg.to_string())); 472 + } 473 + 474 + let order = if let Some(ref o) = q.order { 475 + o.into() 476 + } else { 477 + let cursor = q 478 + .cursor 479 + .and_then(|c| if c.is_empty() { None } else { Some(c) }) 480 + .map(|c| URL_SAFE_NO_PAD.decode(&c)) 481 + .transpose() 482 + .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 483 + OrderCollectionsBy::Lexi { cursor } 484 + }; 485 + 486 + let limit = match (q.limit, q.order) { 487 + (Some(limit), _) => limit, 488 + (None, Some(_)) => 32, 489 + (None, None) => 100, 490 + }; 491 + 492 + if !(1..=200).contains(&limit) { 493 + let msg = format!("limit not in 1..=200: {}", limit); 494 + return Err(HttpError::for_bad_request(None, msg)); 495 + } 496 + 497 + let since = q.since.map(dt_to_cursor).transpose()?; 498 + let until = q.until.map(dt_to_cursor).transpose()?; 499 + 500 + let (total, children, next_cursor) = storage 501 + .get_prefix(prefix, limit, order, since, until) 502 + .await 503 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 504 + 505 + let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c)); 506 + 507 + OkCors(PrefixResponse { 508 + total, 509 + children, 510 + cursor: next_cursor, 511 + }) 512 + .into() 513 + } 514 + 382 515 #[derive(Debug, Deserialize, JsonSchema)] 383 516 struct CollectionTimeseriesQuery { 384 517 collection: String, // JsonSchema not implemented for Nsid :( ··· 471 604 api.register(get_records_by_collections).unwrap(); 472 605 api.register(get_collection_stats).unwrap(); 473 606 api.register(get_collections).unwrap(); 607 + api.register(get_prefix).unwrap(); 474 608 api.register(get_timeseries).unwrap(); 475 609 476 610 let context = Context {
+11 -2
ufos/src/storage.rs
··· 1 1 use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix}; 2 2 use crate::{ 3 - error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, 4 - OrderCollectionsBy, UFOsRecord, 3 + error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, NsidPrefix, 4 + OrderCollectionsBy, PrefixChild, UFOsRecord, 5 5 }; 6 6 use async_trait::async_trait; 7 7 use jetstream::exports::{Did, Nsid}; ··· 106 106 since: Option<HourTruncatedCursor>, 107 107 until: Option<HourTruncatedCursor>, 108 108 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>; 109 + 110 + async fn get_prefix( 111 + &self, 112 + prefix: NsidPrefix, 113 + limit: usize, 114 + order: OrderCollectionsBy, 115 + since: Option<HourTruncatedCursor>, 116 + until: Option<HourTruncatedCursor>, 117 + ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)>; 109 118 110 119 async fn get_timeseries( 111 120 &self,
+199 -3
ufos/src/storage_fjall.rs
··· 1 - use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr}; 1 + use crate::db_types::{ 2 + db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes, 3 + }; 2 4 use crate::error::StorageError; 3 5 use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 4 6 use crate::store_types::{ ··· 13 15 WEEK_IN_MICROS, 14 16 }; 15 17 use crate::{ 16 - nice_duration, CommitAction, ConsumerInfo, Did, EventBatch, JustCount, Nsid, NsidCount, 17 - OrderCollectionsBy, UFOsRecord, 18 + nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid, 19 + NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord, 18 20 }; 19 21 use async_trait::async_trait; 20 22 use fjall::{ ··· 655 657 } 656 658 } 657 659 660 + fn get_lexi_prefix( 661 + &self, 662 + snapshot: Snapshot, 663 + prefix: NsidPrefix, 664 + limit: usize, 665 + cursor: Option<Vec<u8>>, 666 + buckets: Vec<CursorBucket>, 667 + ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> { 668 + let prefix_sub = String::sub_prefix(prefix.as_str())?; 669 + let cursor_child = cursor 670 + .as_deref() 671 + .map(|encoded_bytes| { 672 + let decoded: String = db_complete(encoded_bytes)?; 673 + let as_sub_prefix = String::sub_prefix(&decoded)?; 674 + Ok::<_, EncodingError>(as_sub_prefix) 675 + }) 676 + .transpose()?; 677 + let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len()); 678 + for bucket in &buckets { 679 + let it: NsidCounter = match bucket { 680 + CursorBucket::Hour(t) => { 681 + let start = cursor_child 682 + .as_ref() 683 + .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child)) 684 + .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?; 685 + let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?; 686 + get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)? 687 + } 688 + CursorBucket::Week(t) => { 689 + let start = cursor_child 690 + .as_ref() 691 + .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child)) 692 + .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?; 693 + let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?; 694 + get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)? 695 + } 696 + CursorBucket::AllTime => { 697 + let start = cursor_child 698 + .as_ref() 699 + .map(|child| AllTimeRollupKey::after_nsid_prefix(child)) 700 + .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?; 701 + let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?; 702 + get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)? 703 + } 704 + }; 705 + iters.push(it); 706 + } 707 + 708 + // with apologies 709 + let mut iters: Vec<_> = iters 710 + .into_iter() 711 + .map(|it| { 712 + it.map(|bla| bla.map(|(nsid, v)| (Child::from_prefix(&nsid, &prefix), v))) 713 + .peekable() 714 + }) 715 + .collect(); 716 + 717 + let mut items = Vec::new(); 718 + let mut prefix_count = CountsValue::default(); 719 + #[derive(Debug, Clone, PartialEq)] 720 + enum Child { 721 + FullNsid(String), 722 + ChildPrefix(String), 723 + } 724 + impl Child { 725 + fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Self { 726 + if prefix.is_group_of(nsid) { 727 + Child::FullNsid(nsid.to_string()) 728 + } else { 729 + let suffix = nsid 730 + .as_str() 731 + .strip_prefix(&format!("{}.", prefix.0)) 732 + .unwrap(); 733 + let (segment, _) = suffix.split_once('.').unwrap(); 734 + Child::ChildPrefix(format!("{}.{segment}", prefix.0)) 735 + } 736 + } 737 + fn is_before(&self, other: &Child) -> bool { 738 + match (self, other) { 739 + (Child::FullNsid(s), Child::ChildPrefix(o)) if s == o => true, 740 + (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o => false, 741 + (Child::FullNsid(s), Child::FullNsid(o)) => s < o, 742 + (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o, 743 + (Child::FullNsid(s), Child::ChildPrefix(o)) => s < o, 744 + (Child::ChildPrefix(s), Child::FullNsid(o)) => s < o, 745 + } 746 + } 747 + fn into_inner(self) -> String { 748 + match self { 749 + Child::FullNsid(s) => s, 750 + Child::ChildPrefix(s) => s, 751 + } 752 + } 753 + } 754 + let mut current_child: Option<Child> = None; 755 + for _ in 0..limit { 756 + // double-scan the iters for each element: this could be eliminated but we're starting simple. 757 + // first scan: find the lowest nsid 758 + // second scan: take + merge, and advance all iters with lowest nsid 759 + let mut lowest: Option<Child> = None; 760 + for iter in &mut iters { 761 + if let Some(bla) = iter.peek_mut() { 762 + let (child, _) = match bla { 763 + Ok(v) => v, 764 + Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 765 + }; 766 + 767 + lowest = match lowest { 768 + Some(ref current) if current.is_before(child) => lowest, 769 + _ => Some(child.clone()), 770 + }; 771 + } 772 + } 773 + current_child = lowest.clone(); 774 + let Some(child) = lowest else { break }; 775 + 776 + let mut merged = CountsValue::default(); 777 + for iter in &mut iters { 778 + // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop 779 + while let Some(Ok((_, get_counts))) = 780 + iter.next_if(|v| v.as_ref().unwrap().0 == child) 781 + { 782 + let counts = get_counts()?; 783 + prefix_count.merge(&counts); 784 + merged.merge(&counts); 785 + } 786 + } 787 + items.push(match child { 788 + Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount { 789 + nsid, 790 + creates: merged.counts().creates, 791 + dids_estimate: merged.dids().estimate() as u64, 792 + }), 793 + Child::ChildPrefix(prefix) => PrefixChild::Prefix(PrefixCount { 794 + prefix, 795 + creates: merged.counts().creates, 796 + dids_estimate: merged.dids().estimate() as u64, 797 + }), 798 + }); 799 + } 800 + 801 + // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up? 802 + // ....er the sketch is probably too big 803 + // TODO: this is probably buggy on child-type boundaries bleh 804 + let next_cursor = current_child 805 + .map(|s| s.into_inner().to_db_bytes()) 806 + .transpose()?; 807 + 808 + Ok(((&prefix_count).into(), items, next_cursor)) 809 + } 810 + 811 + fn get_prefix( 812 + &self, 813 + prefix: NsidPrefix, 814 + limit: usize, 815 + order: OrderCollectionsBy, 816 + since: Option<HourTruncatedCursor>, 817 + until: Option<HourTruncatedCursor>, 818 + ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> { 819 + let snapshot = self.rollups.snapshot(); 820 + let buckets = if let (None, None) = (since, until) { 821 + vec![CursorBucket::AllTime] 822 + } else { 823 + let mut lower = self.get_earliest_hour(Some(&snapshot))?; 824 + if let Some(specified) = since { 825 + if specified > lower { 826 + lower = specified; 827 + } 828 + } 829 + let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 830 + CursorBucket::buckets_spanning(lower, upper) 831 + }; 832 + match order { 833 + OrderCollectionsBy::Lexi { cursor } => { 834 + self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets) 835 + } 836 + _ => todo!(), 837 + } 838 + } 839 + 658 840 /// - step: output series time step, in seconds 659 841 fn get_timeseries( 660 842 &self, ··· 819 1001 let s = self.clone(); 820 1002 tokio::task::spawn_blocking(move || { 821 1003 FjallReader::get_collections(&s, limit, order, since, until) 1004 + }) 1005 + .await? 1006 + } 1007 + async fn get_prefix( 1008 + &self, 1009 + prefix: NsidPrefix, 1010 + limit: usize, 1011 + order: OrderCollectionsBy, 1012 + since: Option<HourTruncatedCursor>, 1013 + until: Option<HourTruncatedCursor>, 1014 + ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> { 1015 + let s = self.clone(); 1016 + tokio::task::spawn_blocking(move || { 1017 + FjallReader::get_prefix(&s, prefix, limit, order, since, until) 822 1018 }) 823 1019 .await? 824 1020 }
+66 -6
ufos/src/store_types.rs
··· 360 360 pub type HourlyRollupStaticPrefix = DbStaticStr<_HourlyRollupStaticStr>; 361 361 pub type HourlyRollupKeyHourPrefix = DbConcat<HourlyRollupStaticPrefix, HourTruncatedCursor>; 362 362 pub type HourlyRollupKey = DbConcat<HourlyRollupKeyHourPrefix, Nsid>; 363 + pub type HourlyRollupPre = DbConcat<HourlyRollupKeyHourPrefix, Vec<u8>>; // bit hack but 363 364 impl HourlyRollupKey { 364 365 pub fn new(cursor: HourTruncatedCursor, nsid: &Nsid) -> Self { 365 366 Self::from_pair( ··· 367 368 nsid.clone(), 368 369 ) 369 370 } 371 + pub fn new_nsid_prefix(cursor: HourTruncatedCursor, pre: &[u8]) -> HourlyRollupPre { 372 + HourlyRollupPre::from_pair( 373 + DbConcat::from_pair(Default::default(), cursor), 374 + pre.to_vec(), 375 + ) 376 + } 370 377 pub fn cursor(&self) -> HourTruncatedCursor { 371 378 self.prefix.suffix 372 379 } ··· 378 385 pub fn after_nsid(hour: HourTruncatedCursor, nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 379 386 Ok(Bound::Excluded(Self::new(hour, nsid).to_db_bytes()?)) 380 387 } 388 + pub fn after_nsid_prefix( 389 + hour: HourTruncatedCursor, 390 + pre: &[u8], 391 + ) -> EncodingResult<Bound<Vec<u8>>> { 392 + Ok(Bound::Excluded( 393 + Self::new_nsid_prefix(hour, pre).to_db_bytes()?, 394 + )) 395 + } 381 396 pub fn end(hour: HourTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 382 397 let prefix = HourlyRollupKeyHourPrefix::from_pair(Default::default(), hour); 383 398 Ok(Bound::Excluded(Self::prefix_range_end(&prefix)?)) 384 399 } 400 + pub fn nsid_prefix_end( 401 + hour: HourTruncatedCursor, 402 + pre: &[u8], 403 + ) -> EncodingResult<Bound<Vec<u8>>> { 404 + Ok(Bound::Excluded( 405 + Self::new_nsid_prefix(hour, pre).as_prefix_range_end()?, 406 + )) 407 + } 385 408 } 386 409 impl WithCollection for HourlyRollupKey { 387 410 fn collection(&self) -> &Nsid { ··· 400 423 pub type WeeklyRollupStaticPrefix = DbStaticStr<_WeeklyRollupStaticStr>; 401 424 pub type WeeklyRollupKeyWeekPrefix = DbConcat<WeeklyRollupStaticPrefix, WeekTruncatedCursor>; 402 425 pub type WeeklyRollupKey = DbConcat<WeeklyRollupKeyWeekPrefix, Nsid>; 426 + pub type WeeklyRollupPre = DbConcat<WeeklyRollupKeyWeekPrefix, Vec<u8>>; 403 427 impl WeeklyRollupKey { 404 428 pub fn new(cursor: WeekTruncatedCursor, nsid: &Nsid) -> Self { 405 429 Self::from_pair( ··· 407 431 nsid.clone(), 408 432 ) 409 433 } 434 + pub fn new_nsid_prefix(cursor: WeekTruncatedCursor, pre: &[u8]) -> WeeklyRollupPre { 435 + WeeklyRollupPre::from_pair( 436 + DbConcat::from_pair(Default::default(), cursor), 437 + pre.to_vec(), 438 + ) 439 + } 410 440 pub fn cursor(&self) -> WeekTruncatedCursor { 411 441 self.prefix.suffix 412 442 } 413 - pub fn start(hour: WeekTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 414 - let prefix = WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), hour); 443 + pub fn start(week: WeekTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 444 + let prefix = WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), week); 415 445 let prefix_bytes = Self::from_prefix_to_db_bytes(&prefix)?; 416 446 Ok(Bound::Included(prefix_bytes)) 417 447 } 418 - pub fn after_nsid(hour: WeekTruncatedCursor, nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 419 - Ok(Bound::Excluded(Self::new(hour, nsid).to_db_bytes()?)) 448 + pub fn after_nsid(week: WeekTruncatedCursor, nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 449 + Ok(Bound::Excluded(Self::new(week, nsid).to_db_bytes()?)) 420 450 } 421 - pub fn end(hour: WeekTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 422 - let prefix = WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), hour); 451 + pub fn after_nsid_prefix( 452 + week: WeekTruncatedCursor, 453 + prefix: &[u8], 454 + ) -> EncodingResult<Bound<Vec<u8>>> { 455 + Ok(Bound::Excluded( 456 + Self::new_nsid_prefix(week, prefix).to_db_bytes()?, 457 + )) 458 + } 459 + pub fn end(week: WeekTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 460 + let prefix = WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), week); 423 461 Ok(Bound::Excluded(Self::prefix_range_end(&prefix)?)) 424 462 } 463 + pub fn nsid_prefix_end( 464 + week: WeekTruncatedCursor, 465 + prefix: &[u8], 466 + ) -> EncodingResult<Bound<Vec<u8>>> { 467 + Ok(Bound::Excluded( 468 + Self::new_nsid_prefix(week, prefix).as_prefix_range_end()?, 469 + )) 470 + } 425 471 } 426 472 impl WithCollection for WeeklyRollupKey { 427 473 fn collection(&self) -> &Nsid { ··· 439 485 static_str!("ever_counts", _AllTimeRollupStaticStr); 440 486 pub type AllTimeRollupStaticPrefix = DbStaticStr<_AllTimeRollupStaticStr>; 441 487 pub type AllTimeRollupKey = DbConcat<AllTimeRollupStaticPrefix, Nsid>; 488 + pub type AllTimeRollupPre = DbConcat<AllTimeRollupStaticPrefix, Vec<u8>>; 442 489 impl AllTimeRollupKey { 443 490 pub fn new(nsid: &Nsid) -> Self { 444 491 Self::from_pair(Default::default(), nsid.clone()) 445 492 } 493 + pub fn new_nsid_prefix(pre: &[u8]) -> AllTimeRollupPre { 494 + AllTimeRollupPre::from_pair(Default::default(), pre.to_vec()) 495 + } 446 496 pub fn start() -> EncodingResult<Bound<Vec<u8>>> { 447 497 Ok(Bound::Included(Self::from_prefix_to_db_bytes( 448 498 &Default::default(), ··· 451 501 pub fn after_nsid(nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 452 502 Ok(Bound::Excluded(Self::new(nsid).to_db_bytes()?)) 453 503 } 504 + pub fn after_nsid_prefix(prefix: &[u8]) -> EncodingResult<Bound<Vec<u8>>> { 505 + Ok(Bound::Excluded( 506 + Self::new_nsid_prefix(prefix).to_db_bytes()?, 507 + )) 508 + } 454 509 pub fn end() -> EncodingResult<Bound<Vec<u8>>> { 455 510 Ok(Bound::Excluded( 456 511 Self::prefix_range_end(&Default::default())?, 512 + )) 513 + } 514 + pub fn nsid_prefix_end(prefix: &[u8]) -> EncodingResult<Bound<Vec<u8>>> { 515 + Ok(Bound::Excluded( 516 + Self::new_nsid_prefix(prefix).as_prefix_range_end()?, 457 517 )) 458 518 } 459 519 }