Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 2167 lines 71 kB view raw
1use crate::{ActionableEvent, CountsByCount, Did, ManyToManyItem, RecordId}; 2use anyhow::Result; 3use serde::{Deserialize, Serialize}; 4use std::collections::{HashMap, HashSet}; 5 6pub mod mem_store; 7pub use mem_store::MemStorage; 8 9#[cfg(feature = "rocks")] 10pub mod rocks_store; 11#[cfg(feature = "rocks")] 12pub use rocks_store::RocksStorage; 13 14/// Ordering for paginated link queries 15#[derive(Debug, Clone, Copy, PartialEq, Eq)] 16pub enum Order { 17 /// Newest links first (default) 18 NewestToOldest, 19 /// Oldest links first 20 OldestToNewest, 21} 22 23#[derive(Debug, Default, PartialEq)] 24pub struct PagedAppendingCollection<T> { 25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 26 pub items: Vec<T>, 27 pub next: Option<u64>, 28 pub total: u64, 29} 30 31impl<T> PagedAppendingCollection<T> { 32 pub(crate) fn empty() -> Self { 33 Self { 34 version: (0, 0), 35 items: Vec::new(), 36 next: None, 37 total: 0, 38 } 39 } 40} 41 42#[derive(Copy, Clone, Debug)] 43struct ManyToManyCursor { 44 backlink_idx: u64, 45 other_link_idx: u64, 46} 47 48/// A paged collection whose keys are sorted instead of indexed 49/// 50/// this has weaker guarantees than PagedAppendingCollection: it might 51/// return a totally consistent snapshot. but it should avoid duplicates 52/// and each page should at least be internally consistent. 53#[derive(Debug, PartialEq)] 54pub struct PagedOrderedCollection<T, K: Ord> { 55 pub items: Vec<T>, 56 pub next: Option<K>, 57} 58 59impl<T, K: Ord> PagedOrderedCollection<T, K> { 60 pub(crate) fn empty() -> Self { 61 Self { 62 items: Vec::new(), 63 next: None, 64 } 65 } 66} 67 68#[derive(Debug, Deserialize, Serialize, PartialEq)] 69pub struct StorageStats { 70 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. 71 /// for example: new user A follows users B and C. 
this count will only increment by one, for A. 72 pub dids: u64, 73 74 /// estimate targets * distinct (collection, path)s to reference them. 75 /// distinct targets alone are currently challenging to estimate. 76 pub targetables: u64, 77 78 /// estimate of the count of atproto records seen that contain links. 79 /// records with multiple links are single-counted. 80 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it. 81 pub linking_records: u64, 82 83 /// first jetstream cursor when this instance first started 84 pub started_at: Option<u64>, 85 86 /// anything else we want to throw in 87 pub other_data: HashMap<String, u64>, 88} 89 90pub trait LinkStorage: Send + Sync { 91 /// jetstream cursor from last saved actions, if available 92 fn get_cursor(&mut self) -> Result<Option<u64>> { 93 Ok(None) 94 } 95 96 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>; 97 98 // readers are off from the writer instance 99 fn to_readable(&mut self) -> impl LinkReader; 100} 101 102pub trait LinkReader: Clone + Send + Sync + 'static { 103 #[allow(clippy::too_many_arguments)] 104 fn get_many_to_many_counts( 105 &self, 106 target: &str, 107 collection: &str, 108 path: &str, 109 path_to_other: &str, 110 limit: u64, 111 after: Option<String>, 112 filter_dids: &HashSet<Did>, 113 filter_to_targets: &HashSet<String>, 114 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>; 115 116 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 117 118 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 119 120 #[allow(clippy::too_many_arguments)] 121 fn get_links( 122 &self, 123 target: &str, 124 collection: &str, 125 path: &str, 126 order: Order, 127 limit: u64, 128 until: Option<u64>, 129 filter_dids: &HashSet<Did>, 130 ) -> Result<PagedAppendingCollection<RecordId>>; 131 132 fn get_distinct_dids( 133 &self, 134 target: &str, 135 
collection: &str, 136 path: &str, 137 limit: u64, 138 until: Option<u64>, 139 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor 140 141 fn get_all_record_counts(&self, _target: &str) 142 -> Result<HashMap<String, HashMap<String, u64>>>; 143 144 #[allow(clippy::too_many_arguments)] 145 fn get_many_to_many( 146 &self, 147 target: &str, 148 collection: &str, 149 path: &str, 150 path_to_other: &str, 151 limit: u64, 152 after: Option<String>, 153 filter_dids: &HashSet<Did>, 154 filter_to_targets: &HashSet<String>, 155 ) -> Result<PagedOrderedCollection<ManyToManyItem, String>>; 156 157 fn get_all_counts( 158 &self, 159 _target: &str, 160 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>; 161 162 /// assume all stats are estimates, since exact counts are very challenging for LSMs 163 fn get_stats(&self) -> Result<StorageStats>; 164} 165 166#[cfg(test)] 167mod tests { 168 use super::*; 169 use links::{CollectedLink, Link}; 170 use std::ops::RangeBounds; 171 172 macro_rules! 
test_each_storage { 173 ($test_name:ident, |$storage_label:ident| $test_code:block) => { 174 #[test] 175 fn $test_name() -> Result<()> { 176 { 177 println!("=> testing with memstorage backend"); 178 #[allow(unused_mut)] 179 let mut $storage_label = MemStorage::new(); 180 $test_code 181 } 182 183 #[cfg(feature = "rocks")] 184 { 185 println!("=> testing with rocksdb backend"); 186 let rocks_db_path = tempfile::tempdir()?; 187 #[allow(unused_mut)] 188 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?; 189 $test_code 190 } 191 192 Ok(()) 193 } 194 }; 195 } 196 197 fn assert_stats( 198 stats: StorageStats, 199 dids: impl RangeBounds<u64>, 200 targetables: impl RangeBounds<u64>, 201 linking_records: impl RangeBounds<u64>, 202 ) { 203 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) { 204 assert!( 205 rb.contains(&stat), 206 "{name:?}: {stat:?} not in range {:?}–{:?}", 207 rb.start_bound(), 208 rb.end_bound() 209 ); 210 } 211 check("dids", stats.dids, dids); 212 check("targetables", stats.targetables, targetables); 213 check("linking_records", stats.linking_records, linking_records); 214 } 215 216 test_each_storage!(test_empty, |storage| { 217 assert_eq!(storage.get_count("", "", "")?, 0); 218 assert_eq!(storage.get_count("a", "b", "c")?, 0); 219 assert_eq!( 220 storage.get_count( 221 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f", 222 "app.t.c", 223 ".reply.parent.uri" 224 )?, 225 0 226 ); 227 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0); 228 assert_eq!( 229 storage.get_links( 230 "a.com", 231 "app.t.c", 232 ".abc.uri", 233 Order::NewestToOldest, 234 100, 235 None, 236 &HashSet::default() 237 )?, 238 PagedAppendingCollection::empty() 239 ); 240 assert_eq!( 241 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 242 PagedAppendingCollection::empty() 243 ); 244 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 245 assert_eq!( 246 
storage.get_all_record_counts("bad-example.com")?, 247 HashMap::new() 248 ); 249 250 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0); 251 }); 252 253 test_each_storage!(test_add_link, |storage| { 254 storage.push( 255 &ActionableEvent::CreateLinks { 256 record_id: RecordId { 257 did: "did:plc:asdf".into(), 258 collection: "app.t.c".into(), 259 rkey: "fdsa".into(), 260 }, 261 links: vec![CollectedLink { 262 target: Link::Uri("e.com".into()), 263 path: ".abc.uri".into(), 264 }], 265 }, 266 0, 267 )?; 268 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 269 assert_eq!( 270 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 271 1 272 ); 273 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0); 274 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0); 275 assert_eq!( 276 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?, 277 0 278 ); 279 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 280 }); 281 282 test_each_storage!(test_links, |storage| { 283 storage.push( 284 &ActionableEvent::CreateLinks { 285 record_id: RecordId { 286 did: "did:plc:asdf".into(), 287 collection: "app.t.c".into(), 288 rkey: "fdsa".into(), 289 }, 290 links: vec![CollectedLink { 291 target: Link::Uri("e.com".into()), 292 path: ".abc.uri".into(), 293 }], 294 }, 295 0, 296 )?; 297 298 // delete under the wrong collection 299 storage.push( 300 &ActionableEvent::DeleteRecord(RecordId { 301 did: "did:plc:asdf".into(), 302 collection: "app.test.wrongcollection".into(), 303 rkey: "fdsa".into(), 304 }), 305 0, 306 )?; 307 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 308 309 // delete under the wrong rkey 310 storage.push( 311 &ActionableEvent::DeleteRecord(RecordId { 312 did: "did:plc:asdf".into(), 313 collection: "app.t.c".into(), 314 rkey: "wrongkey".into(), 315 }), 316 0, 317 )?; 318 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 319 320 // finally actually delete it 321 
storage.push( 322 &ActionableEvent::DeleteRecord(RecordId { 323 did: "did:plc:asdf".into(), 324 collection: "app.t.c".into(), 325 rkey: "fdsa".into(), 326 }), 327 0, 328 )?; 329 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 330 331 // put it back 332 storage.push( 333 &ActionableEvent::CreateLinks { 334 record_id: RecordId { 335 did: "did:plc:asdf".into(), 336 collection: "app.t.c".into(), 337 rkey: "fdsa".into(), 338 }, 339 links: vec![CollectedLink { 340 target: Link::Uri("e.com".into()), 341 path: ".abc.uri".into(), 342 }], 343 }, 344 0, 345 )?; 346 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 347 assert_eq!( 348 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 349 1 350 ); 351 352 // add another link from this user 353 storage.push( 354 &ActionableEvent::CreateLinks { 355 record_id: RecordId { 356 did: "did:plc:asdf".into(), 357 collection: "app.t.c".into(), 358 rkey: "fdsa2".into(), 359 }, 360 links: vec![CollectedLink { 361 target: Link::Uri("e.com".into()), 362 path: ".abc.uri".into(), 363 }], 364 }, 365 0, 366 )?; 367 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 368 assert_eq!( 369 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 370 1 371 ); 372 373 // add a link from someone else 374 storage.push( 375 &ActionableEvent::CreateLinks { 376 record_id: RecordId { 377 did: "did:plc:asdfasdf".into(), 378 collection: "app.t.c".into(), 379 rkey: "fdsa".into(), 380 }, 381 links: vec![CollectedLink { 382 target: Link::Uri("e.com".into()), 383 path: ".abc.uri".into(), 384 }], 385 }, 386 0, 387 )?; 388 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3); 389 assert_eq!( 390 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 391 2 392 ); 393 394 // aaaand delete the first one again 395 storage.push( 396 &ActionableEvent::DeleteRecord(RecordId { 397 did: "did:plc:asdf".into(), 398 collection: "app.t.c".into(), 399 rkey: "fdsa".into(), 400 }), 401 
0, 402 )?; 403 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 404 assert_eq!( 405 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 406 2 407 ); 408 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2); 409 }); 410 411 test_each_storage!(test_two_user_links_delete_one, |storage| { 412 // create the first link 413 storage.push( 414 &ActionableEvent::CreateLinks { 415 record_id: RecordId { 416 did: "did:plc:asdf".into(), 417 collection: "app.t.c".into(), 418 rkey: "A".into(), 419 }, 420 links: vec![CollectedLink { 421 target: Link::Uri("e.com".into()), 422 path: ".abc.uri".into(), 423 }], 424 }, 425 0, 426 )?; 427 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 428 assert_eq!( 429 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 430 1 431 ); 432 433 // create the second link (same user, different rkey) 434 storage.push( 435 &ActionableEvent::CreateLinks { 436 record_id: RecordId { 437 did: "did:plc:asdf".into(), 438 collection: "app.t.c".into(), 439 rkey: "B".into(), 440 }, 441 links: vec![CollectedLink { 442 target: Link::Uri("e.com".into()), 443 path: ".abc.uri".into(), 444 }], 445 }, 446 0, 447 )?; 448 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 449 assert_eq!( 450 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 451 1 452 ); 453 454 // aaaand delete the first link 455 storage.push( 456 &ActionableEvent::DeleteRecord(RecordId { 457 did: "did:plc:asdf".into(), 458 collection: "app.t.c".into(), 459 rkey: "A".into(), 460 }), 461 0, 462 )?; 463 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 464 assert_eq!( 465 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 466 1 467 ); 468 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 469 }); 470 471 test_each_storage!(test_accounts, |storage| { 472 // create two links 473 storage.push( 474 &ActionableEvent::CreateLinks { 475 record_id: RecordId { 476 did: 
"did:plc:asdf".into(), 477 collection: "app.t.c".into(), 478 rkey: "A".into(), 479 }, 480 links: vec![CollectedLink { 481 target: Link::Uri("a.com".into()), 482 path: ".abc.uri".into(), 483 }], 484 }, 485 0, 486 )?; 487 storage.push( 488 &ActionableEvent::CreateLinks { 489 record_id: RecordId { 490 did: "did:plc:asdf".into(), 491 collection: "app.t.c".into(), 492 rkey: "B".into(), 493 }, 494 links: vec![CollectedLink { 495 target: Link::Uri("b.com".into()), 496 path: ".abc.uri".into(), 497 }], 498 }, 499 0, 500 )?; 501 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 502 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1); 503 504 // and a third from a different account 505 storage.push( 506 &ActionableEvent::CreateLinks { 507 record_id: RecordId { 508 did: "did:plc:fdsa".into(), 509 collection: "app.t.c".into(), 510 rkey: "A".into(), 511 }, 512 links: vec![CollectedLink { 513 target: Link::Uri("a.com".into()), 514 path: ".abc.uri".into(), 515 }], 516 }, 517 0, 518 )?; 519 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2); 520 521 // delete the first account 522 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?; 523 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 524 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0); 525 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1); 526 }); 527 528 test_each_storage!(multi_link, |storage| { 529 storage.push( 530 &ActionableEvent::CreateLinks { 531 record_id: RecordId { 532 did: "did:plc:asdf".into(), 533 collection: "app.t.c".into(), 534 rkey: "fdsa".into(), 535 }, 536 links: vec![ 537 CollectedLink { 538 target: Link::Uri("e.com".into()), 539 path: ".abc.uri".into(), 540 }, 541 CollectedLink { 542 target: Link::Uri("f.com".into()), 543 path: ".xyz[].uri".into(), 544 }, 545 CollectedLink { 546 target: Link::Uri("g.com".into()), 547 path: ".xyz[].uri".into(), 548 }, 549 ], 550 }, 551 0, 552 )?; 553 
assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 554 assert_eq!( 555 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 556 1 557 ); 558 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 559 assert_eq!( 560 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?, 561 1 562 ); 563 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 564 assert_eq!( 565 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?, 566 1 567 ); 568 569 storage.push( 570 &ActionableEvent::DeleteRecord(RecordId { 571 did: "did:plc:asdf".into(), 572 collection: "app.t.c".into(), 573 rkey: "fdsa".into(), 574 }), 575 0, 576 )?; 577 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 578 assert_eq!( 579 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 580 0 581 ); 582 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0); 583 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 584 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0); 585 }); 586 587 test_each_storage!(update_link, |storage| { 588 // create the links 589 storage.push( 590 &ActionableEvent::CreateLinks { 591 record_id: RecordId { 592 did: "did:plc:asdf".into(), 593 collection: "app.t.c".into(), 594 rkey: "fdsa".into(), 595 }, 596 links: vec![ 597 CollectedLink { 598 target: Link::Uri("e.com".into()), 599 path: ".abc.uri".into(), 600 }, 601 CollectedLink { 602 target: Link::Uri("f.com".into()), 603 path: ".xyz[].uri".into(), 604 }, 605 CollectedLink { 606 target: Link::Uri("g.com".into()), 607 path: ".xyz[].uri".into(), 608 }, 609 ], 610 }, 611 0, 612 )?; 613 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 614 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 615 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 616 617 // update them 618 storage.push( 619 &ActionableEvent::UpdateLinks { 620 record_id: RecordId { 621 did: 
"did:plc:asdf".into(), 622 collection: "app.t.c".into(), 623 rkey: "fdsa".into(), 624 }, 625 new_links: vec![ 626 CollectedLink { 627 target: Link::Uri("h.com".into()), 628 path: ".abc.uri".into(), 629 }, 630 CollectedLink { 631 target: Link::Uri("f.com".into()), 632 path: ".xyz[].uri".into(), 633 }, 634 CollectedLink { 635 target: Link::Uri("i.com".into()), 636 path: ".xyz[].uri".into(), 637 }, 638 ], 639 }, 640 0, 641 )?; 642 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 643 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1); 644 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 645 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 646 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1); 647 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1); 648 }); 649 650 test_each_storage!(update_no_links_to_links, |storage| { 651 // update without prior create (consumer would have filtered out the original) 652 storage.push( 653 &ActionableEvent::UpdateLinks { 654 record_id: RecordId { 655 did: "did:plc:asdf".into(), 656 collection: "app.t.c".into(), 657 rkey: "asdf".into(), 658 }, 659 new_links: vec![CollectedLink { 660 target: Link::Uri("a.com".into()), 661 path: ".abc.uri".into(), 662 }], 663 }, 664 0, 665 )?; 666 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 667 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 668 }); 669 670 test_each_storage!(delete_multi_link_same_target, |storage| { 671 storage.push( 672 &ActionableEvent::CreateLinks { 673 record_id: RecordId { 674 did: "did:plc:asdf".into(), 675 collection: "app.t.c".into(), 676 rkey: "asdf".into(), 677 }, 678 links: vec![ 679 CollectedLink { 680 target: Link::Uri("a.com".into()), 681 path: ".abc.uri".into(), 682 }, 683 CollectedLink { 684 target: Link::Uri("a.com".into()), 685 path: ".def.uri".into(), 686 }, 687 ], 688 }, 689 0, 690 )?; 691 assert_eq!(storage.get_count("a.com", "app.t.c", 
".abc.uri")?, 1); 692 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1); 693 694 storage.push( 695 &ActionableEvent::DeleteRecord(RecordId { 696 did: "did:plc:asdf".into(), 697 collection: "app.t.c".into(), 698 rkey: "asdf".into(), 699 }), 700 0, 701 )?; 702 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0); 703 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0); 704 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0); 705 }); 706 707 test_each_storage!(get_links_basic, |storage| { 708 storage.push( 709 &ActionableEvent::CreateLinks { 710 record_id: RecordId { 711 did: "did:plc:asdf".into(), 712 collection: "app.t.c".into(), 713 rkey: "asdf".into(), 714 }, 715 links: vec![CollectedLink { 716 target: Link::Uri("a.com".into()), 717 path: ".abc.uri".into(), 718 }], 719 }, 720 0, 721 )?; 722 assert_eq!( 723 storage.get_links( 724 "a.com", 725 "app.t.c", 726 ".abc.uri", 727 Order::NewestToOldest, 728 100, 729 None, 730 &HashSet::default() 731 )?, 732 PagedAppendingCollection { 733 version: (1, 0), 734 items: vec![RecordId { 735 did: "did:plc:asdf".into(), 736 collection: "app.t.c".into(), 737 rkey: "asdf".into(), 738 }], 739 next: None, 740 total: 1, 741 } 742 ); 743 assert_eq!( 744 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 745 PagedAppendingCollection { 746 version: (1, 0), 747 items: vec!["did:plc:asdf".into()], 748 next: None, 749 total: 1, 750 } 751 ); 752 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 753 }); 754 755 test_each_storage!(get_links_paged, |storage| { 756 for i in 1..=5 { 757 storage.push( 758 &ActionableEvent::CreateLinks { 759 record_id: RecordId { 760 did: format!("did:plc:asdf-{i}").into(), 761 collection: "app.t.c".into(), 762 rkey: "asdf".into(), 763 }, 764 links: vec![CollectedLink { 765 target: Link::Uri("a.com".into()), 766 path: ".abc.uri".into(), 767 }], 768 }, 769 0, 770 )?; 771 } 772 773 let sub = "a.com"; 774 let col = "app.t.c"; 775 let path = 
".abc.uri"; 776 let order = Order::NewestToOldest; 777 let dids_filter = HashSet::new(); 778 779 // --- --- round one! --- --- // 780 // all backlinks 781 let links = storage.get_links(sub, col, path, order, 2, None, &dids_filter)?; 782 assert_eq!( 783 links, 784 PagedAppendingCollection { 785 version: (5, 0), 786 items: vec![ 787 RecordId { 788 did: "did:plc:asdf-5".into(), 789 collection: col.into(), 790 rkey: "asdf".into(), 791 }, 792 RecordId { 793 did: "did:plc:asdf-4".into(), 794 collection: col.into(), 795 rkey: "asdf".into(), 796 }, 797 ], 798 next: Some(3), 799 total: 5, 800 } 801 ); 802 // distinct dids 803 let dids = storage.get_distinct_dids(sub, col, path, 2, None)?; 804 assert_eq!( 805 dids, 806 PagedAppendingCollection { 807 version: (5, 0), 808 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()], 809 next: Some(3), 810 total: 5, 811 } 812 ); 813 814 // --- --- round two! --- --- // 815 // all backlinks 816 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 817 assert_eq!( 818 links, 819 PagedAppendingCollection { 820 version: (5, 0), 821 items: vec![ 822 RecordId { 823 did: "did:plc:asdf-3".into(), 824 collection: col.into(), 825 rkey: "asdf".into(), 826 }, 827 RecordId { 828 did: "did:plc:asdf-2".into(), 829 collection: col.into(), 830 rkey: "asdf".into(), 831 }, 832 ], 833 next: Some(1), 834 total: 5, 835 } 836 ); 837 // distinct dids 838 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 839 assert_eq!( 840 dids, 841 PagedAppendingCollection { 842 version: (5, 0), 843 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()], 844 next: Some(1), 845 total: 5, 846 } 847 ); 848 849 // --- --- round three! 
--- --- // 850 // all backlinks 851 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 852 assert_eq!( 853 links, 854 PagedAppendingCollection { 855 version: (5, 0), 856 items: vec![RecordId { 857 did: "did:plc:asdf-1".into(), 858 collection: col.into(), 859 rkey: "asdf".into(), 860 },], 861 next: None, 862 total: 5, 863 } 864 ); 865 // distinct dids 866 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 867 assert_eq!( 868 dids, 869 PagedAppendingCollection { 870 version: (5, 0), 871 items: vec!["did:plc:asdf-1".into()], 872 next: None, 873 total: 5, 874 } 875 ); 876 877 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 878 }); 879 880 test_each_storage!(get_links_reverse_order, |storage| { 881 for i in 1..=5 { 882 storage.push( 883 &ActionableEvent::CreateLinks { 884 record_id: RecordId { 885 did: format!("did:plc:asdf-{i}").into(), 886 collection: "app.t.c".into(), 887 rkey: "asdf".into(), 888 }, 889 links: vec![CollectedLink { 890 target: Link::Uri("a.com".into()), 891 path: ".abc.uri".into(), 892 }], 893 }, 894 0, 895 )?; 896 } 897 898 // Test OldestToNewest order (oldest first) 899 let links = storage.get_links( 900 "a.com", 901 "app.t.c", 902 ".abc.uri", 903 Order::OldestToNewest, 904 2, 905 None, 906 &HashSet::default(), 907 )?; 908 assert_eq!( 909 links, 910 PagedAppendingCollection { 911 version: (5, 0), 912 items: vec![ 913 RecordId { 914 did: "did:plc:asdf-1".into(), 915 collection: "app.t.c".into(), 916 rkey: "asdf".into(), 917 }, 918 RecordId { 919 did: "did:plc:asdf-2".into(), 920 collection: "app.t.c".into(), 921 rkey: "asdf".into(), 922 }, 923 ], 924 next: Some(3), 925 total: 5, 926 } 927 ); 928 // Test NewestToOldest order (newest first) 929 let links = storage.get_links( 930 "a.com", 931 "app.t.c", 932 ".abc.uri", 933 Order::NewestToOldest, 934 2, 935 None, 936 &HashSet::default(), 937 )?; 938 assert_eq!( 939 links, 940 PagedAppendingCollection { 941 version: (5, 0), 942 items: vec![ 943 
RecordId { 944 did: "did:plc:asdf-5".into(), 945 collection: "app.t.c".into(), 946 rkey: "asdf".into(), 947 }, 948 RecordId { 949 did: "did:plc:asdf-4".into(), 950 collection: "app.t.c".into(), 951 rkey: "asdf".into(), 952 }, 953 ], 954 next: Some(3), 955 total: 5, 956 } 957 ); 958 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 959 }); 960 961 test_each_storage!(get_filtered_links, |storage| { 962 let links = storage.get_links( 963 "a.com", 964 "app.t.c", 965 ".abc.uri", 966 Order::NewestToOldest, 967 2, 968 None, 969 &HashSet::from([Did("did:plc:linker".to_string())]), 970 )?; 971 assert_eq!(links, PagedAppendingCollection::empty()); 972 973 storage.push( 974 &ActionableEvent::CreateLinks { 975 record_id: RecordId { 976 did: "did:plc:linker".into(), 977 collection: "app.t.c".into(), 978 rkey: "asdf".into(), 979 }, 980 links: vec![CollectedLink { 981 target: Link::Uri("a.com".into()), 982 path: ".abc.uri".into(), 983 }], 984 }, 985 0, 986 )?; 987 988 let links = storage.get_links( 989 "a.com", 990 "app.t.c", 991 ".abc.uri", 992 Order::NewestToOldest, 993 2, 994 None, 995 &HashSet::from([Did("did:plc:linker".to_string())]), 996 )?; 997 assert_eq!( 998 links, 999 PagedAppendingCollection { 1000 version: (1, 0), 1001 items: vec![RecordId { 1002 did: "did:plc:linker".into(), 1003 collection: "app.t.c".into(), 1004 rkey: "asdf".into(), 1005 },], 1006 next: None, 1007 total: 1, 1008 } 1009 ); 1010 1011 let links = storage.get_links( 1012 "a.com", 1013 "app.t.c", 1014 ".abc.uri", 1015 Order::NewestToOldest, 1016 2, 1017 None, 1018 &HashSet::from([Did("did:plc:someone-else".to_string())]), 1019 )?; 1020 assert_eq!(links, PagedAppendingCollection::empty()); 1021 1022 storage.push( 1023 &ActionableEvent::CreateLinks { 1024 record_id: RecordId { 1025 did: "did:plc:linker".into(), 1026 collection: "app.t.c".into(), 1027 rkey: "asdf-2".into(), 1028 }, 1029 links: vec![CollectedLink { 1030 target: Link::Uri("a.com".into()), 1031 path: ".abc.uri".into(), 1032 }], 1033 
}, 1034 0, 1035 )?; 1036 storage.push( 1037 &ActionableEvent::CreateLinks { 1038 record_id: RecordId { 1039 did: "did:plc:someone-else".into(), 1040 collection: "app.t.c".into(), 1041 rkey: "asdf".into(), 1042 }, 1043 links: vec![CollectedLink { 1044 target: Link::Uri("a.com".into()), 1045 path: ".abc.uri".into(), 1046 }], 1047 }, 1048 0, 1049 )?; 1050 1051 let links = storage.get_links( 1052 "a.com", 1053 "app.t.c", 1054 ".abc.uri", 1055 Order::NewestToOldest, 1056 2, 1057 None, 1058 &HashSet::from([Did("did:plc:linker".to_string())]), 1059 )?; 1060 assert_eq!( 1061 links, 1062 PagedAppendingCollection { 1063 version: (2, 0), 1064 items: vec![ 1065 RecordId { 1066 did: "did:plc:linker".into(), 1067 collection: "app.t.c".into(), 1068 rkey: "asdf-2".into(), 1069 }, 1070 RecordId { 1071 did: "did:plc:linker".into(), 1072 collection: "app.t.c".into(), 1073 rkey: "asdf".into(), 1074 }, 1075 ], 1076 next: None, 1077 total: 2, 1078 } 1079 ); 1080 1081 let links = storage.get_links( 1082 "a.com", 1083 "app.t.c", 1084 ".abc.uri", 1085 Order::NewestToOldest, 1086 2, 1087 None, 1088 &HashSet::from([ 1089 Did("did:plc:linker".to_string()), 1090 Did("did:plc:someone-else".to_string()), 1091 ]), 1092 )?; 1093 assert_eq!( 1094 links, 1095 PagedAppendingCollection { 1096 version: (3, 0), 1097 items: vec![ 1098 RecordId { 1099 did: "did:plc:someone-else".into(), 1100 collection: "app.t.c".into(), 1101 rkey: "asdf".into(), 1102 }, 1103 RecordId { 1104 did: "did:plc:linker".into(), 1105 collection: "app.t.c".into(), 1106 rkey: "asdf-2".into(), 1107 }, 1108 ], 1109 next: Some(1), 1110 total: 3, 1111 } 1112 ); 1113 1114 let links = storage.get_links( 1115 "a.com", 1116 "app.t.c", 1117 ".abc.uri", 1118 Order::NewestToOldest, 1119 2, 1120 None, 1121 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 1122 )?; 1123 assert_eq!(links, PagedAppendingCollection::empty()); 1124 }); 1125 1126 test_each_storage!(get_links_exact_multiple, |storage| { 1127 for i in 1..=4 { 1128 
storage.push( 1129 &ActionableEvent::CreateLinks { 1130 record_id: RecordId { 1131 did: format!("did:plc:asdf-{i}").into(), 1132 collection: "app.t.c".into(), 1133 rkey: "asdf".into(), 1134 }, 1135 links: vec![CollectedLink { 1136 target: Link::Uri("a.com".into()), 1137 path: ".abc.uri".into(), 1138 }], 1139 }, 1140 0, 1141 )?; 1142 } 1143 let links = storage.get_links( 1144 "a.com", 1145 "app.t.c", 1146 ".abc.uri", 1147 Order::NewestToOldest, 1148 2, 1149 None, 1150 &HashSet::default(), 1151 )?; 1152 assert_eq!( 1153 links, 1154 PagedAppendingCollection { 1155 version: (4, 0), 1156 items: vec![ 1157 RecordId { 1158 did: "did:plc:asdf-4".into(), 1159 collection: "app.t.c".into(), 1160 rkey: "asdf".into(), 1161 }, 1162 RecordId { 1163 did: "did:plc:asdf-3".into(), 1164 collection: "app.t.c".into(), 1165 rkey: "asdf".into(), 1166 }, 1167 ], 1168 next: Some(2), 1169 total: 4, 1170 } 1171 ); 1172 let links = storage.get_links( 1173 "a.com", 1174 "app.t.c", 1175 ".abc.uri", 1176 Order::NewestToOldest, 1177 2, 1178 links.next, 1179 &HashSet::default(), 1180 )?; 1181 assert_eq!( 1182 links, 1183 PagedAppendingCollection { 1184 version: (4, 0), 1185 items: vec![ 1186 RecordId { 1187 did: "did:plc:asdf-2".into(), 1188 collection: "app.t.c".into(), 1189 rkey: "asdf".into(), 1190 }, 1191 RecordId { 1192 did: "did:plc:asdf-1".into(), 1193 collection: "app.t.c".into(), 1194 rkey: "asdf".into(), 1195 }, 1196 ], 1197 next: None, 1198 total: 4, 1199 } 1200 ); 1201 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 1202 }); 1203 1204 test_each_storage!(page_links_while_new_links_arrive, |storage| { 1205 for i in 1..=4 { 1206 storage.push( 1207 &ActionableEvent::CreateLinks { 1208 record_id: RecordId { 1209 did: format!("did:plc:asdf-{i}").into(), 1210 collection: "app.t.c".into(), 1211 rkey: "asdf".into(), 1212 }, 1213 links: vec![CollectedLink { 1214 target: Link::Uri("a.com".into()), 1215 path: ".abc.uri".into(), 1216 }], 1217 }, 1218 0, 1219 )?; 1220 } 1221 let links = 
storage.get_links(
        "a.com",
        "app.t.c",
        ".abc.uri",
        Order::NewestToOldest,
        2,
        None,
        &HashSet::default(),
    )?;
    // NOTE: the opening of this test lies before this chunk; the statements
    // below are kept exactly as written.
    // First page (limit 2, newest first): asdf-4 then asdf-3.
    assert_eq!(
        links,
        PagedAppendingCollection {
            version: (4, 0),
            items: vec![
                RecordId {
                    did: "did:plc:asdf-4".into(),
                    collection: "app.t.c".into(),
                    rkey: "asdf".into(),
                },
                RecordId {
                    did: "did:plc:asdf-3".into(),
                    collection: "app.t.c".into(),
                    rkey: "asdf".into(),
                },
            ],
            next: Some(2),
            total: 4,
        }
    );
    // A fifth link is created between fetching page one and page two.
    storage.push(
        &ActionableEvent::CreateLinks {
            record_id: RecordId {
                did: "did:plc:asdf-5".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            },
            links: vec![CollectedLink {
                target: Link::Uri("a.com".into()),
                path: ".abc.uri".into(),
            }],
        },
        0,
    )?;
    // Resuming from the first page's cursor still yields asdf-2 and asdf-1;
    // only `version` and `total` reflect the new link.
    let links = storage.get_links(
        "a.com",
        "app.t.c",
        ".abc.uri",
        Order::NewestToOldest,
        2,
        links.next,
        &HashSet::default(),
    )?;
    assert_eq!(
        links,
        PagedAppendingCollection {
            version: (5, 0),
            items: vec![
                RecordId {
                    did: "did:plc:asdf-2".into(),
                    collection: "app.t.c".into(),
                    rkey: "asdf".into(),
                },
                RecordId {
                    did: "did:plc:asdf-1".into(),
                    collection: "app.t.c".into(),
                    rkey: "asdf".into(),
                },
            ],
            next: None,
            total: 5,
        }
    );
    assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
});

