Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId};
2use anyhow::Result;
3use serde::{Deserialize, Serialize};
4use std::collections::{HashMap, HashSet};
5
6pub mod mem_store;
7pub use mem_store::MemStorage;
8
9#[cfg(feature = "rocks")]
10pub mod rocks_store;
11#[cfg(feature = "rocks")]
12pub use rocks_store::RocksStorage;
13
14/// Ordering for paginated link queries
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum Order {
17 /// Newest links first (default)
18 NewestToOldest,
19 /// Oldest links first
20 OldestToNewest,
21}
22
23#[derive(Debug, Default, PartialEq)]
24pub struct PagedAppendingCollection<T> {
25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted"
26 pub items: Vec<T>,
27 pub next: Option<u64>,
28 pub total: u64,
29}
30
31impl<T> PagedAppendingCollection<T> {
32 pub(crate) fn empty() -> Self {
33 Self {
34 version: (0, 0),
35 items: Vec::new(),
36 next: None,
37 total: 0,
38 }
39 }
40}
41
42/// A paged collection whose keys are sorted instead of indexed
43///
44/// this has weaker guarantees than PagedAppendingCollection: it might
45/// return a totally consistent snapshot. but it should avoid duplicates
46/// and each page should at least be internally consistent.
47#[derive(Debug, PartialEq)]
48pub struct PagedOrderedCollection<T, K: Ord> {
49 pub items: Vec<T>,
50 pub next: Option<K>,
51}
52
53impl<T, K: Ord> PagedOrderedCollection<T, K> {
54 pub(crate) fn empty() -> Self {
55 Self {
56 items: Vec::new(),
57 next: None,
58 }
59 }
60}
61
62#[derive(Debug, Deserialize, Serialize, PartialEq)]
63pub struct StorageStats {
64 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
65 /// for example: new user A follows users B and C. this count will only increment by one, for A.
66 pub dids: u64,
67
68 /// estimate targets * distinct (collection, path)s to reference them.
69 /// distinct targets alone are currently challenging to estimate.
70 pub targetables: u64,
71
72 /// estimate of the count of atproto records seen that contain links.
73 /// records with multiple links are single-counted.
74 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
75 pub linking_records: u64,
76
77 /// first jetstream cursor when this instance first started
78 pub started_at: Option<u64>,
79
80 /// anything else we want to throw in
81 pub other_data: HashMap<String, u64>,
82}
83
84pub trait LinkStorage: Send + Sync {
85 /// jetstream cursor from last saved actions, if available
86 fn get_cursor(&mut self) -> Result<Option<u64>> {
87 Ok(None)
88 }
89
90 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>;
91
92 // readers are off from the writer instance
93 fn to_readable(&mut self) -> impl LinkReader;
94}
95
96pub trait LinkReader: Clone + Send + Sync + 'static {
97 #[allow(clippy::too_many_arguments)]
98 fn get_many_to_many_counts(
99 &self,
100 target: &str,
101 collection: &str,
102 path: &str,
103 path_to_other: &str,
104 limit: u64,
105 after: Option<String>,
106 filter_dids: &HashSet<Did>,
107 filter_to_targets: &HashSet<String>,
108 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
109
110 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
111
112 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
113
114 #[allow(clippy::too_many_arguments)]
115 fn get_links(
116 &self,
117 target: &str,
118 collection: &str,
119 path: &str,
120 order: Order,
121 limit: u64,
122 until: Option<u64>,
123 filter_dids: &HashSet<Did>,
124 ) -> Result<PagedAppendingCollection<RecordId>>;
125
126 fn get_distinct_dids(
127 &self,
128 target: &str,
129 collection: &str,
130 path: &str,
131 limit: u64,
132 until: Option<u64>,
133 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor
134
135 fn get_all_record_counts(&self, _target: &str)
136 -> Result<HashMap<String, HashMap<String, u64>>>;
137
138 fn get_all_counts(
139 &self,
140 _target: &str,
141 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>;
142
143 /// assume all stats are estimates, since exact counts are very challenging for LSMs
144 fn get_stats(&self) -> Result<StorageStats>;
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150 use links::{CollectedLink, Link};
151 use std::ops::RangeBounds;
152
153 macro_rules! test_each_storage {
154 ($test_name:ident, |$storage_label:ident| $test_code:block) => {
155 #[test]
156 fn $test_name() -> Result<()> {
157 {
158 println!("=> testing with memstorage backend");
159 #[allow(unused_mut)]
160 let mut $storage_label = MemStorage::new();
161 $test_code
162 }
163
164 #[cfg(feature = "rocks")]
165 {
166 println!("=> testing with rocksdb backend");
167 let rocks_db_path = tempfile::tempdir()?;
168 #[allow(unused_mut)]
169 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?;
170 $test_code
171 }
172
173 Ok(())
174 }
175 };
176 }
177
178 fn assert_stats(
179 stats: StorageStats,
180 dids: impl RangeBounds<u64>,
181 targetables: impl RangeBounds<u64>,
182 linking_records: impl RangeBounds<u64>,
183 ) {
184 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) {
185 assert!(
186 rb.contains(&stat),
187 "{name:?}: {stat:?} not in range {:?}–{:?}",
188 rb.start_bound(),
189 rb.end_bound()
190 );
191 }
192 check("dids", stats.dids, dids);
193 check("targetables", stats.targetables, targetables);
194 check("linking_records", stats.linking_records, linking_records);
195 }
196
197 test_each_storage!(test_empty, |storage| {
198 assert_eq!(storage.get_count("", "", "")?, 0);
199 assert_eq!(storage.get_count("a", "b", "c")?, 0);
200 assert_eq!(
201 storage.get_count(
202 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f",
203 "app.t.c",
204 ".reply.parent.uri"
205 )?,
206 0
207 );
208 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0);
209 assert_eq!(
210 storage.get_links(
211 "a.com",
212 "app.t.c",
213 ".abc.uri",
214 Order::NewestToOldest,
215 100,
216 None,
217 &HashSet::default()
218 )?,
219 PagedAppendingCollection::empty()
220 );
221 assert_eq!(
222 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
223 PagedAppendingCollection::empty()
224 );
225 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new());
226 assert_eq!(
227 storage.get_all_record_counts("bad-example.com")?,
228 HashMap::new()
229 );
230
231 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0);
232 });
233
234 test_each_storage!(test_add_link, |storage| {
235 storage.push(
236 &ActionableEvent::CreateLinks {
237 record_id: RecordId {
238 did: "did:plc:asdf".into(),
239 collection: "app.t.c".into(),
240 rkey: "fdsa".into(),
241 },
242 links: vec![CollectedLink {
243 target: Link::Uri("e.com".into()),
244 path: ".abc.uri".into(),
245 }],
246 },
247 0,
248 )?;
249 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
250 assert_eq!(
251 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
252 1
253 );
254 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0);
255 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0);
256 assert_eq!(
257 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?,
258 0
259 );
260 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
261 });
262
263 test_each_storage!(test_links, |storage| {
264 storage.push(
265 &ActionableEvent::CreateLinks {
266 record_id: RecordId {
267 did: "did:plc:asdf".into(),
268 collection: "app.t.c".into(),
269 rkey: "fdsa".into(),
270 },
271 links: vec![CollectedLink {
272 target: Link::Uri("e.com".into()),
273 path: ".abc.uri".into(),
274 }],
275 },
276 0,
277 )?;
278
279 // delete under the wrong collection
280 storage.push(
281 &ActionableEvent::DeleteRecord(RecordId {
282 did: "did:plc:asdf".into(),
283 collection: "app.test.wrongcollection".into(),
284 rkey: "fdsa".into(),
285 }),
286 0,
287 )?;
288 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
289
290 // delete under the wrong rkey
291 storage.push(
292 &ActionableEvent::DeleteRecord(RecordId {
293 did: "did:plc:asdf".into(),
294 collection: "app.t.c".into(),
295 rkey: "wrongkey".into(),
296 }),
297 0,
298 )?;
299 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
300
301 // finally actually delete it
302 storage.push(
303 &ActionableEvent::DeleteRecord(RecordId {
304 did: "did:plc:asdf".into(),
305 collection: "app.t.c".into(),
306 rkey: "fdsa".into(),
307 }),
308 0,
309 )?;
310 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
311
312 // put it back
313 storage.push(
314 &ActionableEvent::CreateLinks {
315 record_id: RecordId {
316 did: "did:plc:asdf".into(),
317 collection: "app.t.c".into(),
318 rkey: "fdsa".into(),
319 },
320 links: vec![CollectedLink {
321 target: Link::Uri("e.com".into()),
322 path: ".abc.uri".into(),
323 }],
324 },
325 0,
326 )?;
327 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
328 assert_eq!(
329 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
330 1
331 );
332
333 // add another link from this user
334 storage.push(
335 &ActionableEvent::CreateLinks {
336 record_id: RecordId {
337 did: "did:plc:asdf".into(),
338 collection: "app.t.c".into(),
339 rkey: "fdsa2".into(),
340 },
341 links: vec![CollectedLink {
342 target: Link::Uri("e.com".into()),
343 path: ".abc.uri".into(),
344 }],
345 },
346 0,
347 )?;
348 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
349 assert_eq!(
350 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
351 1
352 );
353
354 // add a link from someone else
355 storage.push(
356 &ActionableEvent::CreateLinks {
357 record_id: RecordId {
358 did: "did:plc:asdfasdf".into(),
359 collection: "app.t.c".into(),
360 rkey: "fdsa".into(),
361 },
362 links: vec![CollectedLink {
363 target: Link::Uri("e.com".into()),
364 path: ".abc.uri".into(),
365 }],
366 },
367 0,
368 )?;
369 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3);
370 assert_eq!(
371 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
372 2
373 );
374
375 // aaaand delete the first one again
376 storage.push(
377 &ActionableEvent::DeleteRecord(RecordId {
378 did: "did:plc:asdf".into(),
379 collection: "app.t.c".into(),
380 rkey: "fdsa".into(),
381 }),
382 0,
383 )?;
384 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
385 assert_eq!(
386 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
387 2
388 );
389 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2);
390 });
391
392 test_each_storage!(test_two_user_links_delete_one, |storage| {
393 // create the first link
394 storage.push(
395 &ActionableEvent::CreateLinks {
396 record_id: RecordId {
397 did: "did:plc:asdf".into(),
398 collection: "app.t.c".into(),
399 rkey: "A".into(),
400 },
401 links: vec![CollectedLink {
402 target: Link::Uri("e.com".into()),
403 path: ".abc.uri".into(),
404 }],
405 },
406 0,
407 )?;
408 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
409 assert_eq!(
410 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
411 1
412 );
413
414 // create the second link (same user, different rkey)
415 storage.push(
416 &ActionableEvent::CreateLinks {
417 record_id: RecordId {
418 did: "did:plc:asdf".into(),
419 collection: "app.t.c".into(),
420 rkey: "B".into(),
421 },
422 links: vec![CollectedLink {
423 target: Link::Uri("e.com".into()),
424 path: ".abc.uri".into(),
425 }],
426 },
427 0,
428 )?;
429 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
430 assert_eq!(
431 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
432 1
433 );
434
435 // aaaand delete the first link
436 storage.push(
437 &ActionableEvent::DeleteRecord(RecordId {
438 did: "did:plc:asdf".into(),
439 collection: "app.t.c".into(),
440 rkey: "A".into(),
441 }),
442 0,
443 )?;
444 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
445 assert_eq!(
446 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
447 1
448 );
449 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
450 });
451
452 test_each_storage!(test_accounts, |storage| {
453 // create two links
454 storage.push(
455 &ActionableEvent::CreateLinks {
456 record_id: RecordId {
457 did: "did:plc:asdf".into(),
458 collection: "app.t.c".into(),
459 rkey: "A".into(),
460 },
461 links: vec![CollectedLink {
462 target: Link::Uri("a.com".into()),
463 path: ".abc.uri".into(),
464 }],
465 },
466 0,
467 )?;
468 storage.push(
469 &ActionableEvent::CreateLinks {
470 record_id: RecordId {
471 did: "did:plc:asdf".into(),
472 collection: "app.t.c".into(),
473 rkey: "B".into(),
474 },
475 links: vec![CollectedLink {
476 target: Link::Uri("b.com".into()),
477 path: ".abc.uri".into(),
478 }],
479 },
480 0,
481 )?;
482 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
483 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1);
484
485 // and a third from a different account
486 storage.push(
487 &ActionableEvent::CreateLinks {
488 record_id: RecordId {
489 did: "did:plc:fdsa".into(),
490 collection: "app.t.c".into(),
491 rkey: "A".into(),
492 },
493 links: vec![CollectedLink {
494 target: Link::Uri("a.com".into()),
495 path: ".abc.uri".into(),
496 }],
497 },
498 0,
499 )?;
500 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2);
501
502 // delete the first account
503 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?;
504 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
505 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0);
506 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1);
507 });
508
509 test_each_storage!(multi_link, |storage| {
510 storage.push(
511 &ActionableEvent::CreateLinks {
512 record_id: RecordId {
513 did: "did:plc:asdf".into(),
514 collection: "app.t.c".into(),
515 rkey: "fdsa".into(),
516 },
517 links: vec![
518 CollectedLink {
519 target: Link::Uri("e.com".into()),
520 path: ".abc.uri".into(),
521 },
522 CollectedLink {
523 target: Link::Uri("f.com".into()),
524 path: ".xyz[].uri".into(),
525 },
526 CollectedLink {
527 target: Link::Uri("g.com".into()),
528 path: ".xyz[].uri".into(),
529 },
530 ],
531 },
532 0,
533 )?;
534 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
535 assert_eq!(
536 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
537 1
538 );
539 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
540 assert_eq!(
541 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?,
542 1
543 );
544 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
545 assert_eq!(
546 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?,
547 1
548 );
549
550 storage.push(
551 &ActionableEvent::DeleteRecord(RecordId {
552 did: "did:plc:asdf".into(),
553 collection: "app.t.c".into(),
554 rkey: "fdsa".into(),
555 }),
556 0,
557 )?;
558 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
559 assert_eq!(
560 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
561 0
562 );
563 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0);
564 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
565 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0);
566 });
567
568 test_each_storage!(update_link, |storage| {
569 // create the links
570 storage.push(
571 &ActionableEvent::CreateLinks {
572 record_id: RecordId {
573 did: "did:plc:asdf".into(),
574 collection: "app.t.c".into(),
575 rkey: "fdsa".into(),
576 },
577 links: vec![
578 CollectedLink {
579 target: Link::Uri("e.com".into()),
580 path: ".abc.uri".into(),
581 },
582 CollectedLink {
583 target: Link::Uri("f.com".into()),
584 path: ".xyz[].uri".into(),
585 },
586 CollectedLink {
587 target: Link::Uri("g.com".into()),
588 path: ".xyz[].uri".into(),
589 },
590 ],
591 },
592 0,
593 )?;
594 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
595 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
596 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
597
598 // update them
599 storage.push(
600 &ActionableEvent::UpdateLinks {
601 record_id: RecordId {
602 did: "did:plc:asdf".into(),
603 collection: "app.t.c".into(),
604 rkey: "fdsa".into(),
605 },
606 new_links: vec![
607 CollectedLink {
608 target: Link::Uri("h.com".into()),
609 path: ".abc.uri".into(),
610 },
611 CollectedLink {
612 target: Link::Uri("f.com".into()),
613 path: ".xyz[].uri".into(),
614 },
615 CollectedLink {
616 target: Link::Uri("i.com".into()),
617 path: ".xyz[].uri".into(),
618 },
619 ],
620 },
621 0,
622 )?;
623 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
624 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1);
625 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
626 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
627 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1);
628 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1);
629 });
630
631 test_each_storage!(update_no_links_to_links, |storage| {
632 // update without prior create (consumer would have filtered out the original)
633 storage.push(
634 &ActionableEvent::UpdateLinks {
635 record_id: RecordId {
636 did: "did:plc:asdf".into(),
637 collection: "app.t.c".into(),
638 rkey: "asdf".into(),
639 },
640 new_links: vec![CollectedLink {
641 target: Link::Uri("a.com".into()),
642 path: ".abc.uri".into(),
643 }],
644 },
645 0,
646 )?;
647 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
648 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
649 });
650
651 test_each_storage!(delete_multi_link_same_target, |storage| {
652 storage.push(
653 &ActionableEvent::CreateLinks {
654 record_id: RecordId {
655 did: "did:plc:asdf".into(),
656 collection: "app.t.c".into(),
657 rkey: "asdf".into(),
658 },
659 links: vec![
660 CollectedLink {
661 target: Link::Uri("a.com".into()),
662 path: ".abc.uri".into(),
663 },
664 CollectedLink {
665 target: Link::Uri("a.com".into()),
666 path: ".def.uri".into(),
667 },
668 ],
669 },
670 0,
671 )?;
672 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
673 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1);
674
675 storage.push(
676 &ActionableEvent::DeleteRecord(RecordId {
677 did: "did:plc:asdf".into(),
678 collection: "app.t.c".into(),
679 rkey: "asdf".into(),
680 }),
681 0,
682 )?;
683 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0);
684 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0);
685 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0);
686 });
687
688 test_each_storage!(get_links_basic, |storage| {
689 storage.push(
690 &ActionableEvent::CreateLinks {
691 record_id: RecordId {
692 did: "did:plc:asdf".into(),
693 collection: "app.t.c".into(),
694 rkey: "asdf".into(),
695 },
696 links: vec![CollectedLink {
697 target: Link::Uri("a.com".into()),
698 path: ".abc.uri".into(),
699 }],
700 },
701 0,
702 )?;
703 assert_eq!(
704 storage.get_links(
705 "a.com",
706 "app.t.c",
707 ".abc.uri",
708 Order::NewestToOldest,
709 100,
710 None,
711 &HashSet::default()
712 )?,
713 PagedAppendingCollection {
714 version: (1, 0),
715 items: vec![RecordId {
716 did: "did:plc:asdf".into(),
717 collection: "app.t.c".into(),
718 rkey: "asdf".into(),
719 }],
720 next: None,
721 total: 1,
722 }
723 );
724 assert_eq!(
725 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
726 PagedAppendingCollection {
727 version: (1, 0),
728 items: vec!["did:plc:asdf".into()],
729 next: None,
730 total: 1,
731 }
732 );
733 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
734 });
735
736 test_each_storage!(get_links_paged, |storage| {
737 for i in 1..=5 {
738 storage.push(
739 &ActionableEvent::CreateLinks {
740 record_id: RecordId {
741 did: format!("did:plc:asdf-{i}").into(),
742 collection: "app.t.c".into(),
743 rkey: "asdf".into(),
744 },
745 links: vec![CollectedLink {
746 target: Link::Uri("a.com".into()),
747 path: ".abc.uri".into(),
748 }],
749 },
750 0,
751 )?;
752 }
753
754 let sub = "a.com";
755 let col = "app.t.c";
756 let path = ".abc.uri";
757 let order = Order::NewestToOldest;
758 let dids_filter = HashSet::new();
759
760 // --- --- round one! --- --- //
761 // all backlinks
762 let links = storage.get_links(sub, col, path, order, 2, None, &dids_filter)?;
763 assert_eq!(
764 links,
765 PagedAppendingCollection {
766 version: (5, 0),
767 items: vec![
768 RecordId {
769 did: "did:plc:asdf-5".into(),
770 collection: col.into(),
771 rkey: "asdf".into(),
772 },
773 RecordId {
774 did: "did:plc:asdf-4".into(),
775 collection: col.into(),
776 rkey: "asdf".into(),
777 },
778 ],
779 next: Some(3),
780 total: 5,
781 }
782 );
783 // distinct dids
784 let dids = storage.get_distinct_dids(sub, col, path, 2, None)?;
785 assert_eq!(
786 dids,
787 PagedAppendingCollection {
788 version: (5, 0),
789 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()],
790 next: Some(3),
791 total: 5,
792 }
793 );
794
795 // --- --- round two! --- --- //
796 // all backlinks
797 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?;
798 assert_eq!(
799 links,
800 PagedAppendingCollection {
801 version: (5, 0),
802 items: vec![
803 RecordId {
804 did: "did:plc:asdf-3".into(),
805 collection: col.into(),
806 rkey: "asdf".into(),
807 },
808 RecordId {
809 did: "did:plc:asdf-2".into(),
810 collection: col.into(),
811 rkey: "asdf".into(),
812 },
813 ],
814 next: Some(1),
815 total: 5,
816 }
817 );
818 // distinct dids
819 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?;
820 assert_eq!(
821 dids,
822 PagedAppendingCollection {
823 version: (5, 0),
824 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()],
825 next: Some(1),
826 total: 5,
827 }
828 );
829
830 // --- --- round three! --- --- //
831 // all backlinks
832 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?;
833 assert_eq!(
834 links,
835 PagedAppendingCollection {
836 version: (5, 0),
837 items: vec![RecordId {
838 did: "did:plc:asdf-1".into(),
839 collection: col.into(),
840 rkey: "asdf".into(),
841 },],
842 next: None,
843 total: 5,
844 }
845 );
846 // distinct dids
847 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?;
848 assert_eq!(
849 dids,
850 PagedAppendingCollection {
851 version: (5, 0),
852 items: vec!["did:plc:asdf-1".into()],
853 next: None,
854 total: 5,
855 }
856 );
857
858 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
859 });
860
861 test_each_storage!(get_links_reverse_order, |storage| {
862 for i in 1..=5 {
863 storage.push(
864 &ActionableEvent::CreateLinks {
865 record_id: RecordId {
866 did: format!("did:plc:asdf-{i}").into(),
867 collection: "app.t.c".into(),
868 rkey: "asdf".into(),
869 },
870 links: vec![CollectedLink {
871 target: Link::Uri("a.com".into()),
872 path: ".abc.uri".into(),
873 }],
874 },
875 0,
876 )?;
877 }
878
879 // Test OldestToNewest order (oldest first)
880 let links = storage.get_links(
881 "a.com",
882 "app.t.c",
883 ".abc.uri",
884 Order::OldestToNewest,
885 2,
886 None,
887 &HashSet::default(),
888 )?;
889 assert_eq!(
890 links,
891 PagedAppendingCollection {
892 version: (5, 0),
893 items: vec![
894 RecordId {
895 did: "did:plc:asdf-1".into(),
896 collection: "app.t.c".into(),
897 rkey: "asdf".into(),
898 },
899 RecordId {
900 did: "did:plc:asdf-2".into(),
901 collection: "app.t.c".into(),
902 rkey: "asdf".into(),
903 },
904 ],
905 next: Some(3),
906 total: 5,
907 }
908 );
909 // Test NewestToOldest order (newest first)
910 let links = storage.get_links(
911 "a.com",
912 "app.t.c",
913 ".abc.uri",
914 Order::NewestToOldest,
915 2,
916 None,
917 &HashSet::default(),
918 )?;
919 assert_eq!(
920 links,
921 PagedAppendingCollection {
922 version: (5, 0),
923 items: vec![
924 RecordId {
925 did: "did:plc:asdf-5".into(),
926 collection: "app.t.c".into(),
927 rkey: "asdf".into(),
928 },
929 RecordId {
930 did: "did:plc:asdf-4".into(),
931 collection: "app.t.c".into(),
932 rkey: "asdf".into(),
933 },
934 ],
935 next: Some(3),
936 total: 5,
937 }
938 );
939 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
940 });
941
942 test_each_storage!(get_filtered_links, |storage| {
943 let links = storage.get_links(
944 "a.com",
945 "app.t.c",
946 ".abc.uri",
947 Order::NewestToOldest,
948 2,
949 None,
950 &HashSet::from([Did("did:plc:linker".to_string())]),
951 )?;
952 assert_eq!(links, PagedAppendingCollection::empty());
953
954 storage.push(
955 &ActionableEvent::CreateLinks {
956 record_id: RecordId {
957 did: "did:plc:linker".into(),
958 collection: "app.t.c".into(),
959 rkey: "asdf".into(),
960 },
961 links: vec![CollectedLink {
962 target: Link::Uri("a.com".into()),
963 path: ".abc.uri".into(),
964 }],
965 },
966 0,
967 )?;
968
969 let links = storage.get_links(
970 "a.com",
971 "app.t.c",
972 ".abc.uri",
973 Order::NewestToOldest,
974 2,
975 None,
976 &HashSet::from([Did("did:plc:linker".to_string())]),
977 )?;
978 assert_eq!(
979 links,
980 PagedAppendingCollection {
981 version: (1, 0),
982 items: vec![RecordId {
983 did: "did:plc:linker".into(),
984 collection: "app.t.c".into(),
985 rkey: "asdf".into(),
986 },],
987 next: None,
988 total: 1,
989 }
990 );
991
992 let links = storage.get_links(
993 "a.com",
994 "app.t.c",
995 ".abc.uri",
996 Order::NewestToOldest,
997 2,
998 None,
999 &HashSet::from([Did("did:plc:someone-else".to_string())]),
1000 )?;
1001 assert_eq!(links, PagedAppendingCollection::empty());
1002
1003 storage.push(
1004 &ActionableEvent::CreateLinks {
1005 record_id: RecordId {
1006 did: "did:plc:linker".into(),
1007 collection: "app.t.c".into(),
1008 rkey: "asdf-2".into(),
1009 },
1010 links: vec![CollectedLink {
1011 target: Link::Uri("a.com".into()),
1012 path: ".abc.uri".into(),
1013 }],
1014 },
1015 0,
1016 )?;
1017 storage.push(
1018 &ActionableEvent::CreateLinks {
1019 record_id: RecordId {
1020 did: "did:plc:someone-else".into(),
1021 collection: "app.t.c".into(),
1022 rkey: "asdf".into(),
1023 },
1024 links: vec![CollectedLink {
1025 target: Link::Uri("a.com".into()),
1026 path: ".abc.uri".into(),
1027 }],
1028 },
1029 0,
1030 )?;
1031
1032 let links = storage.get_links(
1033 "a.com",
1034 "app.t.c",
1035 ".abc.uri",
1036 Order::NewestToOldest,
1037 2,
1038 None,
1039 &HashSet::from([Did("did:plc:linker".to_string())]),
1040 )?;
1041 assert_eq!(
1042 links,
1043 PagedAppendingCollection {
1044 version: (2, 0),
1045 items: vec![
1046 RecordId {
1047 did: "did:plc:linker".into(),
1048 collection: "app.t.c".into(),
1049 rkey: "asdf-2".into(),
1050 },
1051 RecordId {
1052 did: "did:plc:linker".into(),
1053 collection: "app.t.c".into(),
1054 rkey: "asdf".into(),
1055 },
1056 ],
1057 next: None,
1058 total: 2,
1059 }
1060 );
1061
1062 let links = storage.get_links(
1063 "a.com",
1064 "app.t.c",
1065 ".abc.uri",
1066 Order::NewestToOldest,
1067 2,
1068 None,
1069 &HashSet::from([
1070 Did("did:plc:linker".to_string()),
1071 Did("did:plc:someone-else".to_string()),
1072 ]),
1073 )?;
1074 assert_eq!(
1075 links,
1076 PagedAppendingCollection {
1077 version: (3, 0),
1078 items: vec![
1079 RecordId {
1080 did: "did:plc:someone-else".into(),
1081 collection: "app.t.c".into(),
1082 rkey: "asdf".into(),
1083 },
1084 RecordId {
1085 did: "did:plc:linker".into(),
1086 collection: "app.t.c".into(),
1087 rkey: "asdf-2".into(),
1088 },
1089 ],
1090 next: Some(1),
1091 total: 3,
1092 }
1093 );
1094
1095 let links = storage.get_links(
1096 "a.com",
1097 "app.t.c",
1098 ".abc.uri",
1099 Order::NewestToOldest,
1100 2,
1101 None,
1102 &HashSet::from([Did("did:plc:someone-unknown".to_string())]),
1103 )?;
1104 assert_eq!(links, PagedAppendingCollection::empty());
1105 });
1106
1107 test_each_storage!(get_links_exact_multiple, |storage| {
1108 for i in 1..=4 {
1109 storage.push(
1110 &ActionableEvent::CreateLinks {
1111 record_id: RecordId {
1112 did: format!("did:plc:asdf-{i}").into(),
1113 collection: "app.t.c".into(),
1114 rkey: "asdf".into(),
1115 },
1116 links: vec![CollectedLink {
1117 target: Link::Uri("a.com".into()),
1118 path: ".abc.uri".into(),
1119 }],
1120 },
1121 0,
1122 )?;
1123 }
1124 let links = storage.get_links(
1125 "a.com",
1126 "app.t.c",
1127 ".abc.uri",
1128 Order::NewestToOldest,
1129 2,
1130 None,
1131 &HashSet::default(),
1132 )?;
1133 assert_eq!(
1134 links,
1135 PagedAppendingCollection {
1136 version: (4, 0),
1137 items: vec![
1138 RecordId {
1139 did: "did:plc:asdf-4".into(),
1140 collection: "app.t.c".into(),
1141 rkey: "asdf".into(),
1142 },
1143 RecordId {
1144 did: "did:plc:asdf-3".into(),
1145 collection: "app.t.c".into(),
1146 rkey: "asdf".into(),
1147 },
1148 ],
1149 next: Some(2),
1150 total: 4,
1151 }
1152 );
1153 let links = storage.get_links(
1154 "a.com",
1155 "app.t.c",
1156 ".abc.uri",
1157 Order::NewestToOldest,
1158 2,
1159 links.next,
1160 &HashSet::default(),
1161 )?;
1162 assert_eq!(
1163 links,
1164 PagedAppendingCollection {
1165 version: (4, 0),
1166 items: vec![
1167 RecordId {
1168 did: "did:plc:asdf-2".into(),
1169 collection: "app.t.c".into(),
1170 rkey: "asdf".into(),
1171 },
1172 RecordId {
1173 did: "did:plc:asdf-1".into(),
1174 collection: "app.t.c".into(),
1175 rkey: "asdf".into(),
1176 },
1177 ],
1178 next: None,
1179 total: 4,
1180 }
1181 );
1182 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1183 });
1184
1185 test_each_storage!(page_links_while_new_links_arrive, |storage| {
1186 for i in 1..=4 {
1187 storage.push(
1188 &ActionableEvent::CreateLinks {
1189 record_id: RecordId {
1190 did: format!("did:plc:asdf-{i}").into(),
1191 collection: "app.t.c".into(),
1192 rkey: "asdf".into(),
1193 },
1194 links: vec![CollectedLink {
1195 target: Link::Uri("a.com".into()),
1196 path: ".abc.uri".into(),
1197 }],
1198 },
1199 0,
1200 )?;
1201 }
1202 let links = storage.get_links(
1203 "a.com",
1204 "app.t.c",
1205 ".abc.uri",
1206 Order::NewestToOldest,
1207 2,
1208 None,
1209 &HashSet::default(),
1210 )?;
1211 assert_eq!(
1212 links,
1213 PagedAppendingCollection {
1214 version: (4, 0),
1215 items: vec![
1216 RecordId {
1217 did: "did:plc:asdf-4".into(),
1218 collection: "app.t.c".into(),
1219 rkey: "asdf".into(),
1220 },
1221 RecordId {
1222 did: "did:plc:asdf-3".into(),
1223 collection: "app.t.c".into(),
1224 rkey: "asdf".into(),
1225 },
1226 ],
1227 next: Some(2),
1228 total: 4,
1229 }
1230 );
1231 storage.push(
1232 &ActionableEvent::CreateLinks {
1233 record_id: RecordId {
1234 did: "did:plc:asdf-5".into(),
1235 collection: "app.t.c".into(),
1236 rkey: "asdf".into(),
1237 },
1238 links: vec![CollectedLink {
1239 target: Link::Uri("a.com".into()),
1240 path: ".abc.uri".into(),
1241 }],
1242 },
1243 0,
1244 )?;
1245 let links = storage.get_links(
1246 "a.com",
1247 "app.t.c",
1248 ".abc.uri",
1249 Order::NewestToOldest,
1250 2,
1251 links.next,
1252 &HashSet::default(),
1253 )?;
1254 assert_eq!(
1255 links,
1256 PagedAppendingCollection {
1257 version: (5, 0),
1258 items: vec![
1259 RecordId {
1260 did: "did:plc:asdf-2".into(),
1261 collection: "app.t.c".into(),
1262 rkey: "asdf".into(),
1263 },
1264 RecordId {
1265 did: "did:plc:asdf-1".into(),
1266 collection: "app.t.c".into(),
1267 rkey: "asdf".into(),
1268 },
1269 ],
1270 next: None,
1271 total: 5,
1272 }
1273 );
1274 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
1275 });
1276
1277 test_each_storage!(page_links_while_some_are_deleted, |storage| {
1278 for i in 1..=4 {
1279 storage.push(
1280 &ActionableEvent::CreateLinks {
1281 record_id: RecordId {
1282 did: format!("did:plc:asdf-{i}").into(),
1283 collection: "app.t.c".into(),
1284 rkey: "asdf".into(),
1285 },
1286 links: vec![CollectedLink {
1287 target: Link::Uri("a.com".into()),
1288 path: ".abc.uri".into(),
1289 }],
1290 },
1291 0,
1292 )?;
1293 }
1294 let links = storage.get_links(
1295 "a.com",
1296 "app.t.c",
1297 ".abc.uri",
1298 Order::NewestToOldest,
1299 2,
1300 None,
1301 &HashSet::default(),
1302 )?;
1303 assert_eq!(
1304 links,
1305 PagedAppendingCollection {
1306 version: (4, 0),
1307 items: vec![
1308 RecordId {
1309 did: "did:plc:asdf-4".into(),
1310 collection: "app.t.c".into(),
1311 rkey: "asdf".into(),
1312 },
1313 RecordId {
1314 did: "did:plc:asdf-3".into(),
1315 collection: "app.t.c".into(),
1316 rkey: "asdf".into(),
1317 },
1318 ],
1319 next: Some(2),
1320 total: 4,
1321 }
1322 );
1323 storage.push(
1324 &ActionableEvent::DeleteRecord(RecordId {
1325 did: "did:plc:asdf-2".into(),
1326 collection: "app.t.c".into(),
1327 rkey: "asdf".into(),
1328 }),
1329 0,
1330 )?;
1331 let links = storage.get_links(
1332 "a.com",
1333 "app.t.c",
1334 ".abc.uri",
1335 Order::NewestToOldest,
1336 2,
1337 links.next,
1338 &HashSet::default(),
1339 )?;
1340 assert_eq!(
1341 links,
1342 PagedAppendingCollection {
1343 version: (4, 1),
1344 items: vec![RecordId {
1345 did: "did:plc:asdf-1".into(),
1346 collection: "app.t.c".into(),
1347 rkey: "asdf".into(),
1348 },],
1349 next: None,
1350 total: 3,
1351 }
1352 );
1353 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
1354 });
1355
1356 test_each_storage!(page_links_accounts_inactive, |storage| {
1357 for i in 1..=4 {
1358 storage.push(
1359 &ActionableEvent::CreateLinks {
1360 record_id: RecordId {
1361 did: format!("did:plc:asdf-{i}").into(),
1362 collection: "app.t.c".into(),
1363 rkey: "asdf".into(),
1364 },
1365 links: vec![CollectedLink {
1366 target: Link::Uri("a.com".into()),
1367 path: ".abc.uri".into(),
1368 }],
1369 },
1370 0,
1371 )?;
1372 }
1373 let links = storage.get_links(
1374 "a.com",
1375 "app.t.c",
1376 ".abc.uri",
1377 Order::NewestToOldest,
1378 2,
1379 None,
1380 &HashSet::default(),
1381 )?;
1382 assert_eq!(
1383 links,
1384 PagedAppendingCollection {
1385 version: (4, 0),
1386 items: vec![
1387 RecordId {
1388 did: "did:plc:asdf-4".into(),
1389 collection: "app.t.c".into(),
1390 rkey: "asdf".into(),
1391 },
1392 RecordId {
1393 did: "did:plc:asdf-3".into(),
1394 collection: "app.t.c".into(),
1395 rkey: "asdf".into(),
1396 },
1397 ],
1398 next: Some(2),
1399 total: 4,
1400 }
1401 );
1402 storage.push(
1403 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
1404 0,
1405 )?;
1406 let links = storage.get_links(
1407 "a.com",
1408 "app.t.c",
1409 ".abc.uri",
1410 Order::NewestToOldest,
1411 2,
1412 links.next,
1413 &HashSet::default(),
1414 )?;
1415 assert_eq!(
1416 links,
1417 PagedAppendingCollection {
1418 version: (4, 0),
1419 items: vec![RecordId {
1420 did: "did:plc:asdf-2".into(),
1421 collection: "app.t.c".into(),
1422 rkey: "asdf".into(),
1423 },],
1424 next: None,
1425 total: 4,
1426 }
1427 );
1428 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1429 });
1430
1431 test_each_storage!(get_all_counts, |storage| {
1432 storage.push(
1433 &ActionableEvent::CreateLinks {
1434 record_id: RecordId {
1435 did: "did:plc:asdf".into(),
1436 collection: "app.t.c".into(),
1437 rkey: "asdf".into(),
1438 },
1439 links: vec![
1440 CollectedLink {
1441 target: Link::Uri("a.com".into()),
1442 path: ".abc.uri".into(),
1443 },
1444 CollectedLink {
1445 target: Link::Uri("a.com".into()),
1446 path: ".def.uri".into(),
1447 },
1448 ],
1449 },
1450 0,
1451 )?;
1452 assert_eq!(storage.get_all_record_counts("a.com")?, {
1453 let mut counts = HashMap::new();
1454 let mut t_c_counts = HashMap::new();
1455 t_c_counts.insert(".abc.uri".into(), 1);
1456 t_c_counts.insert(".def.uri".into(), 1);
1457 counts.insert("app.t.c".into(), t_c_counts);
1458 counts
1459 });
1460 assert_eq!(storage.get_all_counts("a.com")?, {
1461 let mut counts = HashMap::new();
1462 let mut t_c_counts = HashMap::new();
1463 t_c_counts.insert(
1464 ".abc.uri".into(),
1465 CountsByCount {
1466 records: 1,
1467 distinct_dids: 1,
1468 },
1469 );
1470 t_c_counts.insert(
1471 ".def.uri".into(),
1472 CountsByCount {
1473 records: 1,
1474 distinct_dids: 1,
1475 },
1476 );
1477 counts.insert("app.t.c".into(), t_c_counts);
1478 counts
1479 });
1480 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1481 });
1482
1483 //////// many-to-many /////////
1484
1485 test_each_storage!(get_m2m_counts_empty, |storage| {
1486 assert_eq!(
1487 storage.get_many_to_many_counts(
1488 "a.com",
1489 "a.b.c",
1490 ".d.e",
1491 ".f.g",
1492 10,
1493 None,
1494 &HashSet::new(),
1495 &HashSet::new(),
1496 )?,
1497 PagedOrderedCollection::empty()
1498 );
1499 });
1500
1501 test_each_storage!(get_m2m_counts_single, |storage| {
1502 storage.push(
1503 &ActionableEvent::CreateLinks {
1504 record_id: RecordId {
1505 did: "did:plc:asdf".into(),
1506 collection: "app.t.c".into(),
1507 rkey: "asdf".into(),
1508 },
1509 links: vec![
1510 CollectedLink {
1511 target: Link::Uri("a.com".into()),
1512 path: ".abc.uri".into(),
1513 },
1514 CollectedLink {
1515 target: Link::Uri("b.com".into()),
1516 path: ".def.uri".into(),
1517 },
1518 CollectedLink {
1519 target: Link::Uri("b.com".into()),
1520 path: ".ghi.uri".into(),
1521 },
1522 ],
1523 },
1524 0,
1525 )?;
1526 assert_eq!(
1527 storage.get_many_to_many_counts(
1528 "a.com",
1529 "app.t.c",
1530 ".abc.uri",
1531 ".def.uri",
1532 10,
1533 None,
1534 &HashSet::new(),
1535 &HashSet::new(),
1536 )?,
1537 PagedOrderedCollection {
1538 items: vec![("b.com".to_string(), 1, 1)],
1539 next: None,
1540 }
1541 );
1542 });
1543
1544 test_each_storage!(get_m2m_counts_filters, |storage| {
1545 storage.push(
1546 &ActionableEvent::CreateLinks {
1547 record_id: RecordId {
1548 did: "did:plc:asdf".into(),
1549 collection: "app.t.c".into(),
1550 rkey: "asdf".into(),
1551 },
1552 links: vec![
1553 CollectedLink {
1554 target: Link::Uri("a.com".into()),
1555 path: ".abc.uri".into(),
1556 },
1557 CollectedLink {
1558 target: Link::Uri("b.com".into()),
1559 path: ".def.uri".into(),
1560 },
1561 ],
1562 },
1563 0,
1564 )?;
1565 storage.push(
1566 &ActionableEvent::CreateLinks {
1567 record_id: RecordId {
1568 did: "did:plc:asdfasdf".into(),
1569 collection: "app.t.c".into(),
1570 rkey: "asdf".into(),
1571 },
1572 links: vec![
1573 CollectedLink {
1574 target: Link::Uri("a.com".into()),
1575 path: ".abc.uri".into(),
1576 },
1577 CollectedLink {
1578 target: Link::Uri("b.com".into()),
1579 path: ".def.uri".into(),
1580 },
1581 ],
1582 },
1583 1,
1584 )?;
1585 storage.push(
1586 &ActionableEvent::CreateLinks {
1587 record_id: RecordId {
1588 did: "did:plc:fdsa".into(),
1589 collection: "app.t.c".into(),
1590 rkey: "asdf".into(),
1591 },
1592 links: vec![
1593 CollectedLink {
1594 target: Link::Uri("a.com".into()),
1595 path: ".abc.uri".into(),
1596 },
1597 CollectedLink {
1598 target: Link::Uri("c.com".into()),
1599 path: ".def.uri".into(),
1600 },
1601 ],
1602 },
1603 2,
1604 )?;
1605 storage.push(
1606 &ActionableEvent::CreateLinks {
1607 record_id: RecordId {
1608 did: "did:plc:fdsa".into(),
1609 collection: "app.t.c".into(),
1610 rkey: "asdf2".into(),
1611 },
1612 links: vec![
1613 CollectedLink {
1614 target: Link::Uri("a.com".into()),
1615 path: ".abc.uri".into(),
1616 },
1617 CollectedLink {
1618 target: Link::Uri("c.com".into()),
1619 path: ".def.uri".into(),
1620 },
1621 ],
1622 },
1623 3,
1624 )?;
1625 assert_eq!(
1626 storage.get_many_to_many_counts(
1627 "a.com",
1628 "app.t.c",
1629 ".abc.uri",
1630 ".def.uri",
1631 10,
1632 None,
1633 &HashSet::new(),
1634 &HashSet::new(),
1635 )?,
1636 PagedOrderedCollection {
1637 items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1638 next: None,
1639 }
1640 );
1641 assert_eq!(
1642 storage.get_many_to_many_counts(
1643 "a.com",
1644 "app.t.c",
1645 ".abc.uri",
1646 ".def.uri",
1647 10,
1648 None,
1649 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1650 &HashSet::new(),
1651 )?,
1652 PagedOrderedCollection {
1653 items: vec![("c.com".to_string(), 2, 1),],
1654 next: None,
1655 }
1656 );
1657 assert_eq!(
1658 storage.get_many_to_many_counts(
1659 "a.com",
1660 "app.t.c",
1661 ".abc.uri",
1662 ".def.uri",
1663 10,
1664 None,
1665 &HashSet::new(),
1666 &HashSet::from_iter(["b.com".to_string()]),
1667 )?,
1668 PagedOrderedCollection {
1669 items: vec![("b.com".to_string(), 2, 2),],
1670 next: None,
1671 }
1672 );
1673 });
1674}