// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, ManyToManyItem, RecordId};
2use anyhow::Result;
3use serde::{Deserialize, Serialize};
4use std::collections::{HashMap, HashSet};
5
6pub mod mem_store;
7pub use mem_store::MemStorage;
8
9#[cfg(feature = "rocks")]
10pub mod rocks_store;
11#[cfg(feature = "rocks")]
12pub use rocks_store::RocksStorage;
13
/// Ordering for paginated link queries
///
/// Passed to `LinkReader::get_links` to choose which end of the collection
/// the first page starts from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Order {
    /// Newest links first (default)
    NewestToOldest,
    /// Oldest links first
    OldestToNewest,
}
22
/// One page of results from an append-only, index-addressed collection.
#[derive(Debug, Default, PartialEq)]
pub struct PagedAppendingCollection<T> {
    /// (collection length, deleted item count)
    // TODO: change to (total, active)? since dedups isn't "deleted"
    pub version: (u64, u64),
    /// The items on this page, in the order that was requested.
    pub items: Vec<T>,
    /// Cursor to pass back in order to fetch the following page; `None` on the last page.
    pub next: Option<u64>,
    /// Item count reported alongside the page.
    // NOTE(review): the tests in this file expect this to exclude deleted items
    // (e.g. version (4, 1) comes with total 3) -- confirm against the backends.
    pub total: u64,
}

impl<T> PagedAppendingCollection<T> {
    /// An empty page: no items, no next cursor, zeroed version and total.
    ///
    /// Spelled out instead of delegating to the derived `Default`, because the
    /// derive only applies when `T: Default` and this must work for any `T`.
    pub(crate) fn empty() -> Self {
        Self {
            version: (0, 0),
            items: Vec::new(),
            next: None,
            total: 0,
        }
    }
}
41
/// A two-part position: one index into backlinks, one into "other" links.
// NOTE(review): not used anywhere in this file; presumably the decoded form of the
// `after` cursor for `LinkReader::get_many_to_many` in the backends -- confirm.
#[derive(Copy, Clone, Debug)]
struct ManyToManyCursor {
    backlink_idx: u64,
    other_link_idx: u64,
}
47
/// A paged collection whose keys are sorted instead of indexed
///
/// this has weaker guarantees than PagedAppendingCollection: it might not
/// return a totally consistent snapshot. but it should avoid duplicates
/// and each page should at least be internally consistent.
#[derive(Debug, PartialEq)]
pub struct PagedOrderedCollection<T, K: Ord> {
    /// The items on this page.
    pub items: Vec<T>,
    /// Key to resume from when fetching the following page; `None` when no cursor is set.
    pub next: Option<K>,
}

