Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at order_query 1674 lines 55 kB view raw
1use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 2use anyhow::Result; 3use serde::{Deserialize, Serialize}; 4use std::collections::{HashMap, HashSet}; 5 6pub mod mem_store; 7pub use mem_store::MemStorage; 8 9#[cfg(feature = "rocks")] 10pub mod rocks_store; 11#[cfg(feature = "rocks")] 12pub use rocks_store::RocksStorage; 13 14/// Ordering for paginated link queries 15#[derive(Debug, Clone, Copy, PartialEq, Eq)] 16pub enum Order { 17 /// Newest links first (default) 18 NewestToOldest, 19 /// Oldest links first 20 OldestToNewest, 21} 22 23#[derive(Debug, Default, PartialEq)] 24pub struct PagedAppendingCollection<T> { 25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 26 pub items: Vec<T>, 27 pub next: Option<u64>, 28 pub total: u64, 29} 30 31impl<T> PagedAppendingCollection<T> { 32 pub(crate) fn empty() -> Self { 33 Self { 34 version: (0, 0), 35 items: Vec::new(), 36 next: None, 37 total: 0, 38 } 39 } 40} 41 42/// A paged collection whose keys are sorted instead of indexed 43/// 44/// this has weaker guarantees than PagedAppendingCollection: it might 45/// return a totally consistent snapshot. but it should avoid duplicates 46/// and each page should at least be internally consistent. 47#[derive(Debug, PartialEq)] 48pub struct PagedOrderedCollection<T, K: Ord> { 49 pub items: Vec<T>, 50 pub next: Option<K>, 51} 52 53impl<T, K: Ord> PagedOrderedCollection<T, K> { 54 pub(crate) fn empty() -> Self { 55 Self { 56 items: Vec::new(), 57 next: None, 58 } 59 } 60} 61 62#[derive(Debug, Deserialize, Serialize, PartialEq)] 63pub struct StorageStats { 64 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. 65 /// for example: new user A follows users B and C. this count will only increment by one, for A. 66 pub dids: u64, 67 68 /// estimate targets * distinct (collection, path)s to reference them. 
69 /// distinct targets alone are currently challenging to estimate. 70 pub targetables: u64, 71 72 /// estimate of the count of atproto records seen that contain links. 73 /// records with multiple links are single-counted. 74 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it. 75 pub linking_records: u64, 76 77 /// first jetstream cursor when this instance first started 78 pub started_at: Option<u64>, 79 80 /// anything else we want to throw in 81 pub other_data: HashMap<String, u64>, 82} 83 84pub trait LinkStorage: Send + Sync { 85 /// jetstream cursor from last saved actions, if available 86 fn get_cursor(&mut self) -> Result<Option<u64>> { 87 Ok(None) 88 } 89 90 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>; 91 92 // readers are off from the writer instance 93 fn to_readable(&mut self) -> impl LinkReader; 94} 95 96pub trait LinkReader: Clone + Send + Sync + 'static { 97 #[allow(clippy::too_many_arguments)] 98 fn get_many_to_many_counts( 99 &self, 100 target: &str, 101 collection: &str, 102 path: &str, 103 path_to_other: &str, 104 limit: u64, 105 after: Option<String>, 106 filter_dids: &HashSet<Did>, 107 filter_to_targets: &HashSet<String>, 108 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>; 109 110 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 111 112 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 113 114 #[allow(clippy::too_many_arguments)] 115 fn get_links( 116 &self, 117 target: &str, 118 collection: &str, 119 path: &str, 120 order: Order, 121 limit: u64, 122 until: Option<u64>, 123 filter_dids: &HashSet<Did>, 124 ) -> Result<PagedAppendingCollection<RecordId>>; 125 126 fn get_distinct_dids( 127 &self, 128 target: &str, 129 collection: &str, 130 path: &str, 131 limit: u64, 132 until: Option<u64>, 133 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in 
cursor 134 135 fn get_all_record_counts(&self, _target: &str) 136 -> Result<HashMap<String, HashMap<String, u64>>>; 137 138 fn get_all_counts( 139 &self, 140 _target: &str, 141 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>; 142 143 /// assume all stats are estimates, since exact counts are very challenging for LSMs 144 fn get_stats(&self) -> Result<StorageStats>; 145} 146 147#[cfg(test)] 148mod tests { 149 use super::*; 150 use links::{CollectedLink, Link}; 151 use std::ops::RangeBounds; 152 153 macro_rules! test_each_storage { 154 ($test_name:ident, |$storage_label:ident| $test_code:block) => { 155 #[test] 156 fn $test_name() -> Result<()> { 157 { 158 println!("=> testing with memstorage backend"); 159 #[allow(unused_mut)] 160 let mut $storage_label = MemStorage::new(); 161 $test_code 162 } 163 164 #[cfg(feature = "rocks")] 165 { 166 println!("=> testing with rocksdb backend"); 167 let rocks_db_path = tempfile::tempdir()?; 168 #[allow(unused_mut)] 169 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?; 170 $test_code 171 } 172 173 Ok(()) 174 } 175 }; 176 } 177 178 fn assert_stats( 179 stats: StorageStats, 180 dids: impl RangeBounds<u64>, 181 targetables: impl RangeBounds<u64>, 182 linking_records: impl RangeBounds<u64>, 183 ) { 184 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) { 185 assert!( 186 rb.contains(&stat), 187 "{name:?}: {stat:?} not in range {:?}–{:?}", 188 rb.start_bound(), 189 rb.end_bound() 190 ); 191 } 192 check("dids", stats.dids, dids); 193 check("targetables", stats.targetables, targetables); 194 check("linking_records", stats.linking_records, linking_records); 195 } 196 197 test_each_storage!(test_empty, |storage| { 198 assert_eq!(storage.get_count("", "", "")?, 0); 199 assert_eq!(storage.get_count("a", "b", "c")?, 0); 200 assert_eq!( 201 storage.get_count( 202 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f", 203 "app.t.c", 204 ".reply.parent.uri" 205 )?, 206 0 207 ); 208 
assert_eq!(storage.get_distinct_did_count("", "", "")?, 0); 209 assert_eq!( 210 storage.get_links( 211 "a.com", 212 "app.t.c", 213 ".abc.uri", 214 Order::NewestToOldest, 215 100, 216 None, 217 &HashSet::default() 218 )?, 219 PagedAppendingCollection::empty() 220 ); 221 assert_eq!( 222 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 223 PagedAppendingCollection::empty() 224 ); 225 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 226 assert_eq!( 227 storage.get_all_record_counts("bad-example.com")?, 228 HashMap::new() 229 ); 230 231 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0); 232 }); 233 234 test_each_storage!(test_add_link, |storage| { 235 storage.push( 236 &ActionableEvent::CreateLinks { 237 record_id: RecordId { 238 did: "did:plc:asdf".into(), 239 collection: "app.t.c".into(), 240 rkey: "fdsa".into(), 241 }, 242 links: vec![CollectedLink { 243 target: Link::Uri("e.com".into()), 244 path: ".abc.uri".into(), 245 }], 246 }, 247 0, 248 )?; 249 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 250 assert_eq!( 251 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 252 1 253 ); 254 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0); 255 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0); 256 assert_eq!( 257 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?, 258 0 259 ); 260 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 261 }); 262 263 test_each_storage!(test_links, |storage| { 264 storage.push( 265 &ActionableEvent::CreateLinks { 266 record_id: RecordId { 267 did: "did:plc:asdf".into(), 268 collection: "app.t.c".into(), 269 rkey: "fdsa".into(), 270 }, 271 links: vec![CollectedLink { 272 target: Link::Uri("e.com".into()), 273 path: ".abc.uri".into(), 274 }], 275 }, 276 0, 277 )?; 278 279 // delete under the wrong collection 280 storage.push( 281 &ActionableEvent::DeleteRecord(RecordId { 282 did: "did:plc:asdf".into(), 283 
collection: "app.test.wrongcollection".into(), 284 rkey: "fdsa".into(), 285 }), 286 0, 287 )?; 288 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 289 290 // delete under the wrong rkey 291 storage.push( 292 &ActionableEvent::DeleteRecord(RecordId { 293 did: "did:plc:asdf".into(), 294 collection: "app.t.c".into(), 295 rkey: "wrongkey".into(), 296 }), 297 0, 298 )?; 299 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 300 301 // finally actually delete it 302 storage.push( 303 &ActionableEvent::DeleteRecord(RecordId { 304 did: "did:plc:asdf".into(), 305 collection: "app.t.c".into(), 306 rkey: "fdsa".into(), 307 }), 308 0, 309 )?; 310 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 311 312 // put it back 313 storage.push( 314 &ActionableEvent::CreateLinks { 315 record_id: RecordId { 316 did: "did:plc:asdf".into(), 317 collection: "app.t.c".into(), 318 rkey: "fdsa".into(), 319 }, 320 links: vec![CollectedLink { 321 target: Link::Uri("e.com".into()), 322 path: ".abc.uri".into(), 323 }], 324 }, 325 0, 326 )?; 327 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 328 assert_eq!( 329 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 330 1 331 ); 332 333 // add another link from this user 334 storage.push( 335 &ActionableEvent::CreateLinks { 336 record_id: RecordId { 337 did: "did:plc:asdf".into(), 338 collection: "app.t.c".into(), 339 rkey: "fdsa2".into(), 340 }, 341 links: vec![CollectedLink { 342 target: Link::Uri("e.com".into()), 343 path: ".abc.uri".into(), 344 }], 345 }, 346 0, 347 )?; 348 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 349 assert_eq!( 350 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 351 1 352 ); 353 354 // add a link from someone else 355 storage.push( 356 &ActionableEvent::CreateLinks { 357 record_id: RecordId { 358 did: "did:plc:asdfasdf".into(), 359 collection: "app.t.c".into(), 360 rkey: "fdsa".into(), 361 }, 362 links: 
vec![CollectedLink { 363 target: Link::Uri("e.com".into()), 364 path: ".abc.uri".into(), 365 }], 366 }, 367 0, 368 )?; 369 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3); 370 assert_eq!( 371 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 372 2 373 ); 374 375 // aaaand delete the first one again 376 storage.push( 377 &ActionableEvent::DeleteRecord(RecordId { 378 did: "did:plc:asdf".into(), 379 collection: "app.t.c".into(), 380 rkey: "fdsa".into(), 381 }), 382 0, 383 )?; 384 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 385 assert_eq!( 386 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 387 2 388 ); 389 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2); 390 }); 391 392 test_each_storage!(test_two_user_links_delete_one, |storage| { 393 // create the first link 394 storage.push( 395 &ActionableEvent::CreateLinks { 396 record_id: RecordId { 397 did: "did:plc:asdf".into(), 398 collection: "app.t.c".into(), 399 rkey: "A".into(), 400 }, 401 links: vec![CollectedLink { 402 target: Link::Uri("e.com".into()), 403 path: ".abc.uri".into(), 404 }], 405 }, 406 0, 407 )?; 408 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 409 assert_eq!( 410 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 411 1 412 ); 413 414 // create the second link (same user, different rkey) 415 storage.push( 416 &ActionableEvent::CreateLinks { 417 record_id: RecordId { 418 did: "did:plc:asdf".into(), 419 collection: "app.t.c".into(), 420 rkey: "B".into(), 421 }, 422 links: vec![CollectedLink { 423 target: Link::Uri("e.com".into()), 424 path: ".abc.uri".into(), 425 }], 426 }, 427 0, 428 )?; 429 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 430 assert_eq!( 431 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 432 1 433 ); 434 435 // aaaand delete the first link 436 storage.push( 437 &ActionableEvent::DeleteRecord(RecordId { 438 did: "did:plc:asdf".into(), 439 
collection: "app.t.c".into(), 440 rkey: "A".into(), 441 }), 442 0, 443 )?; 444 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 445 assert_eq!( 446 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 447 1 448 ); 449 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 450 }); 451 452 test_each_storage!(test_accounts, |storage| { 453 // create two links 454 storage.push( 455 &ActionableEvent::CreateLinks { 456 record_id: RecordId { 457 did: "did:plc:asdf".into(), 458 collection: "app.t.c".into(), 459 rkey: "A".into(), 460 }, 461 links: vec![CollectedLink { 462 target: Link::Uri("a.com".into()), 463 path: ".abc.uri".into(), 464 }], 465 }, 466 0, 467 )?; 468 storage.push( 469 &ActionableEvent::CreateLinks { 470 record_id: RecordId { 471 did: "did:plc:asdf".into(), 472 collection: "app.t.c".into(), 473 rkey: "B".into(), 474 }, 475 links: vec![CollectedLink { 476 target: Link::Uri("b.com".into()), 477 path: ".abc.uri".into(), 478 }], 479 }, 480 0, 481 )?; 482 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 483 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1); 484 485 // and a third from a different account 486 storage.push( 487 &ActionableEvent::CreateLinks { 488 record_id: RecordId { 489 did: "did:plc:fdsa".into(), 490 collection: "app.t.c".into(), 491 rkey: "A".into(), 492 }, 493 links: vec![CollectedLink { 494 target: Link::Uri("a.com".into()), 495 path: ".abc.uri".into(), 496 }], 497 }, 498 0, 499 )?; 500 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2); 501 502 // delete the first account 503 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?; 504 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 505 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0); 506 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1); 507 }); 508 509 test_each_storage!(multi_link, |storage| { 510 storage.push( 511 &ActionableEvent::CreateLinks { 
512 record_id: RecordId { 513 did: "did:plc:asdf".into(), 514 collection: "app.t.c".into(), 515 rkey: "fdsa".into(), 516 }, 517 links: vec![ 518 CollectedLink { 519 target: Link::Uri("e.com".into()), 520 path: ".abc.uri".into(), 521 }, 522 CollectedLink { 523 target: Link::Uri("f.com".into()), 524 path: ".xyz[].uri".into(), 525 }, 526 CollectedLink { 527 target: Link::Uri("g.com".into()), 528 path: ".xyz[].uri".into(), 529 }, 530 ], 531 }, 532 0, 533 )?; 534 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 535 assert_eq!( 536 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 537 1 538 ); 539 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 540 assert_eq!( 541 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?, 542 1 543 ); 544 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 545 assert_eq!( 546 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?, 547 1 548 ); 549 550 storage.push( 551 &ActionableEvent::DeleteRecord(RecordId { 552 did: "did:plc:asdf".into(), 553 collection: "app.t.c".into(), 554 rkey: "fdsa".into(), 555 }), 556 0, 557 )?; 558 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 559 assert_eq!( 560 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 561 0 562 ); 563 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0); 564 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 565 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0); 566 }); 567 568 test_each_storage!(update_link, |storage| { 569 // create the links 570 storage.push( 571 &ActionableEvent::CreateLinks { 572 record_id: RecordId { 573 did: "did:plc:asdf".into(), 574 collection: "app.t.c".into(), 575 rkey: "fdsa".into(), 576 }, 577 links: vec![ 578 CollectedLink { 579 target: Link::Uri("e.com".into()), 580 path: ".abc.uri".into(), 581 }, 582 CollectedLink { 583 target: Link::Uri("f.com".into()), 584 path: ".xyz[].uri".into(), 585 
}, 586 CollectedLink { 587 target: Link::Uri("g.com".into()), 588 path: ".xyz[].uri".into(), 589 }, 590 ], 591 }, 592 0, 593 )?; 594 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 595 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 596 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 597 598 // update them 599 storage.push( 600 &ActionableEvent::UpdateLinks { 601 record_id: RecordId { 602 did: "did:plc:asdf".into(), 603 collection: "app.t.c".into(), 604 rkey: "fdsa".into(), 605 }, 606 new_links: vec![ 607 CollectedLink { 608 target: Link::Uri("h.com".into()), 609 path: ".abc.uri".into(), 610 }, 611 CollectedLink { 612 target: Link::Uri("f.com".into()), 613 path: ".xyz[].uri".into(), 614 }, 615 CollectedLink { 616 target: Link::Uri("i.com".into()), 617 path: ".xyz[].uri".into(), 618 }, 619 ], 620 }, 621 0, 622 )?; 623 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 624 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1); 625 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 626 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 627 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1); 628 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1); 629 }); 630 631 test_each_storage!(update_no_links_to_links, |storage| { 632 // update without prior create (consumer would have filtered out the original) 633 storage.push( 634 &ActionableEvent::UpdateLinks { 635 record_id: RecordId { 636 did: "did:plc:asdf".into(), 637 collection: "app.t.c".into(), 638 rkey: "asdf".into(), 639 }, 640 new_links: vec![CollectedLink { 641 target: Link::Uri("a.com".into()), 642 path: ".abc.uri".into(), 643 }], 644 }, 645 0, 646 )?; 647 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 648 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 649 }); 650 651 test_each_storage!(delete_multi_link_same_target, |storage| { 652 
storage.push( 653 &ActionableEvent::CreateLinks { 654 record_id: RecordId { 655 did: "did:plc:asdf".into(), 656 collection: "app.t.c".into(), 657 rkey: "asdf".into(), 658 }, 659 links: vec![ 660 CollectedLink { 661 target: Link::Uri("a.com".into()), 662 path: ".abc.uri".into(), 663 }, 664 CollectedLink { 665 target: Link::Uri("a.com".into()), 666 path: ".def.uri".into(), 667 }, 668 ], 669 }, 670 0, 671 )?; 672 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 673 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1); 674 675 storage.push( 676 &ActionableEvent::DeleteRecord(RecordId { 677 did: "did:plc:asdf".into(), 678 collection: "app.t.c".into(), 679 rkey: "asdf".into(), 680 }), 681 0, 682 )?; 683 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0); 684 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0); 685 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0); 686 }); 687 688 test_each_storage!(get_links_basic, |storage| { 689 storage.push( 690 &ActionableEvent::CreateLinks { 691 record_id: RecordId { 692 did: "did:plc:asdf".into(), 693 collection: "app.t.c".into(), 694 rkey: "asdf".into(), 695 }, 696 links: vec![CollectedLink { 697 target: Link::Uri("a.com".into()), 698 path: ".abc.uri".into(), 699 }], 700 }, 701 0, 702 )?; 703 assert_eq!( 704 storage.get_links( 705 "a.com", 706 "app.t.c", 707 ".abc.uri", 708 Order::NewestToOldest, 709 100, 710 None, 711 &HashSet::default() 712 )?, 713 PagedAppendingCollection { 714 version: (1, 0), 715 items: vec![RecordId { 716 did: "did:plc:asdf".into(), 717 collection: "app.t.c".into(), 718 rkey: "asdf".into(), 719 }], 720 next: None, 721 total: 1, 722 } 723 ); 724 assert_eq!( 725 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 726 PagedAppendingCollection { 727 version: (1, 0), 728 items: vec!["did:plc:asdf".into()], 729 next: None, 730 total: 1, 731 } 732 ); 733 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 734 }); 735 736 
test_each_storage!(get_links_paged, |storage| { 737 for i in 1..=5 { 738 storage.push( 739 &ActionableEvent::CreateLinks { 740 record_id: RecordId { 741 did: format!("did:plc:asdf-{i}").into(), 742 collection: "app.t.c".into(), 743 rkey: "asdf".into(), 744 }, 745 links: vec![CollectedLink { 746 target: Link::Uri("a.com".into()), 747 path: ".abc.uri".into(), 748 }], 749 }, 750 0, 751 )?; 752 } 753 754 let sub = "a.com"; 755 let col = "app.t.c"; 756 let path = ".abc.uri"; 757 let order = Order::NewestToOldest; 758 let dids_filter = HashSet::new(); 759 760 // --- --- round one! --- --- // 761 // all backlinks 762 let links = storage.get_links(sub, col, path, order, 2, None, &dids_filter)?; 763 assert_eq!( 764 links, 765 PagedAppendingCollection { 766 version: (5, 0), 767 items: vec![ 768 RecordId { 769 did: "did:plc:asdf-5".into(), 770 collection: col.into(), 771 rkey: "asdf".into(), 772 }, 773 RecordId { 774 did: "did:plc:asdf-4".into(), 775 collection: col.into(), 776 rkey: "asdf".into(), 777 }, 778 ], 779 next: Some(3), 780 total: 5, 781 } 782 ); 783 // distinct dids 784 let dids = storage.get_distinct_dids(sub, col, path, 2, None)?; 785 assert_eq!( 786 dids, 787 PagedAppendingCollection { 788 version: (5, 0), 789 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()], 790 next: Some(3), 791 total: 5, 792 } 793 ); 794 795 // --- --- round two! 
--- --- // 796 // all backlinks 797 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 798 assert_eq!( 799 links, 800 PagedAppendingCollection { 801 version: (5, 0), 802 items: vec![ 803 RecordId { 804 did: "did:plc:asdf-3".into(), 805 collection: col.into(), 806 rkey: "asdf".into(), 807 }, 808 RecordId { 809 did: "did:plc:asdf-2".into(), 810 collection: col.into(), 811 rkey: "asdf".into(), 812 }, 813 ], 814 next: Some(1), 815 total: 5, 816 } 817 ); 818 // distinct dids 819 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 820 assert_eq!( 821 dids, 822 PagedAppendingCollection { 823 version: (5, 0), 824 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()], 825 next: Some(1), 826 total: 5, 827 } 828 ); 829 830 // --- --- round three! --- --- // 831 // all backlinks 832 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 833 assert_eq!( 834 links, 835 PagedAppendingCollection { 836 version: (5, 0), 837 items: vec![RecordId { 838 did: "did:plc:asdf-1".into(), 839 collection: col.into(), 840 rkey: "asdf".into(), 841 },], 842 next: None, 843 total: 5, 844 } 845 ); 846 // distinct dids 847 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 848 assert_eq!( 849 dids, 850 PagedAppendingCollection { 851 version: (5, 0), 852 items: vec!["did:plc:asdf-1".into()], 853 next: None, 854 total: 5, 855 } 856 ); 857 858 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 859 }); 860 861 test_each_storage!(get_links_reverse_order, |storage| { 862 for i in 1..=5 { 863 storage.push( 864 &ActionableEvent::CreateLinks { 865 record_id: RecordId { 866 did: format!("did:plc:asdf-{i}").into(), 867 collection: "app.t.c".into(), 868 rkey: "asdf".into(), 869 }, 870 links: vec![CollectedLink { 871 target: Link::Uri("a.com".into()), 872 path: ".abc.uri".into(), 873 }], 874 }, 875 0, 876 )?; 877 } 878 879 // Test OldestToNewest order (oldest first) 880 let links = storage.get_links( 
881 "a.com", 882 "app.t.c", 883 ".abc.uri", 884 Order::OldestToNewest, 885 2, 886 None, 887 &HashSet::default(), 888 )?; 889 assert_eq!( 890 links, 891 PagedAppendingCollection { 892 version: (5, 0), 893 items: vec![ 894 RecordId { 895 did: "did:plc:asdf-1".into(), 896 collection: "app.t.c".into(), 897 rkey: "asdf".into(), 898 }, 899 RecordId { 900 did: "did:plc:asdf-2".into(), 901 collection: "app.t.c".into(), 902 rkey: "asdf".into(), 903 }, 904 ], 905 next: Some(3), 906 total: 5, 907 } 908 ); 909 // Test NewestToOldest order (newest first) 910 let links = storage.get_links( 911 "a.com", 912 "app.t.c", 913 ".abc.uri", 914 Order::NewestToOldest, 915 2, 916 None, 917 &HashSet::default(), 918 )?; 919 assert_eq!( 920 links, 921 PagedAppendingCollection { 922 version: (5, 0), 923 items: vec![ 924 RecordId { 925 did: "did:plc:asdf-5".into(), 926 collection: "app.t.c".into(), 927 rkey: "asdf".into(), 928 }, 929 RecordId { 930 did: "did:plc:asdf-4".into(), 931 collection: "app.t.c".into(), 932 rkey: "asdf".into(), 933 }, 934 ], 935 next: Some(3), 936 total: 5, 937 } 938 ); 939 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 940 }); 941 942 test_each_storage!(get_filtered_links, |storage| { 943 let links = storage.get_links( 944 "a.com", 945 "app.t.c", 946 ".abc.uri", 947 Order::NewestToOldest, 948 2, 949 None, 950 &HashSet::from([Did("did:plc:linker".to_string())]), 951 )?; 952 assert_eq!(links, PagedAppendingCollection::empty()); 953 954 storage.push( 955 &ActionableEvent::CreateLinks { 956 record_id: RecordId { 957 did: "did:plc:linker".into(), 958 collection: "app.t.c".into(), 959 rkey: "asdf".into(), 960 }, 961 links: vec![CollectedLink { 962 target: Link::Uri("a.com".into()), 963 path: ".abc.uri".into(), 964 }], 965 }, 966 0, 967 )?; 968 969 let links = storage.get_links( 970 "a.com", 971 "app.t.c", 972 ".abc.uri", 973 Order::NewestToOldest, 974 2, 975 None, 976 &HashSet::from([Did("did:plc:linker".to_string())]), 977 )?; 978 assert_eq!( 979 links, 980 
PagedAppendingCollection { 981 version: (1, 0), 982 items: vec![RecordId { 983 did: "did:plc:linker".into(), 984 collection: "app.t.c".into(), 985 rkey: "asdf".into(), 986 },], 987 next: None, 988 total: 1, 989 } 990 ); 991 992 let links = storage.get_links( 993 "a.com", 994 "app.t.c", 995 ".abc.uri", 996 Order::NewestToOldest, 997 2, 998 None, 999 &HashSet::from([Did("did:plc:someone-else".to_string())]), 1000 )?; 1001 assert_eq!(links, PagedAppendingCollection::empty()); 1002 1003 storage.push( 1004 &ActionableEvent::CreateLinks { 1005 record_id: RecordId { 1006 did: "did:plc:linker".into(), 1007 collection: "app.t.c".into(), 1008 rkey: "asdf-2".into(), 1009 }, 1010 links: vec![CollectedLink { 1011 target: Link::Uri("a.com".into()), 1012 path: ".abc.uri".into(), 1013 }], 1014 }, 1015 0, 1016 )?; 1017 storage.push( 1018 &ActionableEvent::CreateLinks { 1019 record_id: RecordId { 1020 did: "did:plc:someone-else".into(), 1021 collection: "app.t.c".into(), 1022 rkey: "asdf".into(), 1023 }, 1024 links: vec![CollectedLink { 1025 target: Link::Uri("a.com".into()), 1026 path: ".abc.uri".into(), 1027 }], 1028 }, 1029 0, 1030 )?; 1031 1032 let links = storage.get_links( 1033 "a.com", 1034 "app.t.c", 1035 ".abc.uri", 1036 Order::NewestToOldest, 1037 2, 1038 None, 1039 &HashSet::from([Did("did:plc:linker".to_string())]), 1040 )?; 1041 assert_eq!( 1042 links, 1043 PagedAppendingCollection { 1044 version: (2, 0), 1045 items: vec![ 1046 RecordId { 1047 did: "did:plc:linker".into(), 1048 collection: "app.t.c".into(), 1049 rkey: "asdf-2".into(), 1050 }, 1051 RecordId { 1052 did: "did:plc:linker".into(), 1053 collection: "app.t.c".into(), 1054 rkey: "asdf".into(), 1055 }, 1056 ], 1057 next: None, 1058 total: 2, 1059 } 1060 ); 1061 1062 let links = storage.get_links( 1063 "a.com", 1064 "app.t.c", 1065 ".abc.uri", 1066 Order::NewestToOldest, 1067 2, 1068 None, 1069 &HashSet::from([ 1070 Did("did:plc:linker".to_string()), 1071 Did("did:plc:someone-else".to_string()), 1072 ]), 1073 )?; 
1074 assert_eq!( 1075 links, 1076 PagedAppendingCollection { 1077 version: (3, 0), 1078 items: vec![ 1079 RecordId { 1080 did: "did:plc:someone-else".into(), 1081 collection: "app.t.c".into(), 1082 rkey: "asdf".into(), 1083 }, 1084 RecordId { 1085 did: "did:plc:linker".into(), 1086 collection: "app.t.c".into(), 1087 rkey: "asdf-2".into(), 1088 }, 1089 ], 1090 next: Some(1), 1091 total: 3, 1092 } 1093 ); 1094 1095 let links = storage.get_links( 1096 "a.com", 1097 "app.t.c", 1098 ".abc.uri", 1099 Order::NewestToOldest, 1100 2, 1101 None, 1102 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 1103 )?; 1104 assert_eq!(links, PagedAppendingCollection::empty()); 1105 }); 1106 1107 test_each_storage!(get_links_exact_multiple, |storage| { 1108 for i in 1..=4 { 1109 storage.push( 1110 &ActionableEvent::CreateLinks { 1111 record_id: RecordId { 1112 did: format!("did:plc:asdf-{i}").into(), 1113 collection: "app.t.c".into(), 1114 rkey: "asdf".into(), 1115 }, 1116 links: vec![CollectedLink { 1117 target: Link::Uri("a.com".into()), 1118 path: ".abc.uri".into(), 1119 }], 1120 }, 1121 0, 1122 )?; 1123 } 1124 let links = storage.get_links( 1125 "a.com", 1126 "app.t.c", 1127 ".abc.uri", 1128 Order::NewestToOldest, 1129 2, 1130 None, 1131 &HashSet::default(), 1132 )?; 1133 assert_eq!( 1134 links, 1135 PagedAppendingCollection { 1136 version: (4, 0), 1137 items: vec![ 1138 RecordId { 1139 did: "did:plc:asdf-4".into(), 1140 collection: "app.t.c".into(), 1141 rkey: "asdf".into(), 1142 }, 1143 RecordId { 1144 did: "did:plc:asdf-3".into(), 1145 collection: "app.t.c".into(), 1146 rkey: "asdf".into(), 1147 }, 1148 ], 1149 next: Some(2), 1150 total: 4, 1151 } 1152 ); 1153 let links = storage.get_links( 1154 "a.com", 1155 "app.t.c", 1156 ".abc.uri", 1157 Order::NewestToOldest, 1158 2, 1159 links.next, 1160 &HashSet::default(), 1161 )?; 1162 assert_eq!( 1163 links, 1164 PagedAppendingCollection { 1165 version: (4, 0), 1166 items: vec![ 1167 RecordId { 1168 did: 
"did:plc:asdf-2".into(), 1169 collection: "app.t.c".into(), 1170 rkey: "asdf".into(), 1171 }, 1172 RecordId { 1173 did: "did:plc:asdf-1".into(), 1174 collection: "app.t.c".into(), 1175 rkey: "asdf".into(), 1176 }, 1177 ], 1178 next: None, 1179 total: 4, 1180 } 1181 ); 1182 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 1183 }); 1184 1185 test_each_storage!(page_links_while_new_links_arrive, |storage| { 1186 for i in 1..=4 { 1187 storage.push( 1188 &ActionableEvent::CreateLinks { 1189 record_id: RecordId { 1190 did: format!("did:plc:asdf-{i}").into(), 1191 collection: "app.t.c".into(), 1192 rkey: "asdf".into(), 1193 }, 1194 links: vec![CollectedLink { 1195 target: Link::Uri("a.com".into()), 1196 path: ".abc.uri".into(), 1197 }], 1198 }, 1199 0, 1200 )?; 1201 } 1202 let links = storage.get_links( 1203 "a.com", 1204 "app.t.c", 1205 ".abc.uri", 1206 Order::NewestToOldest, 1207 2, 1208 None, 1209 &HashSet::default(), 1210 )?; 1211 assert_eq!( 1212 links, 1213 PagedAppendingCollection { 1214 version: (4, 0), 1215 items: vec![ 1216 RecordId { 1217 did: "did:plc:asdf-4".into(), 1218 collection: "app.t.c".into(), 1219 rkey: "asdf".into(), 1220 }, 1221 RecordId { 1222 did: "did:plc:asdf-3".into(), 1223 collection: "app.t.c".into(), 1224 rkey: "asdf".into(), 1225 }, 1226 ], 1227 next: Some(2), 1228 total: 4, 1229 } 1230 ); 1231 storage.push( 1232 &ActionableEvent::CreateLinks { 1233 record_id: RecordId { 1234 did: "did:plc:asdf-5".into(), 1235 collection: "app.t.c".into(), 1236 rkey: "asdf".into(), 1237 }, 1238 links: vec![CollectedLink { 1239 target: Link::Uri("a.com".into()), 1240 path: ".abc.uri".into(), 1241 }], 1242 }, 1243 0, 1244 )?; 1245 let links = storage.get_links( 1246 "a.com", 1247 "app.t.c", 1248 ".abc.uri", 1249 Order::NewestToOldest, 1250 2, 1251 links.next, 1252 &HashSet::default(), 1253 )?; 1254 assert_eq!( 1255 links, 1256 PagedAppendingCollection { 1257 version: (5, 0), 1258 items: vec![ 1259 RecordId { 1260 did: "did:plc:asdf-2".into(), 1261 
collection: "app.t.c".into(),
                        rkey: "asdf".into(),
                    },
                    RecordId {
                        did: "did:plc:asdf-1".into(),
                        collection: "app.t.c".into(),
                        rkey: "asdf".into(),
                    },
                ],
                next: None,
                total: 5,
            }
        );
        assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
    });

    // Pages through four links two at a time, deleting one record between the
    // first and second fetch. The second page is expected to hold only the
    // surviving record, with the deletion visible in `version` and `total`.
    test_each_storage!(page_links_while_some_are_deleted, |storage| {
        // fixture records differ only in the numeric suffix of their DID
        let record = |n: i32| RecordId {
            did: format!("did:plc:asdf-{n}").into(),
            collection: "app.t.c".into(),
            rkey: "asdf".into(),
        };

        for n in 1..=4 {
            let create = ActionableEvent::CreateLinks {
                record_id: record(n),
                links: vec![CollectedLink {
                    target: Link::Uri("a.com".into()),
                    path: ".abc.uri".into(),
                }],
            };
            storage.push(&create, 0)?;
        }

        let first_page = storage.get_links(
            "a.com",
            "app.t.c",
            ".abc.uri",
            Order::NewestToOldest,
            2,
            None,
            &HashSet::default(),
        )?;
        let expected_first = PagedAppendingCollection {
            version: (4, 0),
            items: vec![record(4), record(3)],
            next: Some(2),
            total: 4,
        };
        assert_eq!(first_page, expected_first);

        // remove a record that would otherwise have appeared on the next page
        let delete = ActionableEvent::DeleteRecord(record(2));
        storage.push(&delete, 0)?;

        let second_page = storage.get_links(
            "a.com",
            "app.t.c",
            ".abc.uri",
            Order::NewestToOldest,
            2,
            first_page.next,
            &HashSet::default(),
        )?;
        let expected_second = PagedAppendingCollection {
            version: (4, 1),
            items: vec![record(1)],
            next: None,
            total: 3,
        };
        assert_eq!(second_page, expected_second);
        assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
    });

    // Same paging scenario, but an account is deactivated between fetches
    // instead of a record being deleted. The expected second page omits the
    // deactivated account's record while `version` and `total` stay as before.
    test_each_storage!(page_links_accounts_inactive, |storage| {
        // fixture records differ only in the numeric suffix of their DID
        let record = |n: i32| RecordId {
            did: format!("did:plc:asdf-{n}").into(),
            collection: "app.t.c".into(),
            rkey: "asdf".into(),
        };

        for n in 1..=4 {
            let create = ActionableEvent::CreateLinks {
                record_id: record(n),
                links: vec![CollectedLink {
                    target: Link::Uri("a.com".into()),
                    path: ".abc.uri".into(),
                }],
            };
            storage.push(&create, 0)?;
        }

        let first_page = storage.get_links(
            "a.com",
            "app.t.c",
            ".abc.uri",
            Order::NewestToOldest,
            2,
            None,
            &HashSet::default(),
        )?;
        let expected_first = PagedAppendingCollection {
            version: (4, 0),
            items: vec![record(4), record(3)],
            next: Some(2),
            total: 4,
        };
        assert_eq!(first_page, expected_first);

        let deactivate = ActionableEvent::DeactivateAccount("did:plc:asdf-1".into());
        storage.push(&deactivate, 0)?;

        let second_page = storage.get_links(
            "a.com",
            "app.t.c",
            ".abc.uri",
            Order::NewestToOldest,
            2,
            first_page.next,
            &HashSet::default(),
        )?;
        let expected_second = PagedAppendingCollection {
            version: (4, 0),
            items: vec![record(2)],
            next: None,
            total: 4,
        };
        assert_eq!(second_page, expected_second);
        assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
    });

    // One record links to the same target through two different paths; both
    // count queries are expected to report each path separately under the
    // record's collection.
    test_each_storage!(get_all_counts, |storage| {
        let link_to_a = |path: &'static str| CollectedLink {
            target: Link::Uri("a.com".into()),
            path: path.into(),
        };
        let create = ActionableEvent::CreateLinks {
            record_id: RecordId {
                did: "did:plc:asdf".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            },
            links: vec![link_to_a(".abc.uri"), link_to_a(".def.uri")],
        };
        storage.push(&create, 0)?;

        assert_eq!(
            storage.get_all_record_counts("a.com")?,
            HashMap::from([(
                "app.t.c".into(),
                HashMap::from([(".abc.uri".into(), 1), (".def.uri".into(), 1)]),
            )])
        );

        // one record from one DID on each path
        let one_each = || CountsByCount {
            records: 1,
            distinct_dids: 1,
        };
        assert_eq!(
            storage.get_all_counts("a.com")?,
            HashMap::from([(
                "app.t.c".into(),
                HashMap::from([
                    (".abc.uri".into(), one_each()),
                    (".def.uri".into(), one_each()),
                ]),
            )])
        );
        assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
    });

    //////// many-to-many /////////

    // With nothing stored, the many-to-many query is expected to return the
    // empty collection.
    test_each_storage!(get_m2m_counts_empty, |storage| {
        let counts = storage.get_many_to_many_counts(
            "a.com",
            "a.b.c",
            ".d.e",
            ".f.g",
            10,
            None,
            &HashSet::new(),
            &HashSet::new(),
        )?;
        assert_eq!(counts, PagedOrderedCollection::empty());
    });

    // A single record links to "a.com" and, via two other paths, to "b.com".
    // Querying with ".def.uri" as the second path is expected to yield one
    // entry for "b.com".
    test_each_storage!(get_m2m_counts_single, |storage| {
        let link = |target: &'static str, path: &'static str| CollectedLink {
            target: Link::Uri(target.into()),
            path: path.into(),
        };
        let create = ActionableEvent::CreateLinks {
            record_id: RecordId {
                did: "did:plc:asdf".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            },
            links: vec![
                link("a.com", ".abc.uri"),
                link("b.com", ".def.uri"),
                link("b.com", ".ghi.uri"),
            ],
        };
        storage.push(&create, 0)?;

        let counts = storage.get_many_to_many_counts(
            "a.com",
            "app.t.c",
            ".abc.uri",
            ".def.uri",
            10,
            None,
            &HashSet::new(),
            &HashSet::new(),
        )?;
        assert_eq!(
            counts,
            PagedOrderedCollection {
                items: vec![("b.com".to_string(), 1, 1)],
                next: None,
            }
        );
    });

    // Four records all link to "a.com"; two (from different DIDs) also link to
    // "b.com" and two (from the same DID) also link to "c.com". The query is
    // run unfiltered, then with a DID filter, then with a target filter.
    test_each_storage!(get_m2m_counts_filters, |storage| {
        // (push cursor, did, rkey, second link target)
        let fixtures = [
            (0, "did:plc:asdf", "asdf", "b.com"),
            (1, "did:plc:asdfasdf", "asdf", "b.com"),
            (2, "did:plc:fdsa", "asdf", "c.com"),
            (3, "did:plc:fdsa", "asdf2", "c.com"),
        ];
        for (cursor, did, rkey, other_target) in fixtures {
            let create = ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: did.into(),
                    collection: "app.t.c".into(),
                    rkey: rkey.into(),
                },
                links: vec![
                    CollectedLink {
                        target: Link::Uri("a.com".into()),
                        path: ".abc.uri".into(),
                    },
                    CollectedLink {
                        target: Link::Uri(other_target.into()),
                        path: ".def.uri".into(),
                    },
                ],
            };
            storage.push(&create, cursor)?;
        }

        // no filters: both secondary targets are reported
        let unfiltered = storage.get_many_to_many_counts(
            "a.com",
            "app.t.c",
            ".abc.uri",
            ".def.uri",
            10,
            None,
            &HashSet::new(),
            &HashSet::new(),
        )?;
        assert_eq!(
            unfiltered,
            PagedOrderedCollection {
                items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1)],
                next: None,
            }
        );

        // DID filter only
        let by_did = storage.get_many_to_many_counts(
            "a.com",
            "app.t.c",
            ".abc.uri",
            ".def.uri",
            10,
            None,
            &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
            &HashSet::new(),
        )?;
        assert_eq!(
            by_did,
            PagedOrderedCollection {
                items: vec![("c.com".to_string(), 2, 1)],
                next: None,
            }
        );

        // target filter only
        let by_target = storage.get_many_to_many_counts(
            "a.com",
            "app.t.c",
            ".abc.uri",
            ".def.uri",
            10,
            None,
            &HashSet::new(),
            &HashSet::from_iter(["b.com".to_string()]),
        )?;
        assert_eq!(
            by_target,
            PagedOrderedCollection {
                items: vec![("b.com".to_string(), 2, 2)],
                next: None,
            }
        );
    });
}