// Paging through links while one of the not-yet-returned records is deleted:
// the deleted record is skipped and the counters reflect the deletion.
test_each_storage!(page_links_while_some_are_deleted, |storage| {
    // Every record in this test shares the same collection and rkey.
    let record = |did: &str| RecordId {
        did: did.into(),
        collection: "app.t.c".into(),
        rkey: "asdf".into(),
    };
    let no_filter = HashSet::default();

    // Four accounts each create one link to a.com at `.abc.uri`.
    for n in 1..=4 {
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: record(&format!("did:plc:asdf-{n}")),
                links: vec![CollectedLink {
                    target: Link::Uri("a.com".into()),
                    path: ".abc.uri".into(),
                }],
            },
            0,
        )?;
    }

    let first_page = storage.get_links(
        "a.com",
        "app.t.c",
        ".abc.uri",
        Order::NewestToOldest,
        2,
        None,
        &no_filter,
    )?;
    assert_eq!(
        first_page,
        PagedAppendingCollection {
            version: (4, 0),
            items: vec![record("did:plc:asdf-4"), record("did:plc:asdf-3")],
            next: Some(2),
            total: 4,
        }
    );

    // Delete a record that would have been on the second page.
    storage.push(&ActionableEvent::DeleteRecord(record("did:plc:asdf-2")), 0)?;

    let second_page = storage.get_links(
        "a.com",
        "app.t.c",
        ".abc.uri",
        Order::NewestToOldest,
        2,
        first_page.next,
        &no_filter,
    )?;
    assert_eq!(
        second_page,
        PagedAppendingCollection {
            version: (4, 1),
            items: vec![record("did:plc:asdf-1")],
            next: None,
            total: 3,
        }
    );
    assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
});

