Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Add new get_many_to_many XRPC endpoint #7

Merged. Opened by seoul.systems, targeting main, from seoul.systems/microcosm-rs: xrpc_many2many

Added a new XRPC API endpoint, get_many_to_many, to fetch joined record URIs (we talked about this briefly on Discord already). It is implemented and functions almost identically to the existing get_many_to_many_counts endpoint and handler. Some of its possible flaws, such as the two-step lookup to verify that a matching DID is indeed active, are duplicated as well. On the plus side, this should make the PR pretty straightforward to review, and it should make it easier to modify both endpoints later on, once a more efficient way to validate the status of DIDs is available.
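For reviewers, the composite cursor the new handler paginates with can be sketched in isolation: it is a plain `"backward,forward"` index pair recording the last-emitted backlinker index and forward-link index. The names below mirror the diff, but this is a standalone illustration, not the PR's exact code.

```rust
// Simplified sketch of the composite pagination cursor used by
// get_many_to_many: a "backward,forward" pair of indices. Standalone
// illustration only; not the exact types from the PR.
#[derive(Copy, Clone, Debug, PartialEq)]
struct CompositeCursor {
    backward: u64, // index into the backlinkers of the primary target
    forward: u64,  // index into that record's forward links
}

impl CompositeCursor {
    // Serialize as "backward,forward", matching the storage layer's
    // `split_once(',')` parsing.
    fn encode(&self) -> String {
        format!("{},{}", self.backward, self.forward)
    }

    // Parse back, rejecting malformed input instead of panicking.
    fn decode(s: &str) -> Option<Self> {
        let (b, f) = s.split_once(',')?;
        Some(CompositeCursor {
            backward: b.parse().ok()?,
            forward: f.parse().ok()?,
        })
    }
}

fn main() {
    let c = CompositeCursor { backward: 3, forward: 7 };
    let round_trip = CompositeCursor::decode(&c.encode());
    assert_eq!(round_trip, Some(c));
    println!("cursor round-trip ok: {}", c.encode());
}
```

On resume, backlinkers before `backward` are skipped entirely; at `backward` itself, forward links at or before `forward` are skipped, which lets a page boundary fall mid-record when one backlinker has several forward links at `path_to_other`.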

If you have comments, remarks, etc., I am happy to rework some parts.
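Both handlers in this PR determine whether another page exists with a fetch-limit-plus-one pattern: collect up to limit + 1 items, and if more than `limit` arrived, truncate to the page size and emit a cursor. A minimal sketch of that pattern (`Paged` and `paginate` are illustrative names, not types from the PR):

```rust
// Sketch of the fetch-limit-plus-one pagination pattern: the caller
// collects at most limit + 1 items; one extra item signals a next page.
// Illustrative names only; not the PR's types.
#[derive(Debug, PartialEq)]
struct Paged<T> {
    items: Vec<T>,
    has_next: bool,
}

fn paginate<T>(mut collected: Vec<T>, limit: usize) -> Paged<T> {
    // `collected` is assumed to hold at most limit + 1 items.
    let has_next = collected.len() > limit;
    collected.truncate(limit);
    Paged { items: collected, has_next }
}

fn main() {
    // Three items collected with limit = 2: the extra one signals a next page.
    let page = paginate(vec!["a", "b", "c"], 2);
    assert!(page.has_next);
    assert_eq!(page.items, vec!["a", "b"]);
    println!("page of {} items, has_next = {}", page.items.len(), page.has_next);
}
```

This avoids the off-by-one in the pre-existing `>= limit` checks, where a page containing exactly `limit` items would emit a spurious cursor for an empty next page.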


Participants 2
AT URI
at://did:plc:53wellrw53o7sw4zlpfenvuh/sh.tangled.repo.pull/3mbkyehqooh22
+1091 -20
Diff #7
+2 -2
constellation/src/lib.rs
··· 22 22 DeleteAccount(Did), 23 23 } 24 24 25 - #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] 25 + #[derive(Debug, Hash, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 26 26 pub struct Did(pub String); 27 27 28 28 impl<T: Into<String>> From<T> for Did { ··· 31 31 } 32 32 } 33 33 34 - #[derive(Debug, PartialEq, Serialize, Deserialize)] 34 + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)] 35 35 pub struct RecordId { 36 36 pub did: Did, 37 37 pub collection: String,
+115
constellation/src/server/mod.rs
··· 114 114 }), 115 115 ) 116 116 .route( 117 + "/xrpc/blue.microcosm.links.getManyToMany", 118 + get({ 119 + let store = store.clone(); 120 + move |accept, query| async { 121 + spawn_blocking(|| get_many_to_many(accept, query, store)) 122 + .await 123 + .map_err(to500)? 124 + } 125 + }), 126 + ) 127 + .route( 117 128 "/xrpc/blue.microcosm.links.getBacklinks", 118 129 get({ 119 130 let store = store.clone(); ··· 661 672 GetLinkItemsResponse { 662 673 total: paged.total, 663 674 linking_records: paged.items, 675 + cursor, 676 + query: (*query).clone(), 677 + }, 678 + )) 679 + } 680 + 681 + #[derive(Clone, Deserialize)] 682 + #[serde(rename_all = "camelCase")] 683 + struct GetManyToManyItemsQuery { 684 + subject: String, 685 + source: String, 686 + /// path to the secondary link in the linking record 687 + path_to_other: String, 688 + /// filter to linking records (join of the m2m) by these DIDs 689 + #[serde(default)] 690 + did: Vec<String>, 691 + /// filter to specific secondary records 692 + #[serde(default)] 693 + other_subject: Vec<String>, 694 + cursor: Option<OpaqueApiCursor>, 695 + #[serde(default = "get_default_cursor_limit")] 696 + limit: u64, 697 + } 698 + #[derive(Debug, Serialize, Clone)] 699 + struct ManyToManyItem { 700 + link: RecordId, 701 + subject: String, 702 + } 703 + #[derive(Template, Serialize)] 704 + #[template(path = "get-many-to-many.html.j2")] 705 + struct GetManyToManyItemsResponse { 706 + items: Vec<ManyToManyItem>, 707 + cursor: Option<OpaqueApiCursor>, 708 + #[serde(skip_serializing)] 709 + query: GetManyToManyItemsQuery, 710 + } 711 + fn get_many_to_many( 712 + accept: ExtractAccept, 713 + query: axum_extra::extract::Query<GetManyToManyItemsQuery>, // supports multiple param occurrences 714 + store: impl LinkReader, 715 + ) -> Result<impl IntoResponse, http::StatusCode> { 716 + let after = query 717 + .cursor 718 + .clone() 719 + .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 720 + .transpose()? 
721 + .map(|c| c.next); 722 + 723 + let limit = query.limit; 724 + if limit > DEFAULT_CURSOR_LIMIT_MAX { 725 + return Err(http::StatusCode::BAD_REQUEST); 726 + } 727 + 728 + let filter_dids: HashSet<Did> = query 729 + .did 730 + .iter() 731 + .map(|d| d.trim()) 732 + .filter(|d| !d.is_empty()) 733 + .map(Did::from) 734 + .collect(); 735 + 736 + let filter_other_subjects: HashSet<String> = HashSet::from_iter( 737 + query 738 + .other_subject 739 + .iter() 740 + .map(|s| s.trim().to_string()) 741 + .filter(|s| !s.is_empty()), 742 + ); 743 + 744 + let Some((collection, path)) = query.source.split_once(':') else { 745 + return Err(http::StatusCode::BAD_REQUEST); 746 + }; 747 + let path = format!(".{path}"); 748 + 749 + let path_to_other = format!(".{}", query.path_to_other); 750 + 751 + let paged = store 752 + .get_many_to_many( 753 + &query.subject, 754 + collection, 755 + &path, 756 + &path_to_other, 757 + limit, 758 + after, 759 + &filter_dids, 760 + &filter_other_subjects, 761 + ) 762 + .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 763 + 764 + let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 765 + 766 + let items: Vec<ManyToManyItem> = paged 767 + .items 768 + .into_iter() 769 + .map(|(record_id, subject)| ManyToManyItem { 770 + link: record_id, 771 + subject, 772 + }) 773 + .collect(); 774 + 775 + Ok(acceptable( 776 + accept, 777 + GetManyToManyItemsResponse { 778 + items, 664 779 cursor, 665 780 query: (*query).clone(), 666 781 },
+114 -3
constellation/src/storage/mem_store.rs
··· 1 1 use super::{ 2 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 3 }; 4 + use crate::storage::CompositeCursor; 4 5 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 - use anyhow::Result; 6 + 7 + use anyhow::{anyhow, Result}; 6 8 use links::CollectedLink; 9 + 7 10 use std::collections::{HashMap, HashSet}; 8 11 use std::sync::{Arc, Mutex}; 9 12 ··· 197 200 items = items 198 201 .into_iter() 199 202 .skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false)) 200 - .take(limit as usize) 203 + .take(limit as usize + 1) 201 204 .collect(); 202 - let next = if items.len() as u64 >= limit { 205 + let next = if items.len() as u64 > limit { 206 + items.truncate(limit as usize); 203 207 items.last().map(|(t, _, _)| t.clone()) 204 208 } else { 205 209 None ··· 232 236 .map(|(did, _)| did) 233 237 .collect::<HashSet<_>>() 234 238 .len() as u64) 239 + } 240 + 241 + fn get_many_to_many( 242 + &self, 243 + target: &str, 244 + collection: &str, 245 + path: &str, 246 + path_to_other: &str, 247 + limit: u64, 248 + after: Option<String>, 249 + filter_dids: &HashSet<Did>, 250 + filter_targets: &HashSet<String>, 251 + ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 252 + // setup variables that we need later 253 + let path_to_other = RecordPath(path_to_other.to_string()); 254 + let filter_targets: HashSet<Target> = 255 + HashSet::from_iter(filter_targets.iter().map(|s| Target::new(s))); 256 + 257 + // extract parts form composite cursor 258 + let cursor = match after { 259 + Some(a) => { 260 + let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 261 + let b = b 262 + .parse::<u64>() 263 + .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 264 + let f = f 265 + .parse::<u64>() 266 + .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 267 + Some(CompositeCursor { 268 + backward: b, 269 + forward: f, 270 + }) 271 + } 272 + None => None, 273 + }; 274 + 275 + let data = 
self.0.lock().unwrap(); 276 + let Some(sources) = data.targets.get(&Target::new(target)) else { 277 + return Ok(PagedOrderedCollection::empty()); 278 + }; 279 + let Some(linkers) = sources.get(&Source::new(collection, path)) else { 280 + return Ok(PagedOrderedCollection::empty()); 281 + }; 282 + 283 + let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 284 + 285 + // iterate backwards (who linked to the target?) 286 + for (linker_idx, (did, rkey)) in linkers 287 + .iter() 288 + .enumerate() 289 + .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 290 + .skip_while(|(linker_idx, _)| cursor.is_some_and(|c| *linker_idx < c.backward as usize)) 291 + .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(&did)) 292 + { 293 + let Some(links) = data.links.get(&did).and_then(|m| { 294 + m.get(&RepoId { 295 + collection: collection.to_string(), 296 + rkey: rkey.clone(), 297 + }) 298 + }) else { 299 + continue; 300 + }; 301 + 302 + // iterate forward (which of these links point to the __other__ target?) 
303 + for (link_idx, (_, fwd_target)) in links 304 + .iter() 305 + .enumerate() 306 + .filter(|(_, (p, t))| { 307 + *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 308 + }) 309 + .skip_while(|(link_idx, _)| { 310 + cursor.is_some_and(|c| { 311 + linker_idx == c.backward as usize && *link_idx <= c.forward as usize 312 + }) 313 + }) 314 + .take(limit as usize + 1 - items.len()) 315 + { 316 + items.push(( 317 + linker_idx, 318 + link_idx, 319 + RecordId { 320 + did: did.clone(), 321 + collection: collection.to_string(), 322 + rkey: rkey.0.clone(), 323 + }, 324 + fwd_target.0.clone(), 325 + )); 326 + } 327 + 328 + // page full - eject 329 + if items.len() > limit as usize { 330 + break; 331 + } 332 + } 333 + 334 + let next = (items.len() > limit as usize).then(|| { 335 + let (l, f, _, _) = items[limit as usize - 1]; 336 + format!("{l},{f}") 337 + }); 338 + 339 + let items = items 340 + .into_iter() 341 + .take(limit as usize) 342 + .map(|(_, _, rid, t)| (rid, t)) 343 + .collect(); 344 + 345 + Ok(PagedOrderedCollection { items, next }) 235 346 } 236 347 237 348 fn get_links(
+468
constellation/src/storage/mod.rs
··· 39 39 } 40 40 } 41 41 42 + // get-many-to-many composite cursor 43 + #[derive(Copy, Clone, Debug)] 44 + struct CompositeCursor { 45 + backward: u64, 46 + forward: u64, 47 + } 48 + 42 49 /// A paged collection whose keys are sorted instead of indexed 43 50 /// 44 51 /// this has weaker guarantees than PagedAppendingCollection: it might ··· 134 141 135 142 fn get_all_record_counts(&self, _target: &str) 136 143 -> Result<HashMap<String, HashMap<String, u64>>>; 144 + 145 + fn get_many_to_many( 146 + &self, 147 + target: &str, 148 + collection: &str, 149 + path: &str, 150 + path_to_other: &str, 151 + limit: u64, 152 + after: Option<String>, 153 + filter_dids: &HashSet<Did>, 154 + filter_to_targets: &HashSet<String>, 155 + ) -> Result<PagedOrderedCollection<(RecordId, String), String>>; 137 156 138 157 fn get_all_counts( 139 158 &self, ··· 1669 1688 items: vec![("b.com".to_string(), 2, 2),], 1670 1689 next: None, 1671 1690 } 1691 + ); 1692 + 1693 + // Pagination edge cases: we have 2 grouped results (b.com and c.com) 1694 + 1695 + // Case 1: limit > items (limit=10, items=2) -> next should be None 1696 + let result = storage.get_many_to_many_counts( 1697 + "a.com", 1698 + "app.t.c", 1699 + ".abc.uri", 1700 + ".def.uri", 1701 + 10, 1702 + None, 1703 + &HashSet::new(), 1704 + &HashSet::new(), 1705 + )?; 1706 + assert_eq!(result.items.len(), 2); 1707 + assert_eq!(result.next, None, "next should be None when items < limit"); 1708 + 1709 + // Case 2: limit == items (limit=2, items=2) -> next should be None 1710 + let result = storage.get_many_to_many_counts( 1711 + "a.com", 1712 + "app.t.c", 1713 + ".abc.uri", 1714 + ".def.uri", 1715 + 2, 1716 + None, 1717 + &HashSet::new(), 1718 + &HashSet::new(), 1719 + )?; 1720 + assert_eq!(result.items.len(), 2); 1721 + assert_eq!( 1722 + result.next, None, 1723 + "next should be None when items == limit (no more pages)" 1724 + ); 1725 + 1726 + // Case 3: limit < items (limit=1, items=2) -> next should be Some 1727 + let result = 
storage.get_many_to_many_counts( 1728 + "a.com", 1729 + "app.t.c", 1730 + ".abc.uri", 1731 + ".def.uri", 1732 + 1, 1733 + None, 1734 + &HashSet::new(), 1735 + &HashSet::new(), 1736 + )?; 1737 + assert_eq!(result.items.len(), 1); 1738 + assert!( 1739 + result.next.is_some(), 1740 + "next should be Some when items > limit" 1741 + ); 1742 + 1743 + // Verify second page returns remaining item with no cursor 1744 + let result2 = storage.get_many_to_many_counts( 1745 + "a.com", 1746 + "app.t.c", 1747 + ".abc.uri", 1748 + ".def.uri", 1749 + 1, 1750 + result.next, 1751 + &HashSet::new(), 1752 + &HashSet::new(), 1753 + )?; 1754 + assert_eq!(result2.items.len(), 1); 1755 + assert_eq!(result2.next, None, "next should be None on final page"); 1756 + }); 1757 + 1758 + test_each_storage!(get_m2m_empty, |storage| { 1759 + assert_eq!( 1760 + storage.get_many_to_many( 1761 + "a.com", 1762 + "a.b.c", 1763 + ".d.e", 1764 + ".f.g", 1765 + 10, 1766 + None, 1767 + &HashSet::new(), 1768 + &HashSet::new(), 1769 + )?, 1770 + PagedOrderedCollection { 1771 + items: vec![], 1772 + next: None, 1773 + } 1774 + ); 1775 + }); 1776 + 1777 + test_each_storage!(get_m2m_single, |storage| { 1778 + // One record linking to a.com (backward), with two forward links at 1779 + // the same path_to_other (.def.uri) pointing to b.com and c.com. 1780 + // Both forward targets must appear in the output. 
1781 + storage.push( 1782 + &ActionableEvent::CreateLinks { 1783 + record_id: RecordId { 1784 + did: "did:plc:asdf".into(), 1785 + collection: "app.t.c".into(), 1786 + rkey: "asdf".into(), 1787 + }, 1788 + links: vec![ 1789 + CollectedLink { 1790 + target: Link::Uri("a.com".into()), 1791 + path: ".abc.uri".into(), 1792 + }, 1793 + CollectedLink { 1794 + target: Link::Uri("b.com".into()), 1795 + path: ".def.uri".into(), 1796 + }, 1797 + CollectedLink { 1798 + target: Link::Uri("c.com".into()), 1799 + path: ".def.uri".into(), 1800 + }, 1801 + ], 1802 + }, 1803 + 0, 1804 + )?; 1805 + let result = storage.get_many_to_many( 1806 + "a.com", 1807 + "app.t.c", 1808 + ".abc.uri", 1809 + ".def.uri", 1810 + 10, 1811 + None, 1812 + &HashSet::new(), 1813 + &HashSet::new(), 1814 + )?; 1815 + assert_eq!( 1816 + result.items.len(), 1817 + 2, 1818 + "both forward links at path_to_other should be emitted" 1819 + ); 1820 + let mut targets: Vec<_> = result.items.iter().map(|(_, t)| t.as_str()).collect(); 1821 + targets.sort(); 1822 + assert_eq!(targets, vec!["b.com", "c.com"]); 1823 + assert!(result 1824 + .items 1825 + .iter() 1826 + .all(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1827 + assert_eq!(result.next, None); 1828 + }); 1829 + 1830 + test_each_storage!(get_m2m_filters, |storage| { 1831 + storage.push( 1832 + &ActionableEvent::CreateLinks { 1833 + record_id: RecordId { 1834 + did: "did:plc:asdf".into(), 1835 + collection: "app.t.c".into(), 1836 + rkey: "asdf".into(), 1837 + }, 1838 + links: vec![ 1839 + CollectedLink { 1840 + target: Link::Uri("a.com".into()), 1841 + path: ".abc.uri".into(), 1842 + }, 1843 + CollectedLink { 1844 + target: Link::Uri("b.com".into()), 1845 + path: ".def.uri".into(), 1846 + }, 1847 + ], 1848 + }, 1849 + 0, 1850 + )?; 1851 + storage.push( 1852 + &ActionableEvent::CreateLinks { 1853 + record_id: RecordId { 1854 + did: "did:plc:asdf".into(), 1855 + collection: "app.t.c".into(), 1856 + rkey: "asdf2".into(), 1857 + }, 1858 + links: 
vec![ 1859 + CollectedLink { 1860 + target: Link::Uri("a.com".into()), 1861 + path: ".abc.uri".into(), 1862 + }, 1863 + CollectedLink { 1864 + target: Link::Uri("b.com".into()), 1865 + path: ".def.uri".into(), 1866 + }, 1867 + ], 1868 + }, 1869 + 1, 1870 + )?; 1871 + storage.push( 1872 + &ActionableEvent::CreateLinks { 1873 + record_id: RecordId { 1874 + did: "did:plc:fdsa".into(), 1875 + collection: "app.t.c".into(), 1876 + rkey: "fdsa".into(), 1877 + }, 1878 + links: vec![ 1879 + CollectedLink { 1880 + target: Link::Uri("a.com".into()), 1881 + path: ".abc.uri".into(), 1882 + }, 1883 + CollectedLink { 1884 + target: Link::Uri("c.com".into()), 1885 + path: ".def.uri".into(), 1886 + }, 1887 + ], 1888 + }, 1889 + 2, 1890 + )?; 1891 + storage.push( 1892 + &ActionableEvent::CreateLinks { 1893 + record_id: RecordId { 1894 + did: "did:plc:fdsa".into(), 1895 + collection: "app.t.c".into(), 1896 + rkey: "fdsa2".into(), 1897 + }, 1898 + links: vec![ 1899 + CollectedLink { 1900 + target: Link::Uri("a.com".into()), 1901 + path: ".abc.uri".into(), 1902 + }, 1903 + CollectedLink { 1904 + target: Link::Uri("c.com".into()), 1905 + path: ".def.uri".into(), 1906 + }, 1907 + ], 1908 + }, 1909 + 3, 1910 + )?; 1911 + 1912 + // Test without filters - should get all records as flat items 1913 + let result = storage.get_many_to_many( 1914 + "a.com", 1915 + "app.t.c", 1916 + ".abc.uri", 1917 + ".def.uri", 1918 + 10, 1919 + None, 1920 + &HashSet::new(), 1921 + &HashSet::new(), 1922 + )?; 1923 + assert_eq!(result.items.len(), 4); 1924 + assert_eq!(result.next, None); 1925 + // Check b.com items 1926 + let b_items: Vec<_> = result 1927 + .items 1928 + .iter() 1929 + .filter(|(_, subject)| subject == "b.com") 1930 + .collect(); 1931 + assert_eq!(b_items.len(), 2); 1932 + assert!(b_items 1933 + .iter() 1934 + .any(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1935 + assert!(b_items 1936 + .iter() 1937 + .any(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf2")); 1938 + // Check 
c.com items 1939 + let c_items: Vec<_> = result 1940 + .items 1941 + .iter() 1942 + .filter(|(_, subject)| subject == "c.com") 1943 + .collect(); 1944 + assert_eq!(c_items.len(), 2); 1945 + assert!(c_items 1946 + .iter() 1947 + .any(|(r, _)| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa")); 1948 + assert!(c_items 1949 + .iter() 1950 + .any(|(r, _)| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa2")); 1951 + 1952 + // Test with DID filter - should only get records from did:plc:fdsa 1953 + let result = storage.get_many_to_many( 1954 + "a.com", 1955 + "app.t.c", 1956 + ".abc.uri", 1957 + ".def.uri", 1958 + 10, 1959 + None, 1960 + &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), 1961 + &HashSet::new(), 1962 + )?; 1963 + assert_eq!(result.items.len(), 2); 1964 + assert!(result.items.iter().all(|(_, subject)| subject == "c.com")); 1965 + assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:fdsa")); 1966 + 1967 + // Test with target filter - should only get records linking to b.com 1968 + let result = storage.get_many_to_many( 1969 + "a.com", 1970 + "app.t.c", 1971 + ".abc.uri", 1972 + ".def.uri", 1973 + 10, 1974 + None, 1975 + &HashSet::new(), 1976 + &HashSet::from_iter(["b.com".to_string()]), 1977 + )?; 1978 + assert_eq!(result.items.len(), 2); 1979 + assert!(result.items.iter().all(|(_, subject)| subject == "b.com")); 1980 + assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:asdf")); 1981 + 1982 + // Pagination edge cases: we have 4 flat items 1983 + 1984 + // Case 1: limit > items (limit=10, items=4) -> next should be None 1985 + let result = storage.get_many_to_many( 1986 + "a.com", 1987 + "app.t.c", 1988 + ".abc.uri", 1989 + ".def.uri", 1990 + 10, 1991 + None, 1992 + &HashSet::new(), 1993 + &HashSet::new(), 1994 + )?; 1995 + assert_eq!(result.items.len(), 4); 1996 + assert_eq!(result.next, None, "next should be None when items < limit"); 1997 + 1998 + // Case 2: limit == items (limit=4, items=4) -> next should be None 1999 + let result = 
storage.get_many_to_many( 2000 + "a.com", 2001 + "app.t.c", 2002 + ".abc.uri", 2003 + ".def.uri", 2004 + 4, 2005 + None, 2006 + &HashSet::new(), 2007 + &HashSet::new(), 2008 + )?; 2009 + assert_eq!(result.items.len(), 4); 2010 + assert_eq!( 2011 + result.next, None, 2012 + "next should be None when items == limit (no more pages)" 2013 + ); 2014 + 2015 + // Case 3: limit < items (limit=3, items=4) -> next should be Some 2016 + let result = storage.get_many_to_many( 2017 + "a.com", 2018 + "app.t.c", 2019 + ".abc.uri", 2020 + ".def.uri", 2021 + 3, 2022 + None, 2023 + &HashSet::new(), 2024 + &HashSet::new(), 2025 + )?; 2026 + assert_eq!(result.items.len(), 3); 2027 + assert!( 2028 + result.next.is_some(), 2029 + "next should be Some when items > limit" 2030 + ); 2031 + 2032 + // Verify second page returns remaining item with no cursor. 2033 + // This now works correctly because we use a composite cursor that includes 2034 + // (target, did, rkey), allowing pagination even when multiple records share 2035 + // the same target string. 2036 + let result2 = storage.get_many_to_many( 2037 + "a.com", 2038 + "app.t.c", 2039 + ".abc.uri", 2040 + ".def.uri", 2041 + 3, 2042 + result.next, 2043 + &HashSet::new(), 2044 + &HashSet::new(), 2045 + )?; 2046 + assert_eq!( 2047 + result2.items.len(), 2048 + 1, 2049 + "second page should have 1 remaining item" 2050 + ); 2051 + assert_eq!(result2.next, None, "next should be None on final page"); 2052 + 2053 + // Verify we got all 4 unique items across both pages (no duplicates, no gaps) 2054 + let mut all_rkeys: Vec<_> = result.items.iter().map(|(r, _)| r.rkey.clone()).collect(); 2055 + all_rkeys.extend(result2.items.iter().map(|(r, _)| r.rkey.clone())); 2056 + all_rkeys.sort(); 2057 + assert_eq!( 2058 + all_rkeys, 2059 + vec!["asdf", "asdf2", "fdsa", "fdsa2"], 2060 + "should have all 4 records across both pages" 2061 + ); 2062 + }); 2063 + 2064 + // Pagination that splits across forward links within a single backlinker. 
2065 + // The cursor should correctly resume mid-record on the next page. 2066 + test_each_storage!(get_m2m_paginate_within_forward_links, |storage| { 2067 + // Record with 1 backward link and 3 forward links at the same path 2068 + storage.push( 2069 + &ActionableEvent::CreateLinks { 2070 + record_id: RecordId { 2071 + did: "did:plc:lister".into(), 2072 + collection: "app.t.c".into(), 2073 + rkey: "list1".into(), 2074 + }, 2075 + links: vec![ 2076 + CollectedLink { 2077 + target: Link::Uri("a.com".into()), 2078 + path: ".subject.uri".into(), 2079 + }, 2080 + CollectedLink { 2081 + target: Link::Uri("x.com".into()), 2082 + path: ".items[].uri".into(), 2083 + }, 2084 + CollectedLink { 2085 + target: Link::Uri("y.com".into()), 2086 + path: ".items[].uri".into(), 2087 + }, 2088 + CollectedLink { 2089 + target: Link::Uri("z.com".into()), 2090 + path: ".items[].uri".into(), 2091 + }, 2092 + ], 2093 + }, 2094 + 0, 2095 + )?; 2096 + 2097 + // Page 1: limit=2, should get 2 of the 3 forward links 2098 + let page1 = storage.get_many_to_many( 2099 + "a.com", 2100 + "app.t.c", 2101 + ".subject.uri", 2102 + ".items[].uri", 2103 + 2, 2104 + None, 2105 + &HashSet::new(), 2106 + &HashSet::new(), 2107 + )?; 2108 + assert_eq!(page1.items.len(), 2, "first page should have 2 items"); 2109 + assert!( 2110 + page1.next.is_some(), 2111 + "should have a next cursor for remaining item" 2112 + ); 2113 + 2114 + // Page 2: should get the remaining 1 forward link 2115 + let page2 = storage.get_many_to_many( 2116 + "a.com", 2117 + "app.t.c", 2118 + ".subject.uri", 2119 + ".items[].uri", 2120 + 2, 2121 + page1.next, 2122 + &HashSet::new(), 2123 + &HashSet::new(), 2124 + )?; 2125 + assert_eq!(page2.items.len(), 1, "second page should have 1 item"); 2126 + assert_eq!(page2.next, None, "no more pages"); 2127 + 2128 + // Verify all 3 targets appear across pages with no duplicates 2129 + let mut all_targets: Vec<_> = page1 2130 + .items 2131 + .iter() 2132 + .chain(page2.items.iter()) 2133 + 
.map(|(_, t)| t.clone()) 2134 + .collect(); 2135 + all_targets.sort(); 2136 + assert_eq!( 2137 + all_targets, 2138 + vec!["x.com", "y.com", "z.com"], 2139 + "all forward targets should appear exactly once across pages" 1672 2140 ); 1673 2141 }); 1674 2142 }
+181 -15
constellation/src/storage/rocks_store.rs
··· 2 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 3 PagedOrderedCollection, StorageStats, 4 4 }; 5 + use crate::storage::CompositeCursor; 5 6 use crate::{CountsByCount, Did, RecordId}; 6 - use anyhow::{bail, Result}; 7 + 8 + use anyhow::{anyhow, bail, Result}; 7 9 use bincode::Options as BincodeOptions; 8 10 use links::CollectedLink; 9 11 use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; ··· 14 16 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 15 17 }; 16 18 use serde::{Deserialize, Serialize}; 19 + use tokio_util::sync::CancellationToken; 20 + 17 21 use std::collections::{BTreeMap, HashMap, HashSet}; 18 22 use std::io::Read; 19 23 use std::marker::PhantomData; ··· 24 28 }; 25 29 use std::thread; 26 30 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 27 - use tokio_util::sync::CancellationToken; 28 31 29 32 static DID_IDS_CF: &str = "did_ids"; 30 33 static TARGET_IDS_CF: &str = "target_ids"; ··· 1032 1035 1033 1036 // aand we can skip target ids that must be on future pages 1034 1037 // (this check continues after the did-lookup, which we have to do) 1035 - let page_is_full = grouped_counts.len() as u64 >= limit; 1038 + let page_is_full = grouped_counts.len() as u64 > limit; 1036 1039 if page_is_full { 1037 - let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh 1040 + let current_max = grouped_counts.keys().next_back().unwrap(); 1038 1041 if fwd_target > *current_max { 1039 1042 continue; 1040 1043 } ··· 1070 1073 } 1071 1074 } 1072 1075 1076 + // If we accumulated more than limit groups, there's another page. 1077 + // Pop the extra before building items so it doesn't appear in results. 
1078 + let next = if grouped_counts.len() as u64 > limit { 1079 + grouped_counts.pop_last(); 1080 + grouped_counts 1081 + .keys() 1082 + .next_back() 1083 + .map(|k| format!("{}", k.0)) 1084 + } else { 1085 + None 1086 + }; 1087 + 1073 1088 let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len()); 1074 1089 for (target_id, (n, dids)) in &grouped_counts { 1075 1090 let Some(target) = self ··· 1082 1097 items.push((target.0 .0, *n, dids.len() as u64)); 1083 1098 } 1084 1099 1085 - let next = if grouped_counts.len() as u64 >= limit { 1086 - // yeah.... it's a number saved as a string......sorry 1087 - grouped_counts 1088 - .keys() 1089 - .next_back() 1090 - .map(|k| format!("{}", k.0)) 1091 - } else { 1092 - None 1093 - }; 1094 - 1095 1100 Ok(PagedOrderedCollection { items, next }) 1096 1101 } 1097 1102 ··· 1120 1125 } else { 1121 1126 Ok(0) 1122 1127 } 1128 + } 1129 + 1130 + fn get_many_to_many( 1131 + &self, 1132 + target: &str, 1133 + collection: &str, 1134 + path: &str, 1135 + path_to_other: &str, 1136 + limit: u64, 1137 + after: Option<String>, 1138 + filter_dids: &HashSet<Did>, 1139 + filter_to_targets: &HashSet<String>, 1140 + ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1141 + // helper to resolve dids 1142 + let resolve_active_did = |did_id: &DidId| -> Result<Option<Did>> { 1143 + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { 1144 + eprintln!("failed to look up did from did_id {did_id:?}"); 1145 + return Ok(None); 1146 + }; 1147 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 
else { 1148 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1149 + return Ok(None); 1150 + }; 1151 + Ok(active.then_some(did)) 1152 + }; 1153 + 1154 + // setup variables that we need later 1155 + let collection = Collection(collection.to_string()); 1156 + let path = RPath(path.to_string()); 1157 + 1158 + // extract parts form composite cursor 1159 + let cursor = match after { 1160 + Some(a) => { 1161 + let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 1162 + let b = b 1163 + .parse::<u64>() 1164 + .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 1165 + let f = f 1166 + .parse::<u64>() 1167 + .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 1168 + Some(CompositeCursor { 1169 + backward: b, 1170 + forward: f, 1171 + }) 1172 + } 1173 + None => None, 1174 + }; 1175 + 1176 + eprintln!("cursor: {:#?}", cursor); 1177 + 1178 + // (__active__) did ids and filter targets 1179 + let filter_did_ids: HashMap<DidId, bool> = filter_dids 1180 + .iter() 1181 + .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 1182 + .collect::<Result<Vec<DidIdValue>>>()? 1183 + .into_iter() 1184 + .map(|DidIdValue(id, active)| (id, active)) 1185 + .collect(); 1186 + let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 1187 + for t in filter_to_targets { 1188 + for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { 1189 + filter_to_target_ids.insert(target_id); 1190 + } 1191 + } 1192 + 1193 + let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1194 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
else { 1195 + eprintln!("Target not found for {target_key:?}"); 1196 + return Ok(PagedOrderedCollection::empty()); 1197 + }; 1198 + let linkers = self.get_target_linkers(&target_id)?; 1199 + 1200 + let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 1201 + 1202 + // iterate backwards (who linked to the target?) 1203 + for (linker_idx, (did_id, rkey)) in 1204 + linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| { 1205 + cursor.is_some_and(|c| *linker_idx < c.backward as usize) 1206 + }) 1207 + { 1208 + if did_id.is_empty() 1209 + || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id)) 1210 + { 1211 + continue; 1212 + } 1213 + 1214 + let Some(links) = self.get_record_link_targets(&RecordLinkKey( 1215 + *did_id, 1216 + collection.clone(), 1217 + rkey.clone(), 1218 + ))? 1219 + else { 1220 + continue; 1221 + }; 1222 + 1223 + // iterate forward (which of these links point to the __other__ target?) 1224 + for (link_idx, RecordLinkTarget(_, fwd_target_id)) in links 1225 + .0 1226 + .into_iter() 1227 + .enumerate() 1228 + .filter(|(_, RecordLinkTarget(rpath, target_id))| { 1229 + eprintln!("rpath.0: {} vs. path_to_other: {path_to_other}", rpath.0); 1230 + rpath.0 == path_to_other 1231 + && (filter_to_target_ids.is_empty() 1232 + || filter_to_target_ids.contains(target_id)) 1233 + }) 1234 + .skip_while(|(link_idx, _)| { 1235 + cursor.is_some_and(|c| { 1236 + linker_idx == c.backward as usize && *link_idx <= c.forward as usize 1237 + }) 1238 + }) 1239 + .take(limit as usize + 1 - items.len()) 1240 + { 1241 + // extract forward target did (target that links to the __other__ target) 1242 + let Some(did) = resolve_active_did(did_id)? else { 1243 + continue; 1244 + }; 1245 + // resolve to target string 1246 + let Some(fwd_target_key) = self 1247 + .target_id_table 1248 + .get_val_from_id(&self.db, fwd_target_id.0)? 
1249 + else { 1250 + continue; 1251 + }; 1252 + 1253 + // link to be added 1254 + let record_id = RecordId { 1255 + did, 1256 + collection: collection.0.clone(), 1257 + rkey: rkey.0.clone(), 1258 + }; 1259 + items.push((linker_idx, link_idx, record_id, fwd_target_key.0 .0)); 1260 + } 1261 + 1262 + // page full - eject 1263 + if items.len() > limit as usize { 1264 + break; 1265 + } 1266 + } 1267 + 1268 + // We collect up to limit + 1 fully-resolved items. If we got more than 1269 + limit, there are more results beyond this page. We truncate to limit 1270 + items (the actual page) and build a composite cursor from the last 1271 + item on the page, a base64-encoded pair of (backlink_vec_idx, 1272 + forward_link_idx). On the next request, skip_while advances past 1273 + this position: backlinks before backlink_vec_idx are skipped entirely, 1274 + and at backlink_vec_idx itself, forward links at or before 1275 + forward_link_idx are skipped. This correctly resumes mid-record when 1276 + a single backlinker has multiple forward links at path_to_other. 1277 + let next = (items.len() > limit as usize).then(|| { 1278 + let (l, f, _, _) = items[limit as usize - 1]; 1279 + format!("{l},{f}") 1280 + }); 1281 + 1282 + let items = items 1283 + .into_iter() 1284 + .take(limit as usize) 1285 + .map(|(_, _, rid, t)| (rid, t)) 1286 + .collect(); 1287 + 1288 + Ok(PagedOrderedCollection { items, next }) 1123 1289 } 1124 1290 1125 1291 fn get_links(
+58
constellation/templates/get-many-to-many.html.j2
··· 1 + {% extends "base.html.j2" %} 2 + {% import "try-it-macros.html.j2" as try_it %} 3 + 4 + {% block title %}Many-to-Many Links{% endblock %} 5 + {% block description %}All {{ query.source }} records with many-to-many links to {{ query.subject }} joining through {{ query.path_to_other }}{% endblock %} 6 + 7 + {% block content %} 8 + 9 + {% call try_it::get_many_to_many(query.subject, query.source, query.path_to_other, query.did, query.other_subject, query.limit) %} 10 + 11 + <h2> 12 + Many-to-many links to <code>{{ query.subject }}</code> 13 + {% if let Some(browseable_uri) = query.subject|to_browseable %} 14 + <small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small> 15 + {% endif %} 16 + </h2> 17 + 18 + <p><strong>Many-to-many links</strong> from <code>{{ query.source }}</code> joining through <code>{{ query.path_to_other }}</code></p> 19 + 20 + <ul> 21 + <li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li> 22 + </ul> 23 + 24 + <h3>Many-to-many links, most recent first:</h3> 25 + 26 + {% for item in items %} 27 + <pre style="display: block; margin: 1em 2em" class="code"><strong>Subject</strong>: <a href="/links/all?target={{ item.subject|urlencode }}">{{ item.subject }}</a> 28 + <strong>DID</strong>: {{ item.link.did().0 }} 29 + <strong>Collection</strong>: {{ item.link.collection }} 30 + <strong>RKey</strong>: {{ item.link.rkey }} 31 + -> <a href="https://pdsls.dev/at://{{ item.link.did().0 }}/{{ item.link.collection }}/{{ item.link.rkey }}">browse record</a></pre> 32 + {% endfor %} 33 + 34 + {% if let Some(c) = cursor %} 35 + <form method="get" action="/xrpc/blue.microcosm.links.getManyToMany"> 36 + <input type="hidden" name="subject" value="{{ query.subject }}" /> 37 + <input type="hidden" name="source" value="{{ query.source }}" /> 38 + <input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" /> 39 + {% for did in query.did %} 40 + <input type="hidden" name="did" value="{{ did }}" /> 41 + {% endfor %} 42 + {% for other in query.other_subject %} 43 + <input type="hidden" name="otherSubject" value="{{ other }}" /> 44 + {% endfor %} 45 + <input type="hidden" name="limit" value="{{ query.limit }}" /> 46 + <input type="hidden" name="cursor" value={{ c|json|safe }} /> 47 + <button type="submit">next page&hellip;</button> 48 + </form> 49 + {% else %} 50 + <button disabled><em>end of results</em></button> 51 + {% endif %} 52 + 53 + <details> 54 + <summary>Raw JSON response</summary> 55 + <pre class="code">{{ self|tojson }}</pre> 56 + </details> 57 + 58 + {% endblock %}
+19
constellation/templates/hello.html.j2
··· 83 83 ) %} 84 84 85 85 86 + <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToMany</code></h3> 87 + 88 + <p>A list of many-to-many join records linking to a target and a secondary target.</p> 89 + 90 + <h4>Query parameters:</h4> 91 + 92 + <ul> 93 + <li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li> 94 + <li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li> 95 + <li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li> 96 + <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 97 + <li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple subjects. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li> 98 + <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 99 + </ul> 100 + 101 + <p style="margin-bottom: 0"><strong>Try it:</strong></p> 102 + {% call try_it::get_many_to_many("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", "reply.parent.uri", [""], [""], 16) %} 103 + 104 + 86 105 <h3 class="route"><code>GET /links</code></h3> 87 106 88 107 <p>A list of records linking to a target.</p>
+30
constellation/templates/try-it-macros.html.j2
··· 68 68 </script> 69 69 {% endmacro %} 70 70 71 + {% macro get_many_to_many(subject, source, pathToOther, dids, otherSubjects, limit) %} 72 + <form method="get" action="/xrpc/blue.microcosm.links.getManyToMany"> 73 + <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToMany 74 + ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." /> 75 + &source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject" /> 76 + &pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing" /> 77 + {%- for did in dids %}{% if !did.is_empty() %} 78 + &did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %} 79 + <span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button> 80 + {%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %} 81 + &otherSubject= <input type="text" name="otherSubject" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %} 82 + <span id="m2m-other-placeholder"></span> <button id="m2m-add-other">+ other subject filter</button> 83 + &limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get many-to-many links</button></pre> 84 + </form> 85 + <script> 86 + const m2mAddDidButton = document.getElementById('m2m-add-did'); 87 + const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder'); 88 + m2mAddDidButton.addEventListener('click', e => { 89 + e.preventDefault(); 90 + const i = document.createElement('input'); 91 + i.placeholder = 'did:plc:...'; 92 + i.name = "did" 93 + const p = m2mAddDidButton.parentNode; 94 + p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder); 95 + p.insertBefore(i, m2mDidPlaceholder); 96 + p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder); 97 + }); 98 + </script> 99 + {% endmacro %} 100 + 71 101 {% macro links(target, collection, path, dids, limit) %} 72 102 <form method="get" action="/links"> 73 103 <pre class="code"><strong>GET</strong> /links
+104
lexicons/blue.microcosm/links/getManyToMany.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getManyToMany", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "Get records that link to a primary subject along with the secondary subjects they also reference", 8 + "parameters": { 9 + "type": "params", 10 + "required": ["subject", "source", "pathToOther"], 11 + "properties": { 12 + "subject": { 13 + "type": "string", 14 + "format": "uri", 15 + "description": "the primary target being linked to (at-uri, did, or uri)" 16 + }, 17 + "source": { 18 + "type": "string", 19 + "description": "collection and path specification for the primary link (e.g., 'app.bsky.feed.like:subject.uri')" 20 + }, 21 + "pathToOther": { 22 + "type": "string", 23 + "description": "path to the secondary link in the many-to-many record (e.g., 'otherThing.uri')" 24 + }, 25 + "did": { 26 + "type": "array", 27 + "description": "filter links to those from specific users", 28 + "items": { 29 + "type": "string", 30 + "format": "did" 31 + } 32 + }, 33 + "otherSubject": { 34 + "type": "array", 35 + "description": "filter secondary links to specific subjects", 36 + "items": { 37 + "type": "string" 38 + } 39 + }, 40 + "limit": { 41 + "type": "integer", 42 + "minimum": 1, 43 + "maximum": 100, 44 + "default": 16, 45 + "description": "number of results to return" 46 + } 47 + } 48 + }, 49 + "output": { 50 + "encoding": "application/json", 51 + "schema": { 52 + "type": "object", 53 + "required": ["items"], 54 + "properties": { 55 + "items": { 56 + "type": "array", 57 + "items": { 58 + "type": "ref", 59 + "ref": "#item" 60 + } 61 + }, 62 + "cursor": { 63 + "type": "string" 64 + } 65 + } 66 + } 67 + } 68 + }, 69 + "item": { 70 + "type": "object", 71 + "required": ["link", "subject"], 72 + "properties": { 73 + "link": { 74 + "type": "ref", 75 + "ref": "#linkRecord" 76 + }, 77 + "subject": { 78 + "type": "string" 79 + } 80 + } 81 + }, 82 + "linkRecord": { 83 + "type": "object", 84 + "required": ["did", "collection", "rkey"], 85 + "description": "A record identifier consisting of a DID, collection, and record key", 86 + "properties": { 87 + "did": { 88 + "type": "string", 89 + "format": "did", 90 + "description": "the DID of the linking record's repository" 91 + }, 92 + "collection": { 93 + "type": "string", 94 + "format": "nsid", 95 + "description": "the collection of the linking record" 96 + }, 97 + "rkey": { 98 + "type": "string", 99 + "format": "record-key" 100 + } 101 + } 102 + } 103 + } 104 + }

History

8 rounds 13 comments
11 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
wip
Fix rocks-store to match mem-store composite cursor
Address feedback from fig
0 comments
pull request successfully merged
10 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
wip
Fix rocks-store to match mem-store composite cursor
0 comments
8 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
1 comment

Okay. I wrapped my head around the composite cursor you proposed and am working on refactoring both storage implementations towards that. I think I might re-submit another round tomorrow :)

6 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
3 comments

Found a bug in how we handle some of the pagination logic in cases where the number of items and the user-selected limit are identical or very close to each other (already working on a fix)
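One common way to avoid exactly this limit-boundary ambiguity is fetching `limit + 1` items, so a next page is detectable even when exactly `limit` matches remain (the later commit "Fix get_many_to_many_counts pagination with fetch N+1" points in this direction). A minimal sketch of the idea, using a hypothetical `page` helper over an in-memory slice rather than the actual store code:

```rust
/// Sketch of fetch-N+1 paging: request one extra item so the presence
/// of a next page is unambiguous even when matches == limit.
fn page<T: Clone>(items: &[T], offset: usize, limit: usize) -> (Vec<T>, Option<usize>) {
    // Take limit + 1 candidates; the extra one only signals a next page.
    let fetched: Vec<T> = items.iter().skip(offset).take(limit + 1).cloned().collect();
    if fetched.len() > limit {
        // More remains: return exactly `limit` items and a cursor to continue.
        (fetched[..limit].to_vec(), Some(offset + limit))
    } else {
        // The extra slot stayed empty, so this is the final page: no cursor.
        (fetched, None)
    }
}

fn main() {
    let data: Vec<u32> = (0..5).collect();
    // Exactly `limit` items left: without N+1 we could not tell this is the end.
    let (page1, cursor) = page(&data, 0, 5);
    assert_eq!(page1.len(), 5);
    assert_eq!(cursor, None);
    // Fewer than the full set requested: a cursor points at the next page.
    let (page2, cursor) = page(&data, 0, 3);
    assert_eq!(page2, vec![0, 1, 2]);
    assert_eq!(cursor, Some(3));
}
```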

thanks for the rebase! i tried to write things in the tiny text box but ended up needing to make a diagram: https://bsky.app/profile/did:plc:hdhoaan3xa3jiuq4fg4mefid/post/3mejuq44twc2t

key thing is that where the focus of getManyToManyCounts was the other subject (aggregation was against that, so grouping happened with it),

i think the focus of disaggregated many-to-many is on the linking records themselves

to me that takes me toward a few things

  • i don't think we should need to group the links by target (does the current code build up the full aggregation on every requested page? we should be able to avoid doing that)

  • i think the order of the response should actually be based on the linking record itself (since we have a row in the output), not the other subject, unlike with the aggregated/count version. this means you get e.g. list items in the order they were added instead of the order the listed things were created. (i haven't fully wrapped my head around the grouping/ordering code here yet)

  • since any linking record can have a path_to_other with multiple links, i think a composite cursor could work here:

a 2-tuple of (backlink_vec_idx, forward_vec_idx).

for normal cases where the many-to-many record points to exactly one other subject, it would just be advancing backlink_vec_idx like normal backlinks

for cases where the many-to-many record actually has multiple forward links at the given path_to_other, the second part of the tuple would track progress through that list

i think that allows us to hold the necessary state between calls without needing to reconstruct too much in memory each time?

(also it's hard to write in this tiny tiny textbox and have a sense of whether what i'm saying makes sense)
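The two-tuple cursor proposed above can be sketched as a small advance step. Names here are illustrative, not the actual store types: `forward_counts[i]` stands in for the number of forward links that backlink record `i` has at the given `path_to_other`.

```rust
/// Advance a (backlink_vec_idx, forward_vec_idx) composite cursor by one
/// emitted (linking record, other subject) pair. Returns None at the end.
fn advance(cursor: (usize, usize), forward_counts: &[usize]) -> Option<(usize, usize)> {
    let (mut b, mut f) = cursor;
    f += 1; // consume one pair from the current backlink record
    // Roll over past exhausted records (including any with zero forward links).
    while b < forward_counts.len() && f >= forward_counts[b] {
        b += 1;
        f = 0;
    }
    if b < forward_counts.len() { Some((b, f)) } else { None }
}

fn main() {
    // Record 0 has two forward links at path_to_other, record 1 has one.
    let counts = [2, 1];
    assert_eq!(advance((0, 0), &counts), Some((0, 1))); // still within record 0
    assert_eq!(advance((0, 1), &counts), Some((1, 0))); // roll over to record 1
    assert_eq!(advance((1, 0), &counts), None);         // end of results
}
```

In the common case of exactly one forward link per record, this degenerates to advancing `backlink_vec_idx` like a normal backlinks cursor, as described above.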

Interesting approach! I have to think through this for a bit to be honest. Maybe I tried to follow the existing counts implementation too closely

Having said that, I added a new composite cursor to fix a couple of bugs that would arise when hitting certain edge cases in the pagination logic. This affects both the new get-many-to-many endpoint as well as the existing get-many-to-many-counts endpoint. As the changes are split over two distinct commits, things should be straightforward to review.

Your assumption is still correct in the sense that we do indeed have to build up the aggregation again for every request. I have to double-check the get-backlinks endpoint to get a better sense of where you're going with this.

Finally, I agree that the interface here doesn't necessarily make the whole thing easier to understand, unfortunately

6 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
2 comments

i think something got funky with a rebase or the way tangled is showing it -- some of my changes on main seem to be getting shown (reverted) in the diff.

i don't mind sorting it locally but will mostly get to it tomorrow, in case you want to see what's up before i do.

That's one on me, sorry! Rebased again on main and now everything seems fine

5 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
5 comments

Rebased on main. As we discussed in the PR for the order query parameter, I didn't include it here as it's not a particularly sensible fit.

i need to get into the code properly but my initial thought is that this endpoint should return a flat list of results, like

{
  "items": [
    {
      "link": { did, collection, rkey }, // the m2m link record
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "b.com"
    },
  ]
}

this will require a bit of tricks in the cursor to track pages across half-finished groups of links

(also this isn't an immediate change request, just getting it down for discussion!)

(and separately, i've also been wondering about moving more toward returning at-uris instead of broken-out did/collection/rkey objects. which isn't specifically about this PR, but if that happens then switching before releasing it is nice)

Hmm, I wonder how this would then work with the path_to_other parameter. Currently we have this nested grouping in order to show and disambiguate different relationships between different links.

For instance, take the following query and its results:

http://localhost:6789/xrpc/blue.microcosm.links.getManyToMany?subject=at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y&source=app.bsky.feed.post:reply.root.uri&pathToOther=reply.parent.uri&limit=16

This query asks: "Show me all posts in this thread, grouped by who they're responding to."

A flat list would just give us all the posts in the thread. The nested structure answers a richer question: who's talking to whom? Some posts are direct responses to the original article. Others are replies to other commenters, forming side conversations that branch off from the main thread.

The pathToOther grouping preserves that distinction. Without it, we'd lose the information about who's talking to whom.

{
  "linking_records": [
    {
      "subject": "at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y",
      "records": [
        {
          "did": "did:plc:lznqwrsbnyf6fdxohikqj6h3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd27pja7s2y"
        },
        {
          "did": "did:plc:uffx77au6hoauuuumkbuvqdr",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2tt5efc2a"
        },
        {
          "did": "did:plc:y7qyxzo7dns5m54dlq3youu3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2wtjxgc2d"
        },
        {
          "did": "did:plc:yaakslxyqydb76ybgkhrr4jk",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd35hyads22"
        },
        {
          "did": "did:plc:fia7w2kbnrdjwp6zvxywt7qv",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd37j3ldk2m"
        },
        {
          "did": "did:plc:xtecipifublblkomwau5x2ok",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd3dbtbz22n"
        },
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd42hpw7c2e"
        },
        {
          "did": "did:plc:fgquypfh32pewivn3bcmzseb",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd46jteoc2m"
        }
      ]
    },
    {
      "subject": "at://did:plc:3rhjxwwui6wwfokh4at3q2dl/app.bsky.feed.post/3mdczc7c4gk2i",
      "records": [
        {
          "did": "did:plc:3rhjxwwui6wwfokh4at3q2dl",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdczt7cwhk2i"
        }
      ]
    },
    {
      "subject": "at://did:plc:6buibzhkqr4vkqu75ezr2uv2/app.bsky.feed.post/3mdby25hbbk2v",
      "records": [
        {
          "did": "did:plc:fgeie2bmzlmx37iglj3xbzuj",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd26ulf4k2j"
        }
      ]
    },
    {
      "subject": "at://did:plc:lwgvv5oqh5stzb6dxa5d7z3n/app.bsky.feed.post/3mdcxqbkkfk2i",
      "records": [
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd45u56sk2e"
        }
      ]
    }
  ],
  "cursor": null
}

Correct me if I'm somehow wrong here!

Regarding returning at-uris: I think this might be a nice idea, as users can split these up themselves when they feel the need to anyway, and it feels conceptually more complete. But it might be easier to do this in a separate PR across all existing XRPC endpoints. That would allow us to add this new endpoint already while working on the updated return values in the meantime. I'd like to avoid doing too much distinct stuff in one PR. :)

at-uris: totally fair, holding off for a follow-up.

flat list: i might have messed it up in my example but i think what i meant is actually equivalent to the grouped version: flattened, with the subject ("group by") included with every item in the flattened list.

clients can collect the flat list and group on subject to get back to your structured example, if they want.

my motivations are probably part sql-brain, part flat-list-enjoyer, and part cursor-related. i'm trying to disregard the first two, and i'm curious about your thoughts about how to handle the cursor:

with a flat list it's easy (from the client perspective at least) -- just keep chasing the cursor for as much of the data as you need. (cursors can happen in the middle of a subject)

with nested results grouped by subject it's less obvious to me. correct me if i'm wrong (need another block of time to actually get into the code) but i think the grouped item sub-list is unbounded in size in the proposed code here? so cursors are only limiting the number of groups.

if we go with the grouped nested response, i think maybe we'd want something like:

  • a cursor at the end for fetching more groups, and
  • a cursor for each group-list that lets you fetch more items from just that group-list.

(i think this kind of nested paging is pretty neat!)
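The "clients can group on subject" step from the flat-list proposal above is a one-liner to sketch. Types here are illustrative: each response item is reduced to a `(subject, link)` pair of strings rather than the real lexicon objects.

```rust
use std::collections::BTreeMap;

/// Client-side sketch: regroup the flat `items` list by `subject`,
/// recovering the nested per-subject shape from the flat response.
fn group_by_subject(items: Vec<(String, String)>) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (subject, link) in items {
        // Each group keeps its links in response order (linking-record order).
        groups.entry(subject).or_default().push(link);
    }
    groups
}

fn main() {
    let items = vec![
        ("a.com".to_string(), "at://did:plc:x/app.bsky.feed.post/1".to_string()),
        ("a.com".to_string(), "at://did:plc:y/app.bsky.feed.post/2".to_string()),
        ("b.com".to_string(), "at://did:plc:z/app.bsky.feed.post/3".to_string()),
    ];
    let groups = group_by_subject(items);
    assert_eq!(groups["a.com"].len(), 2);
    assert_eq!(groups["b.com"].len(), 1);
}
```

Because the flat cursor can stop mid-subject, a client chasing multiple pages would simply keep appending items and regroup at the end; partially fetched groups merge naturally.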

Interesting. Now that you mention it I feel I kinda get where you're going with this!

I think the whole nested-cursor thing, albeit possible for sure, would create unnecessary complexity, so I'll probably go with your suggestion.

It seems easier for most users to create custom groupings on their own (having more freedom is always great), and I think from an ergonomic perspective the two cursors might create more friction.

4 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
1 comment

Added the missing lexicon entry for the new endpoint and changed the return type as well. Accidentally commented this on the other PR I was working on. Sorry about that lol.

3 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
1 comment

I think the existing get_many_to_many_counts handler and the new get_many_to_many handler are similar enough that we might extract the bulk of their logic into a shared helper. Maybe a method that takes the existing, identical function parameters plus an additional callback parameter (handling what we do with found matches, i.e. calculating counts or joining URIs) would be one way to go.

I am not too sure yet, though, if this is indeed the right thing to do, as the new shared implementation might be a bit complicated. But given the strong similarities between the two I think it's worth at least considering.
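A rough sketch of that callback-based extraction, under the assumption that both handlers walk the same sequence of matches. `Match` and `walk_matches` are hypothetical stand-ins for the store internals, not the actual code in this PR:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a matched many-to-many link.
struct Match {
    other_subject: String,
}

/// Shared walk over matches; the closure decides what each handler
/// accumulates (counts vs. joined record/subject pairs).
fn walk_matches<Acc>(
    matches: &[Match],
    mut acc: Acc,
    mut on_match: impl FnMut(&mut Acc, &Match),
) -> Acc {
    for m in matches {
        on_match(&mut acc, m);
    }
    acc
}

fn main() {
    let matches = vec![
        Match { other_subject: "a.com".into() },
        Match { other_subject: "a.com".into() },
        Match { other_subject: "b.com".into() },
    ];
    // get_many_to_many_counts-style accumulation: count per other subject.
    let counts = walk_matches(&matches, HashMap::new(), |acc, m| {
        *acc.entry(m.other_subject.clone()).or_insert(0u64) += 1;
    });
    assert_eq!(counts["a.com"], 2);
    // get_many_to_many-style accumulation: collect subjects as a flat list.
    let subjects = walk_matches(&matches, Vec::new(), |acc, m| {
        acc.push(m.other_subject.clone());
    });
    assert_eq!(subjects.len(), 3);
}
```

Whether the pagination and DID-status checks can also live inside the shared walk, or have to stay per-handler, is the part that would make or break this refactor.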