impl<T, K: Ord> PagedOrderedCollection<T, K> {
    /// An empty page: no items and no next key.
    pub(crate) fn empty() -> Self {
        Self {
            items: Vec::new(),
            next: None,
        }
    }
}
67
68#[derive(Debug, Deserialize, Serialize, PartialEq)]
69pub struct StorageStats {
70 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
71 /// for example: new user A follows users B and C. this count will only increment by one, for A.
72 pub dids: u64,
73
74 /// estimate targets * distinct (collection, path)s to reference them.
75 /// distinct targets alone are currently challenging to estimate.
76 pub targetables: u64,
77
78 /// estimate of the count of atproto records seen that contain links.
79 /// records with multiple links are single-counted.
80 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
81 pub linking_records: u64,
82
83 /// first jetstream cursor when this instance first started
84 pub started_at: Option<u64>,
85
86 /// anything else we want to throw in
87 pub other_data: HashMap<String, u64>,
88}
89
90pub trait LinkStorage: Send + Sync {
91 /// jetstream cursor from last saved actions, if available
92 fn get_cursor(&mut self) -> Result<Option<u64>> {
93 Ok(None)
94 }
95
96 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>;
97
98 // readers are off from the writer instance
99 fn to_readable(&mut self) -> impl LinkReader;
100}
101
102pub trait LinkReader: Clone + Send + Sync + 'static {
103 #[allow(clippy::too_many_arguments)]
104 fn get_many_to_many_counts(
105 &self,
106 target: &str,
107 collection: &str,
108 path: &str,
109 path_to_other: &str,
110 limit: u64,
111 after: Option<String>,
112 filter_dids: &HashSet<Did>,
113 filter_to_targets: &HashSet<String>,
114 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
115
116 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
117
118 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
119
120 #[allow(clippy::too_many_arguments)]
121 fn get_links(
122 &self,
123 target: &str,
124 collection: &str,
125 path: &str,
126 order: Order,
127 limit: u64,
128 until: Option<u64>,
129 filter_dids: &HashSet<Did>,
130 ) -> Result<PagedAppendingCollection<RecordId>>;
131
132 fn get_distinct_dids(
133 &self,
134 target: &str,
135 collection: &str,
136 path: &str,
137 limit: u64,
138 until: Option<u64>,
139 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor
140
141 fn get_all_record_counts(&self, _target: &str)
142 -> Result<HashMap<String, HashMap<String, u64>>>;
143
144 #[allow(clippy::too_many_arguments)]
145 fn get_many_to_many(
146 &self,
147 target: &str,
148 collection: &str,
149 path: &str,
150 path_to_other: &str,
151 limit: u64,
152 after: Option<String>,
153 filter_dids: &HashSet<Did>,
154 filter_to_targets: &HashSet<String>,
155 ) -> Result<PagedOrderedCollection<ManyToManyItem, String>>;
156
157 fn get_all_counts(
158 &self,
159 _target: &str,
160 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>;
161
162 /// assume all stats are estimates, since exact counts are very challenging for LSMs
163 fn get_stats(&self) -> Result<StorageStats>;
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use links::{CollectedLink, Link};
170 use std::ops::RangeBounds;
171
172 macro_rules! test_each_storage {
173 ($test_name:ident, |$storage_label:ident| $test_code:block) => {
174 #[test]
175 fn $test_name() -> Result<()> {
176 {
177 println!("=> testing with memstorage backend");
178 #[allow(unused_mut)]
179 let mut $storage_label = MemStorage::new();
180 $test_code
181 }
182
183 #[cfg(feature = "rocks")]
184 {
185 println!("=> testing with rocksdb backend");
186 let rocks_db_path = tempfile::tempdir()?;
187 #[allow(unused_mut)]
188 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?;
189 $test_code
190 }
191
192 Ok(())
193 }
194 };
195 }
196
197 fn assert_stats(
198 stats: StorageStats,
199 dids: impl RangeBounds<u64>,
200 targetables: impl RangeBounds<u64>,
201 linking_records: impl RangeBounds<u64>,
202 ) {
203 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) {
204 assert!(
205 rb.contains(&stat),
206 "{name:?}: {stat:?} not in range {:?}–{:?}",
207 rb.start_bound(),
208 rb.end_bound()
209 );
210 }
211 check("dids", stats.dids, dids);
212 check("targetables", stats.targetables, targetables);
213 check("linking_records", stats.linking_records, linking_records);
214 }
215
216 test_each_storage!(test_empty, |storage| {
217 assert_eq!(storage.get_count("", "", "")?, 0);
218 assert_eq!(storage.get_count("a", "b", "c")?, 0);
219 assert_eq!(
220 storage.get_count(
221 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f",
222 "app.t.c",
223 ".reply.parent.uri"
224 )?,
225 0
226 );
227 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0);
228 assert_eq!(
229 storage.get_links(
230 "a.com",
231 "app.t.c",
232 ".abc.uri",
233 Order::NewestToOldest,
234 100,
235 None,
236 &HashSet::default()
237 )?,
238 PagedAppendingCollection::empty()
239 );
240 assert_eq!(
241 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
242 PagedAppendingCollection::empty()
243 );
244 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new());
245 assert_eq!(
246 storage.get_all_record_counts("bad-example.com")?,
247 HashMap::new()
248 );
249
250 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0);
251 });
252
253 test_each_storage!(test_add_link, |storage| {
254 storage.push(
255 &ActionableEvent::CreateLinks {
256 record_id: RecordId {
257 did: "did:plc:asdf".into(),
258 collection: "app.t.c".into(),
259 rkey: "fdsa".into(),
260 },
261 links: vec![CollectedLink {
262 target: Link::Uri("e.com".into()),
263 path: ".abc.uri".into(),
264 }],
265 },
266 0,
267 )?;
268 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
269 assert_eq!(
270 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
271 1
272 );
273 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0);
274 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0);
275 assert_eq!(
276 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?,
277 0
278 );
279 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
280 });
281
282 test_each_storage!(test_links, |storage| {
283 storage.push(
284 &ActionableEvent::CreateLinks {
285 record_id: RecordId {
286 did: "did:plc:asdf".into(),
287 collection: "app.t.c".into(),
288 rkey: "fdsa".into(),
289 },
290 links: vec![CollectedLink {
291 target: Link::Uri("e.com".into()),
292 path: ".abc.uri".into(),
293 }],
294 },
295 0,
296 )?;
297
298 // delete under the wrong collection
299 storage.push(
300 &ActionableEvent::DeleteRecord(RecordId {
301 did: "did:plc:asdf".into(),
302 collection: "app.test.wrongcollection".into(),
303 rkey: "fdsa".into(),
304 }),
305 0,
306 )?;
307 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
308
309 // delete under the wrong rkey
310 storage.push(
311 &ActionableEvent::DeleteRecord(RecordId {
312 did: "did:plc:asdf".into(),
313 collection: "app.t.c".into(),
314 rkey: "wrongkey".into(),
315 }),
316 0,
317 )?;
318 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
319
320 // finally actually delete it
321 storage.push(
322 &ActionableEvent::DeleteRecord(RecordId {
323 did: "did:plc:asdf".into(),
324 collection: "app.t.c".into(),
325 rkey: "fdsa".into(),
326 }),
327 0,
328 )?;
329 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
330
331 // put it back
332 storage.push(
333 &ActionableEvent::CreateLinks {
334 record_id: RecordId {
335 did: "did:plc:asdf".into(),
336 collection: "app.t.c".into(),
337 rkey: "fdsa".into(),
338 },
339 links: vec![CollectedLink {
340 target: Link::Uri("e.com".into()),
341 path: ".abc.uri".into(),
342 }],
343 },
344 0,
345 )?;
346 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
347 assert_eq!(
348 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
349 1
350 );
351
352 // add another link from this user
353 storage.push(
354 &ActionableEvent::CreateLinks {
355 record_id: RecordId {
356 did: "did:plc:asdf".into(),
357 collection: "app.t.c".into(),
358 rkey: "fdsa2".into(),
359 },
360 links: vec![CollectedLink {
361 target: Link::Uri("e.com".into()),
362 path: ".abc.uri".into(),
363 }],
364 },
365 0,
366 )?;
367 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
368 assert_eq!(
369 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
370 1
371 );
372
373 // add a link from someone else
374 storage.push(
375 &ActionableEvent::CreateLinks {
376 record_id: RecordId {
377 did: "did:plc:asdfasdf".into(),
378 collection: "app.t.c".into(),
379 rkey: "fdsa".into(),
380 },
381 links: vec![CollectedLink {
382 target: Link::Uri("e.com".into()),
383 path: ".abc.uri".into(),
384 }],
385 },
386 0,
387 )?;
388 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3);
389 assert_eq!(
390 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
391 2
392 );
393
394 // aaaand delete the first one again
395 storage.push(
396 &ActionableEvent::DeleteRecord(RecordId {
397 did: "did:plc:asdf".into(),
398 collection: "app.t.c".into(),
399 rkey: "fdsa".into(),
400 }),
401 0,
402 )?;
403 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
404 assert_eq!(
405 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
406 2
407 );
408 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2);
409 });
410
411 test_each_storage!(test_two_user_links_delete_one, |storage| {
412 // create the first link
413 storage.push(
414 &ActionableEvent::CreateLinks {
415 record_id: RecordId {
416 did: "did:plc:asdf".into(),
417 collection: "app.t.c".into(),
418 rkey: "A".into(),
419 },
420 links: vec![CollectedLink {
421 target: Link::Uri("e.com".into()),
422 path: ".abc.uri".into(),
423 }],
424 },
425 0,
426 )?;
427 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
428 assert_eq!(
429 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
430 1
431 );
432
433 // create the second link (same user, different rkey)
434 storage.push(
435 &ActionableEvent::CreateLinks {
436 record_id: RecordId {
437 did: "did:plc:asdf".into(),
438 collection: "app.t.c".into(),
439 rkey: "B".into(),
440 },
441 links: vec![CollectedLink {
442 target: Link::Uri("e.com".into()),
443 path: ".abc.uri".into(),
444 }],
445 },
446 0,
447 )?;
448 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
449 assert_eq!(
450 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
451 1
452 );
453
454 // aaaand delete the first link
455 storage.push(
456 &ActionableEvent::DeleteRecord(RecordId {
457 did: "did:plc:asdf".into(),
458 collection: "app.t.c".into(),
459 rkey: "A".into(),
460 }),
461 0,
462 )?;
463 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
464 assert_eq!(
465 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
466 1
467 );
468 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
469 });
470
471 test_each_storage!(test_accounts, |storage| {
472 // create two links
473 storage.push(
474 &ActionableEvent::CreateLinks {
475 record_id: RecordId {
476 did: "did:plc:asdf".into(),
477 collection: "app.t.c".into(),
478 rkey: "A".into(),
479 },
480 links: vec![CollectedLink {
481 target: Link::Uri("a.com".into()),
482 path: ".abc.uri".into(),
483 }],
484 },
485 0,
486 )?;
487 storage.push(
488 &ActionableEvent::CreateLinks {
489 record_id: RecordId {
490 did: "did:plc:asdf".into(),
491 collection: "app.t.c".into(),
492 rkey: "B".into(),
493 },
494 links: vec![CollectedLink {
495 target: Link::Uri("b.com".into()),
496 path: ".abc.uri".into(),
497 }],
498 },
499 0,
500 )?;
501 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
502 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1);
503
504 // and a third from a different account
505 storage.push(
506 &ActionableEvent::CreateLinks {
507 record_id: RecordId {
508 did: "did:plc:fdsa".into(),
509 collection: "app.t.c".into(),
510 rkey: "A".into(),
511 },
512 links: vec![CollectedLink {
513 target: Link::Uri("a.com".into()),
514 path: ".abc.uri".into(),
515 }],
516 },
517 0,
518 )?;
519 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2);
520
521 // delete the first account
522 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?;
523 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
524 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0);
525 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1);
526 });
527
528 test_each_storage!(multi_link, |storage| {
529 storage.push(
530 &ActionableEvent::CreateLinks {
531 record_id: RecordId {
532 did: "did:plc:asdf".into(),
533 collection: "app.t.c".into(),
534 rkey: "fdsa".into(),
535 },
536 links: vec![
537 CollectedLink {
538 target: Link::Uri("e.com".into()),
539 path: ".abc.uri".into(),
540 },
541 CollectedLink {
542 target: Link::Uri("f.com".into()),
543 path: ".xyz[].uri".into(),
544 },
545 CollectedLink {
546 target: Link::Uri("g.com".into()),
547 path: ".xyz[].uri".into(),
548 },
549 ],
550 },
551 0,
552 )?;
553 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
554 assert_eq!(
555 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
556 1
557 );
558 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
559 assert_eq!(
560 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?,
561 1
562 );
563 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
564 assert_eq!(
565 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?,
566 1
567 );
568
569 storage.push(
570 &ActionableEvent::DeleteRecord(RecordId {
571 did: "did:plc:asdf".into(),
572 collection: "app.t.c".into(),
573 rkey: "fdsa".into(),
574 }),
575 0,
576 )?;
577 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
578 assert_eq!(
579 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
580 0
581 );
582 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0);
583 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
584 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0);
585 });
586
587 test_each_storage!(update_link, |storage| {
588 // create the links
589 storage.push(
590 &ActionableEvent::CreateLinks {
591 record_id: RecordId {
592 did: "did:plc:asdf".into(),
593 collection: "app.t.c".into(),
594 rkey: "fdsa".into(),
595 },
596 links: vec![
597 CollectedLink {
598 target: Link::Uri("e.com".into()),
599 path: ".abc.uri".into(),
600 },
601 CollectedLink {
602 target: Link::Uri("f.com".into()),
603 path: ".xyz[].uri".into(),
604 },
605 CollectedLink {
606 target: Link::Uri("g.com".into()),
607 path: ".xyz[].uri".into(),
608 },
609 ],
610 },
611 0,
612 )?;
613 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
614 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
615 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
616
617 // update them
618 storage.push(
619 &ActionableEvent::UpdateLinks {
620 record_id: RecordId {
621 did: "did:plc:asdf".into(),
622 collection: "app.t.c".into(),
623 rkey: "fdsa".into(),
624 },
625 new_links: vec![
626 CollectedLink {
627 target: Link::Uri("h.com".into()),
628 path: ".abc.uri".into(),
629 },
630 CollectedLink {
631 target: Link::Uri("f.com".into()),
632 path: ".xyz[].uri".into(),
633 },
634 CollectedLink {
635 target: Link::Uri("i.com".into()),
636 path: ".xyz[].uri".into(),
637 },
638 ],
639 },
640 0,
641 )?;
642 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
643 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1);
644 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
645 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
646 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1);
647 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1);
648 });
649
650 test_each_storage!(update_no_links_to_links, |storage| {
651 // update without prior create (consumer would have filtered out the original)
652 storage.push(
653 &ActionableEvent::UpdateLinks {
654 record_id: RecordId {
655 did: "did:plc:asdf".into(),
656 collection: "app.t.c".into(),
657 rkey: "asdf".into(),
658 },
659 new_links: vec![CollectedLink {
660 target: Link::Uri("a.com".into()),
661 path: ".abc.uri".into(),
662 }],
663 },
664 0,
665 )?;
666 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
667 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
668 });
669
670 test_each_storage!(delete_multi_link_same_target, |storage| {
671 storage.push(
672 &ActionableEvent::CreateLinks {
673 record_id: RecordId {
674 did: "did:plc:asdf".into(),
675 collection: "app.t.c".into(),
676 rkey: "asdf".into(),
677 },
678 links: vec![
679 CollectedLink {
680 target: Link::Uri("a.com".into()),
681 path: ".abc.uri".into(),
682 },
683 CollectedLink {
684 target: Link::Uri("a.com".into()),
685 path: ".def.uri".into(),
686 },
687 ],
688 },
689 0,
690 )?;
691 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
692 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1);
693
694 storage.push(
695 &ActionableEvent::DeleteRecord(RecordId {
696 did: "did:plc:asdf".into(),
697 collection: "app.t.c".into(),
698 rkey: "asdf".into(),
699 }),
700 0,
701 )?;
702 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0);
703 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0);
704 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0);
705 });
706
707 test_each_storage!(get_links_basic, |storage| {
708 storage.push(
709 &ActionableEvent::CreateLinks {
710 record_id: RecordId {
711 did: "did:plc:asdf".into(),
712 collection: "app.t.c".into(),
713 rkey: "asdf".into(),
714 },
715 links: vec![CollectedLink {
716 target: Link::Uri("a.com".into()),
717 path: ".abc.uri".into(),
718 }],
719 },
720 0,
721 )?;
722 assert_eq!(
723 storage.get_links(
724 "a.com",
725 "app.t.c",
726 ".abc.uri",
727 Order::NewestToOldest,
728 100,
729 None,
730 &HashSet::default()
731 )?,
732 PagedAppendingCollection {
733 version: (1, 0),
734 items: vec![RecordId {
735 did: "did:plc:asdf".into(),
736 collection: "app.t.c".into(),
737 rkey: "asdf".into(),
738 }],
739 next: None,
740 total: 1,
741 }
742 );
743 assert_eq!(
744 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
745 PagedAppendingCollection {
746 version: (1, 0),
747 items: vec!["did:plc:asdf".into()],
748 next: None,
749 total: 1,
750 }
751 );
752 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
753 });
754
755 test_each_storage!(get_links_paged, |storage| {
756 for i in 1..=5 {
757 storage.push(
758 &ActionableEvent::CreateLinks {
759 record_id: RecordId {
760 did: format!("did:plc:asdf-{i}").into(),
761 collection: "app.t.c".into(),
762 rkey: "asdf".into(),
763 },
764 links: vec![CollectedLink {
765 target: Link::Uri("a.com".into()),
766 path: ".abc.uri".into(),
767 }],
768 },
769 0,
770 )?;
771 }
772
773 let sub = "a.com";
774 let col = "app.t.c";
775 let path = ".abc.uri";
776 let order = Order::NewestToOldest;
777 let dids_filter = HashSet::new();
778
779 // --- --- round one! --- --- //
780 // all backlinks
781 let links = storage.get_links(sub, col, path, order, 2, None, &dids_filter)?;
782 assert_eq!(
783 links,
784 PagedAppendingCollection {
785 version: (5, 0),
786 items: vec![
787 RecordId {
788 did: "did:plc:asdf-5".into(),
789 collection: col.into(),
790 rkey: "asdf".into(),
791 },
792 RecordId {
793 did: "did:plc:asdf-4".into(),
794 collection: col.into(),
795 rkey: "asdf".into(),
796 },
797 ],
798 next: Some(3),
799 total: 5,
800 }
801 );
802 // distinct dids
803 let dids = storage.get_distinct_dids(sub, col, path, 2, None)?;
804 assert_eq!(
805 dids,
806 PagedAppendingCollection {
807 version: (5, 0),
808 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()],
809 next: Some(3),
810 total: 5,
811 }
812 );
813
814 // --- --- round two! --- --- //
815 // all backlinks
816 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?;
817 assert_eq!(
818 links,
819 PagedAppendingCollection {
820 version: (5, 0),
821 items: vec![
822 RecordId {
823 did: "did:plc:asdf-3".into(),
824 collection: col.into(),
825 rkey: "asdf".into(),
826 },
827 RecordId {
828 did: "did:plc:asdf-2".into(),
829 collection: col.into(),
830 rkey: "asdf".into(),
831 },
832 ],
833 next: Some(1),
834 total: 5,
835 }
836 );
837 // distinct dids
838 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?;
839 assert_eq!(
840 dids,
841 PagedAppendingCollection {
842 version: (5, 0),
843 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()],
844 next: Some(1),
845 total: 5,
846 }
847 );
848
849 // --- --- round three! --- --- //
850 // all backlinks
851 let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?;
852 assert_eq!(
853 links,
854 PagedAppendingCollection {
855 version: (5, 0),
856 items: vec![RecordId {
857 did: "did:plc:asdf-1".into(),
858 collection: col.into(),
859 rkey: "asdf".into(),
860 },],
861 next: None,
862 total: 5,
863 }
864 );
865 // distinct dids
866 let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?;
867 assert_eq!(
868 dids,
869 PagedAppendingCollection {
870 version: (5, 0),
871 items: vec!["did:plc:asdf-1".into()],
872 next: None,
873 total: 5,
874 }
875 );
876
877 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
878 });
879
880 test_each_storage!(get_links_reverse_order, |storage| {
881 for i in 1..=5 {
882 storage.push(
883 &ActionableEvent::CreateLinks {
884 record_id: RecordId {
885 did: format!("did:plc:asdf-{i}").into(),
886 collection: "app.t.c".into(),
887 rkey: "asdf".into(),
888 },
889 links: vec![CollectedLink {
890 target: Link::Uri("a.com".into()),
891 path: ".abc.uri".into(),
892 }],
893 },
894 0,
895 )?;
896 }
897
898 // Test OldestToNewest order (oldest first)
899 let links = storage.get_links(
900 "a.com",
901 "app.t.c",
902 ".abc.uri",
903 Order::OldestToNewest,
904 2,
905 None,
906 &HashSet::default(),
907 )?;
908 assert_eq!(
909 links,
910 PagedAppendingCollection {
911 version: (5, 0),
912 items: vec![
913 RecordId {
914 did: "did:plc:asdf-1".into(),
915 collection: "app.t.c".into(),
916 rkey: "asdf".into(),
917 },
918 RecordId {
919 did: "did:plc:asdf-2".into(),
920 collection: "app.t.c".into(),
921 rkey: "asdf".into(),
922 },
923 ],
924 next: Some(3),
925 total: 5,
926 }
927 );
928 // Test NewestToOldest order (newest first)
929 let links = storage.get_links(
930 "a.com",
931 "app.t.c",
932 ".abc.uri",
933 Order::NewestToOldest,
934 2,
935 None,
936 &HashSet::default(),
937 )?;
938 assert_eq!(
939 links,
940 PagedAppendingCollection {
941 version: (5, 0),
942 items: vec![
943 RecordId {
944 did: "did:plc:asdf-5".into(),
945 collection: "app.t.c".into(),
946 rkey: "asdf".into(),
947 },
948 RecordId {
949 did: "did:plc:asdf-4".into(),
950 collection: "app.t.c".into(),
951 rkey: "asdf".into(),
952 },
953 ],
954 next: Some(3),
955 total: 5,
956 }
957 );
958 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
959 });
960
961 test_each_storage!(get_filtered_links, |storage| {
962 let links = storage.get_links(
963 "a.com",
964 "app.t.c",
965 ".abc.uri",
966 Order::NewestToOldest,
967 2,
968 None,
969 &HashSet::from([Did("did:plc:linker".to_string())]),
970 )?;
971 assert_eq!(links, PagedAppendingCollection::empty());
972
973 storage.push(
974 &ActionableEvent::CreateLinks {
975 record_id: RecordId {
976 did: "did:plc:linker".into(),
977 collection: "app.t.c".into(),
978 rkey: "asdf".into(),
979 },
980 links: vec![CollectedLink {
981 target: Link::Uri("a.com".into()),
982 path: ".abc.uri".into(),
983 }],
984 },
985 0,
986 )?;
987
988 let links = storage.get_links(
989 "a.com",
990 "app.t.c",
991 ".abc.uri",
992 Order::NewestToOldest,
993 2,
994 None,
995 &HashSet::from([Did("did:plc:linker".to_string())]),
996 )?;
997 assert_eq!(
998 links,
999 PagedAppendingCollection {
1000 version: (1, 0),
1001 items: vec![RecordId {
1002 did: "did:plc:linker".into(),
1003 collection: "app.t.c".into(),
1004 rkey: "asdf".into(),
1005 },],
1006 next: None,
1007 total: 1,
1008 }
1009 );
1010
1011 let links = storage.get_links(
1012 "a.com",
1013 "app.t.c",
1014 ".abc.uri",
1015 Order::NewestToOldest,
1016 2,
1017 None,
1018 &HashSet::from([Did("did:plc:someone-else".to_string())]),
1019 )?;
1020 assert_eq!(links, PagedAppendingCollection::empty());
1021
1022 storage.push(
1023 &ActionableEvent::CreateLinks {
1024 record_id: RecordId {
1025 did: "did:plc:linker".into(),
1026 collection: "app.t.c".into(),
1027 rkey: "asdf-2".into(),
1028 },
1029 links: vec![CollectedLink {
1030 target: Link::Uri("a.com".into()),
1031 path: ".abc.uri".into(),
1032 }],
1033 },
1034 0,
1035 )?;
1036 storage.push(
1037 &ActionableEvent::CreateLinks {
1038 record_id: RecordId {
1039 did: "did:plc:someone-else".into(),
1040 collection: "app.t.c".into(),
1041 rkey: "asdf".into(),
1042 },
1043 links: vec![CollectedLink {
1044 target: Link::Uri("a.com".into()),
1045 path: ".abc.uri".into(),
1046 }],
1047 },
1048 0,
1049 )?;
1050
1051 let links = storage.get_links(
1052 "a.com",
1053 "app.t.c",
1054 ".abc.uri",
1055 Order::NewestToOldest,
1056 2,
1057 None,
1058 &HashSet::from([Did("did:plc:linker".to_string())]),
1059 )?;
1060 assert_eq!(
1061 links,
1062 PagedAppendingCollection {
1063 version: (2, 0),
1064 items: vec![
1065 RecordId {
1066 did: "did:plc:linker".into(),
1067 collection: "app.t.c".into(),
1068 rkey: "asdf-2".into(),
1069 },
1070 RecordId {
1071 did: "did:plc:linker".into(),
1072 collection: "app.t.c".into(),
1073 rkey: "asdf".into(),
1074 },
1075 ],
1076 next: None,
1077 total: 2,
1078 }
1079 );
1080
1081 let links = storage.get_links(
1082 "a.com",
1083 "app.t.c",
1084 ".abc.uri",
1085 Order::NewestToOldest,
1086 2,
1087 None,
1088 &HashSet::from([
1089 Did("did:plc:linker".to_string()),
1090 Did("did:plc:someone-else".to_string()),
1091 ]),
1092 )?;
1093 assert_eq!(
1094 links,
1095 PagedAppendingCollection {
1096 version: (3, 0),
1097 items: vec![
1098 RecordId {
1099 did: "did:plc:someone-else".into(),
1100 collection: "app.t.c".into(),
1101 rkey: "asdf".into(),
1102 },
1103 RecordId {
1104 did: "did:plc:linker".into(),
1105 collection: "app.t.c".into(),
1106 rkey: "asdf-2".into(),
1107 },
1108 ],
1109 next: Some(1),
1110 total: 3,
1111 }
1112 );
1113
1114 let links = storage.get_links(
1115 "a.com",
1116 "app.t.c",
1117 ".abc.uri",
1118 Order::NewestToOldest,
1119 2,
1120 None,
1121 &HashSet::from([Did("did:plc:someone-unknown".to_string())]),
1122 )?;
1123 assert_eq!(links, PagedAppendingCollection::empty());
1124 });
1125
1126 test_each_storage!(get_links_exact_multiple, |storage| {
1127 for i in 1..=4 {
1128 storage.push(
1129 &ActionableEvent::CreateLinks {
1130 record_id: RecordId {
1131 did: format!("did:plc:asdf-{i}").into(),
1132 collection: "app.t.c".into(),
1133 rkey: "asdf".into(),
1134 },
1135 links: vec![CollectedLink {
1136 target: Link::Uri("a.com".into()),
1137 path: ".abc.uri".into(),
1138 }],
1139 },
1140 0,
1141 )?;
1142 }
1143 let links = storage.get_links(
1144 "a.com",
1145 "app.t.c",
1146 ".abc.uri",
1147 Order::NewestToOldest,
1148 2,
1149 None,
1150 &HashSet::default(),
1151 )?;
1152 assert_eq!(
1153 links,
1154 PagedAppendingCollection {
1155 version: (4, 0),
1156 items: vec![
1157 RecordId {
1158 did: "did:plc:asdf-4".into(),
1159 collection: "app.t.c".into(),
1160 rkey: "asdf".into(),
1161 },
1162 RecordId {
1163 did: "did:plc:asdf-3".into(),
1164 collection: "app.t.c".into(),
1165 rkey: "asdf".into(),
1166 },
1167 ],
1168 next: Some(2),
1169 total: 4,
1170 }
1171 );
1172 let links = storage.get_links(
1173 "a.com",
1174 "app.t.c",
1175 ".abc.uri",
1176 Order::NewestToOldest,
1177 2,
1178 links.next,
1179 &HashSet::default(),
1180 )?;
1181 assert_eq!(
1182 links,
1183 PagedAppendingCollection {
1184 version: (4, 0),
1185 items: vec![
1186 RecordId {
1187 did: "did:plc:asdf-2".into(),
1188 collection: "app.t.c".into(),
1189 rkey: "asdf".into(),
1190 },
1191 RecordId {
1192 did: "did:plc:asdf-1".into(),
1193 collection: "app.t.c".into(),
1194 rkey: "asdf".into(),
1195 },
1196 ],
1197 next: None,
1198 total: 4,
1199 }
1200 );
1201 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1202 });
1203
1204 test_each_storage!(page_links_while_new_links_arrive, |storage| {
1205 for i in 1..=4 {
1206 storage.push(
1207 &ActionableEvent::CreateLinks {
1208 record_id: RecordId {
1209 did: format!("did:plc:asdf-{i}").into(),
1210 collection: "app.t.c".into(),
1211 rkey: "asdf".into(),
1212 },
1213 links: vec![CollectedLink {
1214 target: Link::Uri("a.com".into()),
1215 path: ".abc.uri".into(),
1216 }],
1217 },
1218 0,
1219 )?;
1220 }
1221 let links = storage.get_links(
1222 "a.com",
1223 "app.t.c",
1224 ".abc.uri",
1225 Order::NewestToOldest,
1226 2,
1227 None,
1228 &HashSet::default(),
1229 )?;
1230 assert_eq!(
1231 links,
1232 PagedAppendingCollection {
1233 version: (4, 0),
1234 items: vec![
1235 RecordId {
1236 did: "did:plc:asdf-4".into(),
1237 collection: "app.t.c".into(),
1238 rkey: "asdf".into(),
1239 },
1240 RecordId {
1241 did: "did:plc:asdf-3".into(),
1242 collection: "app.t.c".into(),
1243 rkey: "asdf".into(),
1244 },
1245 ],
1246 next: Some(2),
1247 total: 4,
1248 }
1249 );
1250 storage.push(
1251 &ActionableEvent::CreateLinks {
1252 record_id: RecordId {
1253 did: "did:plc:asdf-5".into(),
1254 collection: "app.t.c".into(),
1255 rkey: "asdf".into(),
1256 },
1257 links: vec![CollectedLink {
1258 target: Link::Uri("a.com".into()),
1259 path: ".abc.uri".into(),
1260 }],
1261 },
1262 0,
1263 )?;
1264 let links = storage.get_links(
1265 "a.com",
1266 "app.t.c",
1267 ".abc.uri",
1268 Order::NewestToOldest,
1269 2,
1270 links.next,
1271 &HashSet::default(),
1272 )?;
1273 assert_eq!(
1274 links,
1275 PagedAppendingCollection {
1276 version: (5, 0),
1277 items: vec![
1278 RecordId {
1279 did: "did:plc:asdf-2".into(),
1280 collection: "app.t.c".into(),
1281 rkey: "asdf".into(),
1282 },
1283 RecordId {
1284 did: "did:plc:asdf-1".into(),
1285 collection: "app.t.c".into(),
1286 rkey: "asdf".into(),
1287 },
1288 ],
1289 next: None,
1290 total: 5,
1291 }
1292 );
1293 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
1294 });
1295
1296 test_each_storage!(page_links_while_some_are_deleted, |storage| {
1297 for i in 1..=4 {
1298 storage.push(
1299 &ActionableEvent::CreateLinks {
1300 record_id: RecordId {
1301 did: format!("did:plc:asdf-{i}").into(),
1302 collection: "app.t.c".into(),
1303 rkey: "asdf".into(),
1304 },
1305 links: vec![CollectedLink {
1306 target: Link::Uri("a.com".into()),
1307 path: ".abc.uri".into(),
1308 }],
1309 },
1310 0,
1311 )?;
1312 }
1313 let links = storage.get_links(
1314 "a.com",
1315 "app.t.c",
1316 ".abc.uri",
1317 Order::NewestToOldest,
1318 2,
1319 None,
1320 &HashSet::default(),
1321 )?;
1322 assert_eq!(
1323 links,
1324 PagedAppendingCollection {
1325 version: (4, 0),
1326 items: vec![
1327 RecordId {
1328 did: "did:plc:asdf-4".into(),
1329 collection: "app.t.c".into(),
1330 rkey: "asdf".into(),
1331 },
1332 RecordId {
1333 did: "did:plc:asdf-3".into(),
1334 collection: "app.t.c".into(),
1335 rkey: "asdf".into(),
1336 },
1337 ],
1338 next: Some(2),
1339 total: 4,
1340 }
1341 );
1342 storage.push(
1343 &ActionableEvent::DeleteRecord(RecordId {
1344 did: "did:plc:asdf-2".into(),
1345 collection: "app.t.c".into(),
1346 rkey: "asdf".into(),
1347 }),
1348 0,
1349 )?;
1350 let links = storage.get_links(
1351 "a.com",
1352 "app.t.c",
1353 ".abc.uri",
1354 Order::NewestToOldest,
1355 2,
1356 links.next,
1357 &HashSet::default(),
1358 )?;
1359 assert_eq!(
1360 links,
1361 PagedAppendingCollection {
1362 version: (4, 1),
1363 items: vec![RecordId {
1364 did: "did:plc:asdf-1".into(),
1365 collection: "app.t.c".into(),
1366 rkey: "asdf".into(),
1367 },],
1368 next: None,
1369 total: 3,
1370 }
1371 );
1372 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
1373 });
1374
1375 test_each_storage!(page_links_accounts_inactive, |storage| {
1376 for i in 1..=4 {
1377 storage.push(
1378 &ActionableEvent::CreateLinks {
1379 record_id: RecordId {
1380 did: format!("did:plc:asdf-{i}").into(),
1381 collection: "app.t.c".into(),
1382 rkey: "asdf".into(),
1383 },
1384 links: vec![CollectedLink {
1385 target: Link::Uri("a.com".into()),
1386 path: ".abc.uri".into(),
1387 }],
1388 },
1389 0,
1390 )?;
1391 }
1392 let links = storage.get_links(
1393 "a.com",
1394 "app.t.c",
1395 ".abc.uri",
1396 Order::NewestToOldest,
1397 2,
1398 None,
1399 &HashSet::default(),
1400 )?;
1401 assert_eq!(
1402 links,
1403 PagedAppendingCollection {
1404 version: (4, 0),
1405 items: vec![
1406 RecordId {
1407 did: "did:plc:asdf-4".into(),
1408 collection: "app.t.c".into(),
1409 rkey: "asdf".into(),
1410 },
1411 RecordId {
1412 did: "did:plc:asdf-3".into(),
1413 collection: "app.t.c".into(),
1414 rkey: "asdf".into(),
1415 },
1416 ],
1417 next: Some(2),
1418 total: 4,
1419 }
1420 );
1421 storage.push(
1422 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
1423 0,
1424 )?;
1425 let links = storage.get_links(
1426 "a.com",
1427 "app.t.c",
1428 ".abc.uri",
1429 Order::NewestToOldest,
1430 2,
1431 links.next,
1432 &HashSet::default(),
1433 )?;
1434 assert_eq!(
1435 links,
1436 PagedAppendingCollection {
1437 version: (4, 0),
1438 items: vec![RecordId {
1439 did: "did:plc:asdf-2".into(),
1440 collection: "app.t.c".into(),
1441 rkey: "asdf".into(),
1442 },],
1443 next: None,
1444 total: 4,
1445 }
1446 );
1447 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1448 });
1449
1450 test_each_storage!(get_all_counts, |storage| {
1451 storage.push(
1452 &ActionableEvent::CreateLinks {
1453 record_id: RecordId {
1454 did: "did:plc:asdf".into(),
1455 collection: "app.t.c".into(),
1456 rkey: "asdf".into(),
1457 },
1458 links: vec![
1459 CollectedLink {
1460 target: Link::Uri("a.com".into()),
1461 path: ".abc.uri".into(),
1462 },
1463 CollectedLink {
1464 target: Link::Uri("a.com".into()),
1465 path: ".def.uri".into(),
1466 },
1467 ],
1468 },
1469 0,
1470 )?;
1471 assert_eq!(storage.get_all_record_counts("a.com")?, {
1472 let mut counts = HashMap::new();
1473 let mut t_c_counts = HashMap::new();
1474 t_c_counts.insert(".abc.uri".into(), 1);
1475 t_c_counts.insert(".def.uri".into(), 1);
1476 counts.insert("app.t.c".into(), t_c_counts);
1477 counts
1478 });
1479 assert_eq!(storage.get_all_counts("a.com")?, {
1480 let mut counts = HashMap::new();
1481 let mut t_c_counts = HashMap::new();
1482 t_c_counts.insert(
1483 ".abc.uri".into(),
1484 CountsByCount {
1485 records: 1,
1486 distinct_dids: 1,
1487 },
1488 );
1489 t_c_counts.insert(
1490 ".def.uri".into(),
1491 CountsByCount {
1492 records: 1,
1493 distinct_dids: 1,
1494 },
1495 );
1496 counts.insert("app.t.c".into(), t_c_counts);
1497 counts
1498 });
1499 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1500 });
1501
1502 //////// many-to-many /////////
1503
1504 test_each_storage!(get_m2m_counts_empty, |storage| {
1505 assert_eq!(
1506 storage.get_many_to_many_counts(
1507 "a.com",
1508 "a.b.c",
1509 ".d.e",
1510 ".f.g",
1511 10,
1512 None,
1513 &HashSet::new(),
1514 &HashSet::new(),
1515 )?,
1516 PagedOrderedCollection::empty()
1517 );
1518 });
1519
1520 test_each_storage!(get_m2m_counts_single, |storage| {
1521 storage.push(
1522 &ActionableEvent::CreateLinks {
1523 record_id: RecordId {
1524 did: "did:plc:asdf".into(),
1525 collection: "app.t.c".into(),
1526 rkey: "asdf".into(),
1527 },
1528 links: vec![
1529 CollectedLink {
1530 target: Link::Uri("a.com".into()),
1531 path: ".abc.uri".into(),
1532 },
1533 CollectedLink {
1534 target: Link::Uri("b.com".into()),
1535 path: ".def.uri".into(),
1536 },
1537 CollectedLink {
1538 target: Link::Uri("b.com".into()),
1539 path: ".ghi.uri".into(),
1540 },
1541 ],
1542 },
1543 0,
1544 )?;
1545 assert_eq!(
1546 storage.get_many_to_many_counts(
1547 "a.com",
1548 "app.t.c",
1549 ".abc.uri",
1550 ".def.uri",
1551 10,
1552 None,
1553 &HashSet::new(),
1554 &HashSet::new(),
1555 )?,
1556 PagedOrderedCollection {
1557 items: vec![("b.com".to_string(), 1, 1)],
1558 next: None,
1559 }
1560 );
1561 });
1562
1563 test_each_storage!(get_m2m_counts_filters, |storage| {
1564 storage.push(
1565 &ActionableEvent::CreateLinks {
1566 record_id: RecordId {
1567 did: "did:plc:asdf".into(),
1568 collection: "app.t.c".into(),
1569 rkey: "asdf".into(),
1570 },
1571 links: vec![
1572 CollectedLink {
1573 target: Link::Uri("a.com".into()),
1574 path: ".abc.uri".into(),
1575 },
1576 CollectedLink {
1577 target: Link::Uri("b.com".into()),
1578 path: ".def.uri".into(),
1579 },
1580 ],
1581 },
1582 0,
1583 )?;
1584 storage.push(
1585 &ActionableEvent::CreateLinks {
1586 record_id: RecordId {
1587 did: "did:plc:asdfasdf".into(),
1588 collection: "app.t.c".into(),
1589 rkey: "asdf".into(),
1590 },
1591 links: vec![
1592 CollectedLink {
1593 target: Link::Uri("a.com".into()),
1594 path: ".abc.uri".into(),
1595 },
1596 CollectedLink {
1597 target: Link::Uri("b.com".into()),
1598 path: ".def.uri".into(),
1599 },
1600 ],
1601 },
1602 1,
1603 )?;
1604 storage.push(
1605 &ActionableEvent::CreateLinks {
1606 record_id: RecordId {
1607 did: "did:plc:fdsa".into(),
1608 collection: "app.t.c".into(),
1609 rkey: "asdf".into(),
1610 },
1611 links: vec![
1612 CollectedLink {
1613 target: Link::Uri("a.com".into()),
1614 path: ".abc.uri".into(),
1615 },
1616 CollectedLink {
1617 target: Link::Uri("c.com".into()),
1618 path: ".def.uri".into(),
1619 },
1620 ],
1621 },
1622 2,
1623 )?;
1624 storage.push(
1625 &ActionableEvent::CreateLinks {
1626 record_id: RecordId {
1627 did: "did:plc:fdsa".into(),
1628 collection: "app.t.c".into(),
1629 rkey: "asdf2".into(),
1630 },
1631 links: vec![
1632 CollectedLink {
1633 target: Link::Uri("a.com".into()),
1634 path: ".abc.uri".into(),
1635 },
1636 CollectedLink {
1637 target: Link::Uri("c.com".into()),
1638 path: ".def.uri".into(),
1639 },
1640 ],
1641 },
1642 3,
1643 )?;
1644 assert_eq!(
1645 storage.get_many_to_many_counts(
1646 "a.com",
1647 "app.t.c",
1648 ".abc.uri",
1649 ".def.uri",
1650 10,
1651 None,
1652 &HashSet::new(),
1653 &HashSet::new(),
1654 )?,
1655 PagedOrderedCollection {
1656 items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1657 next: None,
1658 }
1659 );
1660 assert_eq!(
1661 storage.get_many_to_many_counts(
1662 "a.com",
1663 "app.t.c",
1664 ".abc.uri",
1665 ".def.uri",
1666 10,
1667 None,
1668 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1669 &HashSet::new(),
1670 )?,
1671 PagedOrderedCollection {
1672 items: vec![("c.com".to_string(), 2, 1),],
1673 next: None,
1674 }
1675 );
1676 assert_eq!(
1677 storage.get_many_to_many_counts(
1678 "a.com",
1679 "app.t.c",
1680 ".abc.uri",
1681 ".def.uri",
1682 10,
1683 None,
1684 &HashSet::new(),
1685 &HashSet::from_iter(["b.com".to_string()]),
1686 )?,
1687 PagedOrderedCollection {
1688 items: vec![("b.com".to_string(), 2, 2),],
1689 next: None,
1690 }
1691 );
1692
1693 // Pagination edge cases: we have 2 grouped results (b.com and c.com)
1694
1695 // Case 1: limit > items (limit=10, items=2) -> next should be None
1696 let result = storage.get_many_to_many_counts(
1697 "a.com",
1698 "app.t.c",
1699 ".abc.uri",
1700 ".def.uri",
1701 10,
1702 None,
1703 &HashSet::new(),
1704 &HashSet::new(),
1705 )?;
1706 assert_eq!(result.items.len(), 2);
1707 assert_eq!(result.next, None, "next should be None when items < limit");
1708
1709 // Case 2: limit == items (limit=2, items=2) -> next should be None
1710 let result = storage.get_many_to_many_counts(
1711 "a.com",
1712 "app.t.c",
1713 ".abc.uri",
1714 ".def.uri",
1715 2,
1716 None,
1717 &HashSet::new(),
1718 &HashSet::new(),
1719 )?;
1720 assert_eq!(result.items.len(), 2);
1721 assert_eq!(
1722 result.next, None,
1723 "next should be None when items == limit (no more pages)"
1724 );
1725
1726 // Case 3: limit < items (limit=1, items=2) -> next should be Some
1727 let result = storage.get_many_to_many_counts(
1728 "a.com",
1729 "app.t.c",
1730 ".abc.uri",
1731 ".def.uri",
1732 1,
1733 None,
1734 &HashSet::new(),
1735 &HashSet::new(),
1736 )?;
1737 assert_eq!(result.items.len(), 1);
1738 assert!(
1739 result.next.is_some(),
1740 "next should be Some when items > limit"
1741 );
1742
1743 // Verify second page returns remaining item with no cursor
1744 let result2 = storage.get_many_to_many_counts(
1745 "a.com",
1746 "app.t.c",
1747 ".abc.uri",
1748 ".def.uri",
1749 1,
1750 result.next,
1751 &HashSet::new(),
1752 &HashSet::new(),
1753 )?;
1754 assert_eq!(result2.items.len(), 1);
1755 assert_eq!(result2.next, None, "next should be None on final page");
1756 });
1757
1758 test_each_storage!(get_m2m_empty, |storage| {
1759 assert_eq!(
1760 storage.get_many_to_many(
1761 "a.com",
1762 "a.b.c",
1763 ".d.e",
1764 ".f.g",
1765 10,
1766 None,
1767 &HashSet::new(),
1768 &HashSet::new(),
1769 )?,
1770 PagedOrderedCollection {
1771 items: vec![],
1772 next: None,
1773 }
1774 );
1775 });
1776
1777 test_each_storage!(get_m2m_single, |storage| {
1778 // One record linking to a.com (backward), with two forward links at
1779 // the same path_to_other (.def.uri) pointing to b.com and c.com.
1780 // Both forward targets must appear in the output.
1781 storage.push(
1782 &ActionableEvent::CreateLinks {
1783 record_id: RecordId {
1784 did: "did:plc:asdf".into(),
1785 collection: "app.t.c".into(),
1786 rkey: "asdf".into(),
1787 },
1788 links: vec![
1789 CollectedLink {
1790 target: Link::Uri("a.com".into()),
1791 path: ".abc.uri".into(),
1792 },
1793 CollectedLink {
1794 target: Link::Uri("b.com".into()),
1795 path: ".def.uri".into(),
1796 },
1797 CollectedLink {
1798 target: Link::Uri("c.com".into()),
1799 path: ".def.uri".into(),
1800 },
1801 ],
1802 },
1803 0,
1804 )?;
1805 let result = storage.get_many_to_many(
1806 "a.com",
1807 "app.t.c",
1808 ".abc.uri",
1809 ".def.uri",
1810 10,
1811 None,
1812 &HashSet::new(),
1813 &HashSet::new(),
1814 )?;
1815 assert_eq!(
1816 result.items.len(),
1817 2,
1818 "both forward links at path_to_other should be emitted"
1819 );
1820 let mut targets: Vec<_> = result
1821 .items
1822 .iter()
1823 .map(|item| item.other_subject.as_str())
1824 .collect();
1825 targets.sort();
1826 assert_eq!(targets, vec!["b.com", "c.com"]);
1827 assert!(result
1828 .items
1829 .iter()
1830 .all(|item| item.link_record.uri() == "at://did:plc:asdf/app.t.c/asdf"));
1831 assert_eq!(result.next, None);
1832 });
1833
1834 test_each_storage!(get_m2m_filters, |storage| {
1835 storage.push(
1836 &ActionableEvent::CreateLinks {
1837 record_id: RecordId {
1838 did: "did:plc:asdf".into(),
1839 collection: "app.t.c".into(),
1840 rkey: "asdf".into(),
1841 },
1842 links: vec![
1843 CollectedLink {
1844 target: Link::Uri("a.com".into()),
1845 path: ".abc.uri".into(),
1846 },
1847 CollectedLink {
1848 target: Link::Uri("b.com".into()),
1849 path: ".def.uri".into(),
1850 },
1851 ],
1852 },
1853 0,
1854 )?;
1855 storage.push(
1856 &ActionableEvent::CreateLinks {
1857 record_id: RecordId {
1858 did: "did:plc:asdf".into(),
1859 collection: "app.t.c".into(),
1860 rkey: "asdf2".into(),
1861 },
1862 links: vec![
1863 CollectedLink {
1864 target: Link::Uri("a.com".into()),
1865 path: ".abc.uri".into(),
1866 },
1867 CollectedLink {
1868 target: Link::Uri("b.com".into()),
1869 path: ".def.uri".into(),
1870 },
1871 ],
1872 },
1873 1,
1874 )?;
1875 storage.push(
1876 &ActionableEvent::CreateLinks {
1877 record_id: RecordId {
1878 did: "did:plc:fdsa".into(),
1879 collection: "app.t.c".into(),
1880 rkey: "fdsa".into(),
1881 },
1882 links: vec![
1883 CollectedLink {
1884 target: Link::Uri("a.com".into()),
1885 path: ".abc.uri".into(),
1886 },
1887 CollectedLink {
1888 target: Link::Uri("c.com".into()),
1889 path: ".def.uri".into(),
1890 },
1891 ],
1892 },
1893 2,
1894 )?;
1895 storage.push(
1896 &ActionableEvent::CreateLinks {
1897 record_id: RecordId {
1898 did: "did:plc:fdsa".into(),
1899 collection: "app.t.c".into(),
1900 rkey: "fdsa2".into(),
1901 },
1902 links: vec![
1903 CollectedLink {
1904 target: Link::Uri("a.com".into()),
1905 path: ".abc.uri".into(),
1906 },
1907 CollectedLink {
1908 target: Link::Uri("c.com".into()),
1909 path: ".def.uri".into(),
1910 },
1911 ],
1912 },
1913 3,
1914 )?;
1915
1916 // Test without filters - should get all records as flat items
1917 let result = storage.get_many_to_many(
1918 "a.com",
1919 "app.t.c",
1920 ".abc.uri",
1921 ".def.uri",
1922 10,
1923 None,
1924 &HashSet::new(),
1925 &HashSet::new(),
1926 )?;
1927 assert_eq!(result.items.len(), 4);
1928 assert_eq!(result.next, None);
1929 // Check b.com items
1930 let b_items: Vec<_> = result
1931 .items
1932 .iter()
1933 .filter(|item| item.other_subject == "b.com")
1934 .collect();
1935 assert_eq!(b_items.len(), 2);
1936 assert!(b_items.iter().any(
1937 |item| item.link_record.did.0 == "did:plc:asdf" && item.link_record.rkey == "asdf"
1938 ));
1939 assert!(b_items.iter().any(
1940 |item| item.link_record.did.0 == "did:plc:asdf" && item.link_record.rkey == "asdf2"
1941 ));
1942 // Check c.com items
1943 let c_items: Vec<_> = result
1944 .items
1945 .iter()
1946 .filter(|item| item.other_subject == "c.com")
1947 .collect();
1948 assert_eq!(c_items.len(), 2);
1949 assert!(c_items.iter().any(
1950 |item| item.link_record.did.0 == "did:plc:fdsa" && item.link_record.rkey == "fdsa"
1951 ));
1952 assert!(c_items.iter().any(
1953 |item| item.link_record.did.0 == "did:plc:fdsa" && item.link_record.rkey == "fdsa2"
1954 ));
1955
1956 // Test with DID filter - should only get records from did:plc:fdsa
1957 let result = storage.get_many_to_many(
1958 "a.com",
1959 "app.t.c",
1960 ".abc.uri",
1961 ".def.uri",
1962 10,
1963 None,
1964 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1965 &HashSet::new(),
1966 )?;
1967 assert_eq!(result.items.len(), 2);
1968 assert!(result
1969 .items
1970 .iter()
1971 .all(|item| item.other_subject == "c.com"));
1972 assert!(result
1973 .items
1974 .iter()
1975 .all(|item| item.link_record.did.0 == "did:plc:fdsa"));
1976
1977 // Test with target filter - should only get records linking to b.com
1978 let result = storage.get_many_to_many(
1979 "a.com",
1980 "app.t.c",
1981 ".abc.uri",
1982 ".def.uri",
1983 10,
1984 None,
1985 &HashSet::new(),
1986 &HashSet::from_iter(["b.com".to_string()]),
1987 )?;
1988 assert_eq!(result.items.len(), 2);
1989 assert!(result
1990 .items
1991 .iter()
1992 .all(|item| item.other_subject == "b.com"));
1993 assert!(result
1994 .items
1995 .iter()
1996 .all(|item| item.link_record.did.0 == "did:plc:asdf"));
1997
1998 // Pagination edge cases: we have 4 flat items
1999
2000 // Case 1: limit > items (limit=10, items=4) -> next should be None
2001 let result = storage.get_many_to_many(
2002 "a.com",
2003 "app.t.c",
2004 ".abc.uri",
2005 ".def.uri",
2006 10,
2007 None,
2008 &HashSet::new(),
2009 &HashSet::new(),
2010 )?;
2011 assert_eq!(result.items.len(), 4);
2012 assert_eq!(result.next, None, "next should be None when items < limit");
2013
2014 // Case 2: limit == items (limit=4, items=4) -> next should be None
2015 let result = storage.get_many_to_many(
2016 "a.com",
2017 "app.t.c",
2018 ".abc.uri",
2019 ".def.uri",
2020 4,
2021 None,
2022 &HashSet::new(),
2023 &HashSet::new(),
2024 )?;
2025 assert_eq!(result.items.len(), 4);
2026 assert_eq!(
2027 result.next, None,
2028 "next should be None when items == limit (no more pages)"
2029 );
2030
2031 // Case 3: limit < items (limit=3, items=4) -> next should be Some
2032 let result = storage.get_many_to_many(
2033 "a.com",
2034 "app.t.c",
2035 ".abc.uri",
2036 ".def.uri",
2037 3,
2038 None,
2039 &HashSet::new(),
2040 &HashSet::new(),
2041 )?;
2042 assert_eq!(result.items.len(), 3);
2043 assert!(
2044 result.next.is_some(),
2045 "next should be Some when items > limit"
2046 );
2047
2048 // Verify second page returns remaining item with no cursor.
2049 // This now works correctly because we use a composite cursor that includes
2050 // (target, did, rkey), allowing pagination even when multiple records share
2051 // the same target string.
2052 let result2 = storage.get_many_to_many(
2053 "a.com",
2054 "app.t.c",
2055 ".abc.uri",
2056 ".def.uri",
2057 3,
2058 result.next,
2059 &HashSet::new(),
2060 &HashSet::new(),
2061 )?;
2062 assert_eq!(
2063 result2.items.len(),
2064 1,
2065 "second page should have 1 remaining item"
2066 );
2067 assert_eq!(result2.next, None, "next should be None on final page");
2068
2069 // Verify we got all 4 unique items across both pages (no duplicates, no gaps)
2070 let mut all_rkeys: Vec<_> = result
2071 .items
2072 .iter()
2073 .map(|item| item.link_record.rkey.clone())
2074 .collect();
2075 all_rkeys.extend(
2076 result2
2077 .items
2078 .iter()
2079 .map(|item| item.link_record.rkey.clone()),
2080 );
2081 all_rkeys.sort();
2082 assert_eq!(
2083 all_rkeys,
2084 vec!["asdf", "asdf2", "fdsa", "fdsa2"],
2085 "should have all 4 records across both pages"
2086 );
2087 });
2088
2089 // Pagination that splits across forward links within a single backlinker.
2090 // The cursor should correctly resume mid-record on the next page.
2091 test_each_storage!(get_m2m_paginate_within_forward_links, |storage| {
2092 // Record with 1 backward link and 3 forward links at the same path
2093 storage.push(
2094 &ActionableEvent::CreateLinks {
2095 record_id: RecordId {
2096 did: "did:plc:lister".into(),
2097 collection: "app.t.c".into(),
2098 rkey: "list1".into(),
2099 },
2100 links: vec![
2101 CollectedLink {
2102 target: Link::Uri("a.com".into()),
2103 path: ".subject.uri".into(),
2104 },
2105 CollectedLink {
2106 target: Link::Uri("x.com".into()),
2107 path: ".items[].uri".into(),
2108 },
2109 CollectedLink {
2110 target: Link::Uri("y.com".into()),
2111 path: ".items[].uri".into(),
2112 },
2113 CollectedLink {
2114 target: Link::Uri("z.com".into()),
2115 path: ".items[].uri".into(),
2116 },
2117 ],
2118 },
2119 0,
2120 )?;
2121
2122 // Page 1: limit=2, should get 2 of the 3 forward links
2123 let page1 = storage.get_many_to_many(
2124 "a.com",
2125 "app.t.c",
2126 ".subject.uri",
2127 ".items[].uri",
2128 2,
2129 None,
2130 &HashSet::new(),
2131 &HashSet::new(),
2132 )?;
2133 assert_eq!(page1.items.len(), 2, "first page should have 2 items");
2134 assert!(
2135 page1.next.is_some(),
2136 "should have a next cursor for remaining item"
2137 );
2138
2139 // Page 2: should get the remaining 1 forward link
2140 let page2 = storage.get_many_to_many(
2141 "a.com",
2142 "app.t.c",
2143 ".subject.uri",
2144 ".items[].uri",
2145 2,
2146 page1.next,
2147 &HashSet::new(),
2148 &HashSet::new(),
2149 )?;
2150 assert_eq!(page2.items.len(), 1, "second page should have 1 item");
2151 assert_eq!(page2.next, None, "no more pages");
2152
2153 // Verify all 3 targets appear across pages with no duplicates
2154 let mut all_targets: Vec<_> = page1
2155 .items
2156 .iter()
2157 .chain(page2.items.iter())
2158 .map(|item| item.other_subject.clone())
2159 .collect();
2160 all_targets.sort();
2161 assert_eq!(
2162 all_targets,
2163 vec!["x.com", "y.com", "z.com"],
2164 "all forward targets should appear exactly once across pages"
2165 );
2166 });
2167}