// Paging through links while one linking account is deactivated: its record
// is left out of the page, but `version` and `total` are expected to stay put.
test_each_storage!(page_links_accounts_inactive, |storage| {
    let record = |did: &str| RecordId {
        did: did.into(),
        collection: "app.t.c".into(),
        rkey: "asdf".into(),
    };
    let no_filter = HashSet::default();

    for n in 1..=4 {
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: record(&format!("did:plc:asdf-{n}")),
                links: vec![CollectedLink {
                    target: Link::Uri("a.com".into()),
                    path: ".abc.uri".into(),
                }],
            },
            0,
        )?;
    }

    let first_page = storage.get_links(
        "a.com",
        "app.t.c",
        ".abc.uri",
        Order::NewestToOldest,
        2,
        None,
        &no_filter,
    )?;
    assert_eq!(
        first_page,
        PagedAppendingCollection {
            version: (4, 0),
            items: vec![record("did:plc:asdf-4"), record("did:plc:asdf-3")],
            next: Some(2),
            total: 4,
        }
    );

    storage.push(
        &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
        0,
    )?;

    let second_page = storage.get_links(
        "a.com",
        "app.t.c",
        ".abc.uri",
        Order::NewestToOldest,
        2,
        first_page.next,
        &no_filter,
    )?;
    assert_eq!(
        second_page,
        PagedAppendingCollection {
            version: (4, 0),
            items: vec![record("did:plc:asdf-2")],
            next: None,
            total: 4,
        }
    );
    assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
});

// One record linking to the same target from two different paths: both the
// plain record counts and the record/distinct-did counts are keyed by
// collection and then by path.
test_each_storage!(get_all_counts, |storage| {
    let link_to_a = |path: &str| CollectedLink {
        target: Link::Uri("a.com".into()),
        path: path.into(),
    };
    storage.push(
        &ActionableEvent::CreateLinks {
            record_id: RecordId {
                did: "did:plc:asdf".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            },
            links: vec![link_to_a(".abc.uri"), link_to_a(".def.uri")],
        },
        0,
    )?;

    assert_eq!(
        storage.get_all_record_counts("a.com")?,
        HashMap::from([(
            "app.t.c".into(),
            HashMap::from([(".abc.uri".into(), 1), (".def.uri".into(), 1)]),
        )])
    );
    assert_eq!(
        storage.get_all_counts("a.com")?,
        HashMap::from([(
            "app.t.c".into(),
            HashMap::from([
                (
                    ".abc.uri".into(),
                    CountsByCount {
                        records: 1,
                        distinct_dids: 1,
                    },
                ),
                (
                    ".def.uri".into(),
                    CountsByCount {
                        records: 1,
                        distinct_dids: 1,
                    },
                ),
            ]),
        )])
    );
    assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
});

//////// many-to-many /////////

// With nothing stored, the grouped many-to-many query is an empty page.
test_each_storage!(get_m2m_counts_empty, |storage| {
    let any_did = HashSet::new();
    let any_target = HashSet::new();
    let counts = storage.get_many_to_many_counts(
        "a.com",
        "a.b.c",
        ".d.e",
        ".f.g",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(counts, PagedOrderedCollection::empty());
});

// A single record links to a.com and, at two other paths, to b.com; only the
// link at the requested other-path (`.def.uri`) is counted.
test_each_storage!(get_m2m_counts_single, |storage| {
    let link = |target: &str, path: &str| CollectedLink {
        target: Link::Uri(target.into()),
        path: path.into(),
    };
    storage.push(
        &ActionableEvent::CreateLinks {
            record_id: RecordId {
                did: "did:plc:asdf".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            },
            links: vec![
                link("a.com", ".abc.uri"),
                link("b.com", ".def.uri"),
                link("b.com", ".ghi.uri"),
            ],
        },
        0,
    )?;

    let any_did = HashSet::new();
    let any_target = HashSet::new();
    let counts = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(
        counts,
        PagedOrderedCollection {
            items: vec![("b.com".to_string(), 1, 1)],
            next: None,
        }
    );
});

// Grouped many-to-many counts with DID and target filters, followed by the
// pagination boundaries around a result set of two groups.
test_each_storage!(get_m2m_counts_filters, |storage| {
    // Each seed row: (push sequence arg, did, rkey, other-side target). Every
    // record links to a.com at `.abc.uri` and to the other target at `.def.uri`.
    for (seq, did, rkey, other) in [
        (0, "did:plc:asdf", "asdf", "b.com"),
        (1, "did:plc:asdfasdf", "asdf", "b.com"),
        (2, "did:plc:fdsa", "asdf", "c.com"),
        (3, "did:plc:fdsa", "asdf2", "c.com"),
    ] {
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: did.into(),
                    collection: "app.t.c".into(),
                    rkey: rkey.into(),
                },
                links: vec![
                    CollectedLink {
                        target: Link::Uri("a.com".into()),
                        path: ".abc.uri".into(),
                    },
                    CollectedLink {
                        target: Link::Uri(other.into()),
                        path: ".def.uri".into(),
                    },
                ],
            },
            seq,
        )?;
    }

    let any_did = HashSet::new();
    let any_target = HashSet::new();

    // No filters: both groups, each with (records, distinct dids).
    let unfiltered = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(
        unfiltered,
        PagedOrderedCollection {
            items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
            next: None,
        }
    );

    // DID filter: only the group fed by did:plc:fdsa remains.
    let by_did = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
        &any_target,
    )?;
    assert_eq!(
        by_did,
        PagedOrderedCollection {
            items: vec![("c.com".to_string(), 2, 1),],
            next: None,
        }
    );

    // Target filter: only the b.com group remains.
    let by_target = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &HashSet::from_iter(["b.com".to_string()]),
    )?;
    assert_eq!(
        by_target,
        PagedOrderedCollection {
            items: vec![("b.com".to_string(), 2, 2),],
            next: None,
        }
    );

    // Pagination edge cases: we have 2 grouped results (b.com and c.com)

    // Case 1: limit > items (limit=10, items=2) -> next should be None
    let roomy = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(roomy.items.len(), 2);
    assert_eq!(roomy.next, None, "next should be None when items < limit");

    // Case 2: limit == items (limit=2, items=2) -> next should be None
    let exact_fit = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        2,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(exact_fit.items.len(), 2);
    assert_eq!(
        exact_fit.next, None,
        "next should be None when items == limit (no more pages)"
    );

    // Case 3: limit < items (limit=1, items=2) -> next should be Some
    let first = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        1,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(first.items.len(), 1);
    assert!(
        first.next.is_some(),
        "next should be Some when items > limit"
    );

    // The second page must return the remaining group and no further cursor.
    let second = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        1,
        first.next,
        &any_did,
        &any_target,
    )?;
    assert_eq!(second.items.len(), 1);
    assert_eq!(second.next, None, "next should be None on final page");
});

// With nothing stored, the flat many-to-many query is an empty page.
test_each_storage!(get_m2m_empty, |storage| {
    let any_did = HashSet::new();
    let any_target = HashSet::new();
    let page = storage.get_many_to_many(
        "a.com",
        "a.b.c",
        ".d.e",
        ".f.g",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(
        page,
        PagedOrderedCollection {
            items: vec![],
            next: None,
        }
    );
});

test_each_storage!(get_m2m_single, |storage| {
    // A single record links backward to a.com and carries two forward links
    // at the same path_to_other (.def.uri): one to b.com, one to c.com.
    // The query has to emit an item for each forward target.
    let link = |target: &str, path: &str| CollectedLink {
        target: Link::Uri(target.into()),
        path: path.into(),
    };
    storage.push(
        &ActionableEvent::CreateLinks {
            record_id: RecordId {
                did: "did:plc:asdf".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            },
            links: vec![
                link("a.com", ".abc.uri"),
                link("b.com", ".def.uri"),
                link("c.com", ".def.uri"),
            ],
        },
        0,
    )?;

    let any_did = HashSet::new();
    let any_target = HashSet::new();
    let page = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(
        page.items.len(),
        2,
        "both forward links at path_to_other should be emitted"
    );

    // Item order is not asserted, so compare the sorted targets.
    let mut seen_targets: Vec<_> = page
        .items
        .iter()
        .map(|item| item.other_subject.as_str())
        .collect();
    seen_targets.sort_unstable();
    assert_eq!(seen_targets, vec!["b.com", "c.com"]);

    // Both items come from the one linking record.
    let expected_uri = "at://did:plc:asdf/app.t.c/asdf";
    assert!(page
        .items
        .iter()
        .all(|item| item.link_record.uri() == expected_uri));
    assert_eq!(page.next, None);
});

// Flat many-to-many items with DID and target filters, followed by the
// pagination boundaries around a result set of four items.
test_each_storage!(get_m2m_filters, |storage| {
    // Each seed row: (push sequence arg, did, rkey, other-side target). Every
    // record links to a.com at `.abc.uri` and to the other target at `.def.uri`.
    for (seq, did, rkey, other) in [
        (0, "did:plc:asdf", "asdf", "b.com"),
        (1, "did:plc:asdf", "asdf2", "b.com"),
        (2, "did:plc:fdsa", "fdsa", "c.com"),
        (3, "did:plc:fdsa", "fdsa2", "c.com"),
    ] {
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: did.into(),
                    collection: "app.t.c".into(),
                    rkey: rkey.into(),
                },
                links: vec![
                    CollectedLink {
                        target: Link::Uri("a.com".into()),
                        path: ".abc.uri".into(),
                    },
                    CollectedLink {
                        target: Link::Uri(other.into()),
                        path: ".def.uri".into(),
                    },
                ],
            },
            seq,
        )?;
    }

    let any_did = HashSet::new();
    let any_target = HashSet::new();

    // No filters: every record shows up as its own flat item.
    let unfiltered = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(unfiltered.items.len(), 4);
    assert_eq!(unfiltered.next, None);

    // b.com is reached by both did:plc:asdf records.
    let to_b: Vec<_> = unfiltered
        .items
        .iter()
        .filter(|item| item.other_subject == "b.com")
        .collect();
    assert_eq!(to_b.len(), 2);
    for rkey in ["asdf", "asdf2"] {
        assert!(to_b.iter().any(
            |item| item.link_record.did.0 == "did:plc:asdf" && item.link_record.rkey == rkey
        ));
    }

    // c.com is reached by both did:plc:fdsa records.
    let to_c: Vec<_> = unfiltered
        .items
        .iter()
        .filter(|item| item.other_subject == "c.com")
        .collect();
    assert_eq!(to_c.len(), 2);
    for rkey in ["fdsa", "fdsa2"] {
        assert!(to_c.iter().any(
            |item| item.link_record.did.0 == "did:plc:fdsa" && item.link_record.rkey == rkey
        ));
    }

    // DID filter: only the records authored by did:plc:fdsa are returned.
    let by_did = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
        &any_target,
    )?;
    assert_eq!(by_did.items.len(), 2);
    assert!(by_did
        .items
        .iter()
        .all(|item| item.other_subject == "c.com"));
    assert!(by_did
        .items
        .iter()
        .all(|item| item.link_record.did.0 == "did:plc:fdsa"));

    // Target filter: only the records that link on to b.com are returned.
    let by_target = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &HashSet::from_iter(["b.com".to_string()]),
    )?;
    assert_eq!(by_target.items.len(), 2);
    assert!(by_target
        .items
        .iter()
        .all(|item| item.other_subject == "b.com"));
    assert!(by_target
        .items
        .iter()
        .all(|item| item.link_record.did.0 == "did:plc:asdf"));

    // Pagination edge cases: we have 4 flat items

    // Case 1: limit > items (limit=10, items=4) -> next should be None
    let roomy = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(roomy.items.len(), 4);
    assert_eq!(roomy.next, None, "next should be None when items < limit");

    // Case 2: limit == items (limit=4, items=4) -> next should be None
    let exact_fit = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        4,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(exact_fit.items.len(), 4);
    assert_eq!(
        exact_fit.next, None,
        "next should be None when items == limit (no more pages)"
    );

    // Case 3: limit < items (limit=3, items=4) -> next should be Some
    let first = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        3,
        None,
        &any_did,
        &any_target,
    )?;
    assert_eq!(first.items.len(), 3);
    assert!(
        first.next.is_some(),
        "next should be Some when items > limit"
    );

    // The second page must hold exactly the one remaining item and no cursor.
    // Several records here share a target string, so this only passes if the
    // cursor can tell records with the same target apart.
    let second = storage.get_many_to_many(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        3,
        first.next,
        &any_did,
        &any_target,
    )?;
    assert_eq!(
        second.items.len(),
        1,
        "second page should have 1 remaining item"
    );
    assert_eq!(second.next, None, "next should be None on final page");

    // Across both pages every record appears exactly once (no duplicates, no gaps).
    let mut all_rkeys: Vec<_> = first
        .items
        .iter()
        .chain(second.items.iter())
        .map(|item| item.link_record.rkey.clone())
        .collect();
    all_rkeys.sort();
    assert_eq!(
        all_rkeys,
        vec!["asdf", "asdf2", "fdsa", "fdsa2"],
        "should have all 4 records across both pages"
    );
});

// Pagination that splits across forward links within a single backlinker.
// The cursor should correctly resume mid-record on the next page.
2091 test_each_storage!(get_m2m_paginate_within_forward_links, |storage| { 2092 // Record with 1 backward link and 3 forward links at the same path 2093 storage.push( 2094 &ActionableEvent::CreateLinks { 2095 record_id: RecordId { 2096 did: "did:plc:lister".into(), 2097 collection: "app.t.c".into(), 2098 rkey: "list1".into(), 2099 }, 2100 links: vec![ 2101 CollectedLink { 2102 target: Link::Uri("a.com".into()), 2103 path: ".subject.uri".into(), 2104 }, 2105 CollectedLink { 2106 target: Link::Uri("x.com".into()), 2107 path: ".items[].uri".into(), 2108 }, 2109 CollectedLink { 2110 target: Link::Uri("y.com".into()), 2111 path: ".items[].uri".into(), 2112 }, 2113 CollectedLink { 2114 target: Link::Uri("z.com".into()), 2115 path: ".items[].uri".into(), 2116 }, 2117 ], 2118 }, 2119 0, 2120 )?; 2121 2122 // Page 1: limit=2, should get 2 of the 3 forward links 2123 let page1 = storage.get_many_to_many( 2124 "a.com", 2125 "app.t.c", 2126 ".subject.uri", 2127 ".items[].uri", 2128 2, 2129 None, 2130 &HashSet::new(), 2131 &HashSet::new(), 2132 )?; 2133 assert_eq!(page1.items.len(), 2, "first page should have 2 items"); 2134 assert!( 2135 page1.next.is_some(), 2136 "should have a next cursor for remaining item" 2137 ); 2138 2139 // Page 2: should get the remaining 1 forward link 2140 let page2 = storage.get_many_to_many( 2141 "a.com", 2142 "app.t.c", 2143 ".subject.uri", 2144 ".items[].uri", 2145 2, 2146 page1.next, 2147 &HashSet::new(), 2148 &HashSet::new(), 2149 )?; 2150 assert_eq!(page2.items.len(), 1, "second page should have 1 item"); 2151 assert_eq!(page2.next, None, "no more pages"); 2152 2153 // Verify all 3 targets appear across pages with no duplicates 2154 let mut all_targets: Vec<_> = page1 2155 .items 2156 .iter() 2157 .chain(page2.items.iter()) 2158 .map(|item| item.other_subject.clone()) 2159 .collect(); 2160 all_targets.sort(); 2161 assert_eq!( 2162 all_targets, 2163 vec!["x.com", "y.com", "z.com"], 2164 "all forward targets should appear exactly once across 
pages" 2165 ); 2166 }); 2167}