Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Bikeshed: many-to-many small follow-up #16

merged opened by bad-example.com targeting main from m2m-bikeshed

the derives came in from base64 in tangled#7, whic was removed.

the cursor is a composite, but we might have other composite cursors that are different -- renaming this one to reflect its specific purpose.

Labels

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mfk27yrzww22
+172 -122
Diff #4
+17 -2
constellation/src/lib.rs
··· 22 DeleteAccount(Did), 23 } 24 25 - #[derive(Debug, Hash, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 26 pub struct Did(pub String); 27 28 impl<T: Into<String>> From<T> for Did { ··· 31 } 32 } 33 34 - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)] 35 pub struct RecordId { 36 pub did: Did, 37 pub collection: String, ··· 48 pub fn rkey(&self) -> String { 49 self.rkey.clone() 50 } 51 } 52 53 /// maybe the worst type in this repo, and there are some bad types
··· 22 DeleteAccount(Did), 23 } 24 25 + #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] 26 pub struct Did(pub String); 27 28 impl<T: Into<String>> From<T> for Did { ··· 31 } 32 } 33 34 + #[derive(Debug, PartialEq, Serialize, Deserialize)] 35 pub struct RecordId { 36 pub did: Did, 37 pub collection: String, ··· 48 pub fn rkey(&self) -> String { 49 self.rkey.clone() 50 } 51 + pub fn uri(&self) -> String { 52 + let RecordId { 53 + did: Did(did), 54 + collection, 55 + rkey, 56 + } = self; 57 + format!("at://{did}/{collection}/{rkey}") 58 + } 59 + } 60 + 61 + #[derive(Debug, Serialize, PartialEq)] 62 + #[serde(rename_all = "camelCase")] 63 + pub struct ManyToManyItem { 64 + link_record: RecordId, 65 + other_subject: String, 66 } 67 68 /// maybe the worst type in this repo, and there are some bad types
+9 -18
constellation/src/server/mod.rs
··· 18 use tokio_util::sync::CancellationToken; 19 20 use crate::storage::{LinkReader, Order, StorageStats}; 21 - use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; 24 mod filters; ··· 292 /// path to the secondary link in the linking record 293 path_to_other: String, 294 /// filter to linking records (join of the m2m) by these DIDs 295 #[serde(default)] 296 did: Vec<String>, 297 /// filter to specific secondary records ··· 700 /// path to the secondary link in the linking record 701 path_to_other: String, 702 /// filter to linking records (join of the m2m) by these DIDs 703 #[serde(default)] 704 - did: Vec<String>, 705 /// filter to specific secondary records 706 #[serde(default)] 707 other_subject: Vec<String>, 708 cursor: Option<OpaqueApiCursor>, 709 #[serde(default = "get_default_cursor_limit")] 710 limit: u64, 711 - } 712 - #[derive(Debug, Serialize, Clone)] 713 - struct ManyToManyItem { 714 - link: RecordId, 715 - subject: String, 716 } 717 #[derive(Template, Serialize)] 718 #[template(path = "get-many-to-many.html.j2")] ··· 740 } 741 742 let filter_dids: HashSet<Did> = query 743 - .did 744 .iter() 745 .map(|d| d.trim()) 746 .filter(|d| !d.is_empty()) ··· 777 778 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 779 780 - let items: Vec<ManyToManyItem> = paged 781 - .items 782 - .into_iter() 783 - .map(|(record_id, subject)| ManyToManyItem { 784 - link: record_id, 785 - subject, 786 - }) 787 - .collect(); 788 - 789 Ok(acceptable( 790 accept, 791 GetManyToManyItemsResponse { 792 - items, 793 cursor, 794 query: (*query).clone(), 795 },
··· 18 use tokio_util::sync::CancellationToken; 19 20 use crate::storage::{LinkReader, Order, StorageStats}; 21 + use crate::{CountsByCount, Did, ManyToManyItem, RecordId}; 22 23 mod acceptable; 24 mod filters; ··· 292 /// path to the secondary link in the linking record 293 path_to_other: String, 294 /// filter to linking records (join of the m2m) by these DIDs 295 + /// 296 + /// TODO: this should be called `link_did`, deprecate + add an alias 297 + /// TODO: should we have an `other_did` filter as well? 298 #[serde(default)] 299 did: Vec<String>, 300 /// filter to specific secondary records ··· 703 /// path to the secondary link in the linking record 704 path_to_other: String, 705 /// filter to linking records (join of the m2m) by these DIDs 706 + /// 707 + /// TODO: should we have an `other_did` filter as well? 708 #[serde(default)] 709 + link_did: Vec<String>, 710 /// filter to specific secondary records 711 #[serde(default)] 712 other_subject: Vec<String>, 713 cursor: Option<OpaqueApiCursor>, 714 #[serde(default = "get_default_cursor_limit")] 715 limit: u64, 716 } 717 #[derive(Template, Serialize)] 718 #[template(path = "get-many-to-many.html.j2")] ··· 740 } 741 742 let filter_dids: HashSet<Did> = query 743 + .link_did 744 .iter() 745 .map(|d| d.trim()) 746 .filter(|d| !d.is_empty()) ··· 777 778 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 779 780 Ok(acceptable( 781 accept, 782 GetManyToManyItemsResponse { 783 + items: paged.items, 784 cursor, 785 query: (*query).clone(), 786 },
+28 -26
constellation/src/storage/mem_store.rs
··· 1 use super::{ 2 - LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 - use crate::storage::CompositeCursor; 5 - use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 6 7 use anyhow::{anyhow, Result}; 8 use links::CollectedLink; ··· 248 after: Option<String>, 249 filter_dids: &HashSet<Did>, 250 filter_targets: &HashSet<String>, 251 - ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 252 // setup variables that we need later 253 let path_to_other = RecordPath(path_to_other.to_string()); 254 let filter_targets: HashSet<Target> = ··· 257 // extract parts form composite cursor 258 let cursor = match after { 259 Some(a) => { 260 - let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 261 - let b = b 262 .parse::<u64>() 263 .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 264 - let f = f 265 .parse::<u64>() 266 .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 267 - Some(CompositeCursor { 268 - backward: b, 269 - forward: f, 270 }) 271 } 272 None => None, ··· 280 return Ok(PagedOrderedCollection::empty()); 281 }; 282 283 - let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 284 285 // iterate backwards (who linked to the target?) 286 - for (linker_idx, (did, rkey)) in linkers 287 .iter() 288 .enumerate() 289 .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 290 - .skip_while(|(linker_idx, _)| cursor.is_some_and(|c| *linker_idx < c.backward as usize)) 291 .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(did)) 292 { 293 let Some(links) = data.links.get(did).and_then(|m| { ··· 299 continue; 300 }; 301 302 - // iterate forward (which of these links point to the __other__ target?) 303 - for (link_idx, (_, fwd_target)) in links 304 .iter() 305 .enumerate() 306 .filter(|(_, (p, t))| { 307 *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 308 }) 309 - .skip_while(|(link_idx, _)| { 310 cursor.is_some_and(|c| { 311 - linker_idx == c.backward as usize && *link_idx <= c.forward as usize 312 }) 313 }) 314 .take(limit as usize + 1 - items.len()) 315 { 316 - items.push(( 317 - linker_idx, 318 - link_idx, 319 - RecordId { 320 did: did.clone(), 321 collection: collection.to_string(), 322 rkey: rkey.0.clone(), 323 }, 324 - fwd_target.0.clone(), 325 - )); 326 } 327 328 // page full - eject ··· 332 } 333 334 let next = (items.len() > limit as usize).then(|| { 335 - let (l, f, _, _) = items[limit as usize - 1]; 336 - format!("{l},{f}") 337 }); 338 339 let items = items 340 .into_iter() 341 .take(limit as usize) 342 - .map(|(_, _, rid, t)| (rid, t)) 343 .collect(); 344 345 Ok(PagedOrderedCollection { items, next })
··· 1 use super::{ 2 + LinkReader, LinkStorage, ManyToManyCursor, Order, PagedAppendingCollection, 3 + PagedOrderedCollection, StorageStats, 4 }; 5 + use crate::{ActionableEvent, CountsByCount, Did, ManyToManyItem, RecordId}; 6 7 use anyhow::{anyhow, Result}; 8 use links::CollectedLink; ··· 248 after: Option<String>, 249 filter_dids: &HashSet<Did>, 250 filter_targets: &HashSet<String>, 251 + ) -> Result<PagedOrderedCollection<ManyToManyItem, String>> { 252 // setup variables that we need later 253 let path_to_other = RecordPath(path_to_other.to_string()); 254 let filter_targets: HashSet<Target> = ··· 257 // extract parts form composite cursor 258 let cursor = match after { 259 Some(a) => { 260 + let (b, o) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 261 + let backlink_idx = b 262 .parse::<u64>() 263 .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 264 + let other_link_idx = o 265 .parse::<u64>() 266 .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 267 + Some(ManyToManyCursor { 268 + backlink_idx, 269 + other_link_idx, 270 }) 271 } 272 None => None, ··· 280 return Ok(PagedOrderedCollection::empty()); 281 }; 282 283 + let mut items: Vec<(usize, usize, ManyToManyItem)> = Vec::new(); 284 285 // iterate backwards (who linked to the target?) 286 + for (backlink_idx, (did, rkey)) in linkers 287 .iter() 288 .enumerate() 289 .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 290 + .skip_while(|(backlink_idx, _)| { 291 + cursor.is_some_and(|c| *backlink_idx < c.backlink_idx as usize) 292 + }) 293 .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(did)) 294 { 295 let Some(links) = data.links.get(did).and_then(|m| { ··· 301 continue; 302 }; 303 304 + // iterate forward (which of these links point to the "other" target?) 305 + for (other_link_idx, (_, fwd_target)) in links 306 .iter() 307 .enumerate() 308 .filter(|(_, (p, t))| { 309 *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 310 }) 311 + .skip_while(|(other_link_idx, _)| { 312 cursor.is_some_and(|c| { 313 + backlink_idx == c.backlink_idx as usize 314 + && *other_link_idx <= c.other_link_idx as usize 315 }) 316 }) 317 .take(limit as usize + 1 - items.len()) 318 { 319 + let item = ManyToManyItem { 320 + link_record: RecordId { 321 did: did.clone(), 322 collection: collection.to_string(), 323 rkey: rkey.0.clone(), 324 }, 325 + other_subject: fwd_target.0.clone(), 326 + }; 327 + items.push((backlink_idx, other_link_idx, item)); 328 } 329 330 // page full - eject ··· 334 } 335 336 let next = (items.len() > limit as usize).then(|| { 337 + let (b, o, _) = items[limit as usize - 1]; 338 + format!("{b},{o}") 339 }); 340 341 let items = items 342 .into_iter() 343 .take(limit as usize) 344 + .map(|(_, _, item)| item) 345 .collect(); 346 347 Ok(PagedOrderedCollection { items, next })
+53 -29
constellation/src/storage/mod.rs
··· 1 - use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 2 use anyhow::Result; 3 use serde::{Deserialize, Serialize}; 4 use std::collections::{HashMap, HashSet}; ··· 39 } 40 } 41 42 - // get-many-to-many composite cursor 43 #[derive(Copy, Clone, Debug)] 44 - struct CompositeCursor { 45 - backward: u64, 46 - forward: u64, 47 } 48 49 /// A paged collection whose keys are sorted instead of indexed ··· 153 after: Option<String>, 154 filter_dids: &HashSet<Did>, 155 filter_to_targets: &HashSet<String>, 156 - ) -> Result<PagedOrderedCollection<(RecordId, String), String>>; 157 158 fn get_all_counts( 159 &self, ··· 1818 2, 1819 "both forward links at path_to_other should be emitted" 1820 ); 1821 - let mut targets: Vec<_> = result.items.iter().map(|(_, t)| t.as_str()).collect(); 1822 targets.sort(); 1823 assert_eq!(targets, vec!["b.com", "c.com"]); 1824 assert!(result 1825 .items 1826 .iter() 1827 - .all(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1828 assert_eq!(result.next, None); 1829 }); 1830 ··· 1927 let b_items: Vec<_> = result 1928 .items 1929 .iter() 1930 - .filter(|(_, subject)| subject == "b.com") 1931 .collect(); 1932 assert_eq!(b_items.len(), 2); 1933 - assert!(b_items 1934 - .iter() 1935 - .any(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1936 - assert!(b_items 1937 - .iter() 1938 - .any(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf2")); 1939 // Check c.com items 1940 let c_items: Vec<_> = result 1941 .items 1942 .iter() 1943 - .filter(|(_, subject)| subject == "c.com") 1944 .collect(); 1945 assert_eq!(c_items.len(), 2); 1946 - assert!(c_items 1947 - .iter() 1948 - .any(|(r, _)| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa")); 1949 - assert!(c_items 1950 - .iter() 1951 - .any(|(r, _)| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa2")); 1952 1953 // Test with DID filter - should only get records from did:plc:fdsa 1954 let result = storage.get_many_to_many( ··· 1962 &HashSet::new(), 1963 )?; 1964 assert_eq!(result.items.len(), 2); 1965 - assert!(result.items.iter().all(|(_, subject)| subject == "c.com")); 1966 - assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:fdsa")); 1967 1968 // Test with target filter - should only get records linking to b.com 1969 let result = storage.get_many_to_many( ··· 1977 &HashSet::from_iter(["b.com".to_string()]), 1978 )?; 1979 assert_eq!(result.items.len(), 2); 1980 - assert!(result.items.iter().all(|(_, subject)| subject == "b.com")); 1981 - assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:asdf")); 1982 1983 // Pagination edge cases: we have 4 flat items 1984 ··· 2052 assert_eq!(result2.next, None, "next should be None on final page"); 2053 2054 // Verify we got all 4 unique items across both pages (no duplicates, no gaps) 2055 - let mut all_rkeys: Vec<_> = result.items.iter().map(|(r, _)| r.rkey.clone()).collect(); 2056 - all_rkeys.extend(result2.items.iter().map(|(r, _)| r.rkey.clone())); 2057 all_rkeys.sort(); 2058 assert_eq!( 2059 all_rkeys, ··· 2131 .items 2132 .iter() 2133 .chain(page2.items.iter()) 2134 - .map(|(_, t)| t.clone()) 2135 .collect(); 2136 all_targets.sort(); 2137 assert_eq!(
··· 1 + use crate::{ActionableEvent, CountsByCount, Did, ManyToManyItem, RecordId}; 2 use anyhow::Result; 3 use serde::{Deserialize, Serialize}; 4 use std::collections::{HashMap, HashSet}; ··· 39 } 40 } 41 42 #[derive(Copy, Clone, Debug)] 43 + struct ManyToManyCursor { 44 + backlink_idx: u64, 45 + other_link_idx: u64, 46 } 47 48 /// A paged collection whose keys are sorted instead of indexed ··· 152 after: Option<String>, 153 filter_dids: &HashSet<Did>, 154 filter_to_targets: &HashSet<String>, 155 + ) -> Result<PagedOrderedCollection<ManyToManyItem, String>>; 156 157 fn get_all_counts( 158 &self, ··· 1817 2, 1818 "both forward links at path_to_other should be emitted" 1819 ); 1820 + let mut targets: Vec<_> = result 1821 + .items 1822 + .iter() 1823 + .map(|item| item.other_subject.as_str()) 1824 + .collect(); 1825 targets.sort(); 1826 assert_eq!(targets, vec!["b.com", "c.com"]); 1827 assert!(result 1828 .items 1829 .iter() 1830 + .all(|item| item.link_record.uri() == "at://did:plc:asdf/app.t.c/asdf")); 1831 assert_eq!(result.next, None); 1832 }); 1833 ··· 1930 let b_items: Vec<_> = result 1931 .items 1932 .iter() 1933 + .filter(|item| item.other_subject == "b.com") 1934 .collect(); 1935 assert_eq!(b_items.len(), 2); 1936 + assert!(b_items.iter().any( 1937 + |item| item.link_record.did.0 == "did:plc:asdf" && item.link_record.rkey == "asdf" 1938 + )); 1939 + assert!(b_items.iter().any( 1940 + |item| item.link_record.did.0 == "did:plc:asdf" && item.link_record.rkey == "asdf2" 1941 + )); 1942 // Check c.com items 1943 let c_items: Vec<_> = result 1944 .items 1945 .iter() 1946 + .filter(|item| item.other_subject == "c.com") 1947 .collect(); 1948 assert_eq!(c_items.len(), 2); 1949 + assert!(c_items.iter().any( 1950 + |item| item.link_record.did.0 == "did:plc:fdsa" && item.link_record.rkey == "fdsa" 1951 + )); 1952 + assert!(c_items.iter().any( 1953 + |item| item.link_record.did.0 == "did:plc:fdsa" && item.link_record.rkey == "fdsa2" 1954 + )); 1955 1956 // Test with DID filter - should only get records from did:plc:fdsa 1957 let result = storage.get_many_to_many( ··· 1965 &HashSet::new(), 1966 )?; 1967 assert_eq!(result.items.len(), 2); 1968 + assert!(result 1969 + .items 1970 + .iter() 1971 + .all(|item| item.other_subject == "c.com")); 1972 + assert!(result 1973 + .items 1974 + .iter() 1975 + .all(|item| item.link_record.did.0 == "did:plc:fdsa")); 1976 1977 // Test with target filter - should only get records linking to b.com 1978 let result = storage.get_many_to_many( ··· 1986 &HashSet::from_iter(["b.com".to_string()]), 1987 )?; 1988 assert_eq!(result.items.len(), 2); 1989 + assert!(result 1990 + .items 1991 + .iter() 1992 + .all(|item| item.other_subject == "b.com")); 1993 + assert!(result 1994 + .items 1995 + .iter() 1996 + .all(|item| item.link_record.did.0 == "did:plc:asdf")); 1997 1998 // Pagination edge cases: we have 4 flat items 1999 ··· 2067 assert_eq!(result2.next, None, "next should be None on final page"); 2068 2069 // Verify we got all 4 unique items across both pages (no duplicates, no gaps) 2070 + let mut all_rkeys: Vec<_> = result 2071 + .items 2072 + .iter() 2073 + .map(|item| item.link_record.rkey.clone()) 2074 + .collect(); 2075 + all_rkeys.extend( 2076 + result2 2077 + .items 2078 + .iter() 2079 + .map(|item| item.link_record.rkey.clone()), 2080 + ); 2081 all_rkeys.sort(); 2082 assert_eq!( 2083 all_rkeys, ··· 2155 .items 2156 .iter() 2157 .chain(page2.items.iter()) 2158 + .map(|item| item.other_subject.clone()) 2159 .collect(); 2160 all_targets.sort(); 2161 assert_eq!(
+35 -32
constellation/src/storage/rocks_store.rs
··· 1 use super::{ 2 - ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 PagedOrderedCollection, StorageStats, 4 }; 5 - use crate::storage::CompositeCursor; 6 - use crate::{CountsByCount, Did, RecordId}; 7 8 use anyhow::{anyhow, bail, Result}; 9 use bincode::Options as BincodeOptions; ··· 946 path_to_other: &str, 947 limit: u64, 948 after: Option<String>, 949 - filter_dids: &HashSet<Did>, 950 filter_to_targets: &HashSet<String>, 951 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 952 let collection = Collection(collection.to_string()); ··· 962 let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?; 963 964 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 965 - eprintln!("nothin doin for this target, {target_key:?}"); 966 return Ok(PagedOrderedCollection::empty()); 967 }; 968 969 - let filter_did_ids: HashMap<DidId, bool> = filter_dids 970 .iter() 971 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 972 .collect::<Result<Vec<DidIdValue>>>()? ··· 1023 .take(1) 1024 .next() 1025 else { 1026 - eprintln!("no forward match"); 1027 continue; 1028 }; 1029 ··· 1135 path_to_other: &str, 1136 limit: u64, 1137 after: Option<String>, 1138 - filter_dids: &HashSet<Did>, 1139 filter_to_targets: &HashSet<String>, 1140 - ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1141 // helper to resolve dids 1142 let resolve_active_did = |did_id: &DidId| -> Result<Option<Did>> { 1143 let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { ··· 1159 let cursor = match after { 1160 Some(a) => { 1161 let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 1162 - let b = b 1163 .parse::<u64>() 1164 .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 1165 - let f = f 1166 .parse::<u64>() 1167 .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 1168 - Some(CompositeCursor { 1169 - backward: b, 1170 - forward: f, 1171 }) 1172 } 1173 None => None, 1174 }; 1175 1176 - eprintln!("cursor: {:#?}", cursor); 1177 - 1178 // (__active__) did ids and filter targets 1179 - let filter_did_ids: HashMap<DidId, bool> = filter_dids 1180 .iter() 1181 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 1182 .collect::<Result<Vec<DidIdValue>>>()? ··· 1197 }; 1198 let linkers = self.get_target_linkers(&target_id)?; 1199 1200 - let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 1201 1202 - // iterate backwards (who linked to the target?) 1203 - for (linker_idx, (did_id, rkey)) in 1204 - linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| { 1205 - cursor.is_some_and(|c| *linker_idx < c.backward as usize) 1206 - }) 1207 { 1208 if did_id.is_empty() 1209 || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id)) ··· 1220 continue; 1221 }; 1222 1223 - // iterate forward (which of these links point to the __other__ target?) 1224 - for (link_idx, RecordLinkTarget(_, fwd_target_id)) in links 1225 .0 1226 .into_iter() 1227 .enumerate() 1228 .filter(|(_, RecordLinkTarget(rpath, target_id))| { 1229 - eprintln!("rpath.0: {} vs. path_to_other: {path_to_other}", rpath.0); 1230 rpath.0 == path_to_other 1231 && (filter_to_target_ids.is_empty() 1232 || filter_to_target_ids.contains(target_id)) 1233 }) 1234 - .skip_while(|(link_idx, _)| { 1235 cursor.is_some_and(|c| { 1236 - linker_idx == c.backward as usize && *link_idx <= c.forward as usize 1237 }) 1238 }) 1239 .take(limit as usize + 1 - items.len()) ··· 1256 collection: collection.0.clone(), 1257 rkey: rkey.0.clone(), 1258 }; 1259 - items.push((linker_idx, link_idx, record_id, fwd_target_key.0 .0)); 1260 } 1261 1262 // page full - eject ··· 1275 // forward_link_idx are skipped. This correctly resumes mid-record when 1276 // a single backlinker has multiple forward links at path_to_other. 1277 let next = (items.len() > limit as usize).then(|| { 1278 - let (l, f, _, _) = items[limit as usize - 1]; 1279 - format!("{l},{f}") 1280 }); 1281 1282 let items = items 1283 .into_iter() 1284 .take(limit as usize) 1285 - .map(|(_, _, rid, t)| (rid, t)) 1286 .collect(); 1287 1288 Ok(PagedOrderedCollection { items, next })
··· 1 use super::{ 2 + ActionableEvent, LinkReader, LinkStorage, ManyToManyCursor, Order, PagedAppendingCollection, 3 PagedOrderedCollection, StorageStats, 4 }; 5 + use crate::{CountsByCount, Did, ManyToManyItem, RecordId}; 6 7 use anyhow::{anyhow, bail, Result}; 8 use bincode::Options as BincodeOptions; ··· 945 path_to_other: &str, 946 limit: u64, 947 after: Option<String>, 948 + filter_link_dids: &HashSet<Did>, 949 filter_to_targets: &HashSet<String>, 950 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 951 let collection = Collection(collection.to_string()); ··· 961 let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?; 962 963 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 964 return Ok(PagedOrderedCollection::empty()); 965 }; 966 967 + let filter_did_ids: HashMap<DidId, bool> = filter_link_dids 968 .iter() 969 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 970 .collect::<Result<Vec<DidIdValue>>>()? ··· 1021 .take(1) 1022 .next() 1023 else { 1024 continue; 1025 }; 1026 ··· 1132 path_to_other: &str, 1133 limit: u64, 1134 after: Option<String>, 1135 + filter_link_dids: &HashSet<Did>, 1136 filter_to_targets: &HashSet<String>, 1137 + ) -> Result<PagedOrderedCollection<ManyToManyItem, String>> { 1138 // helper to resolve dids 1139 let resolve_active_did = |did_id: &DidId| -> Result<Option<Did>> { 1140 let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { ··· 1156 let cursor = match after { 1157 Some(a) => { 1158 let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 1159 + let backlink_idx = b 1160 .parse::<u64>() 1161 .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 1162 + let other_link_idx = f 1163 .parse::<u64>() 1164 .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 1165 + Some(ManyToManyCursor { 1166 + backlink_idx, 1167 + other_link_idx, 1168 }) 1169 } 1170 None => None, 1171 }; 1172 1173 // (__active__) did ids and filter targets 1174 + let filter_did_ids: HashMap<DidId, bool> = filter_link_dids 1175 .iter() 1176 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 1177 .collect::<Result<Vec<DidIdValue>>>()? ··· 1192 }; 1193 let linkers = self.get_target_linkers(&target_id)?; 1194 1195 + let mut items: Vec<(usize, usize, ManyToManyItem)> = Vec::new(); 1196 1197 + // iterate backlinks (who linked to the target?) 1198 + for (backlink_idx, (did_id, rkey)) in 1199 + linkers 1200 + .0 1201 + .iter() 1202 + .enumerate() 1203 + .skip_while(|(backlink_idx, _)| { 1204 + cursor.is_some_and(|c| *backlink_idx < c.backlink_idx as usize) 1205 + }) 1206 { 1207 if did_id.is_empty() 1208 || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id)) ··· 1219 continue; 1220 }; 1221 1222 + // iterate fwd links (which of these links point to the "other" target?) 1223 + for (other_link_idx, RecordLinkTarget(_, fwd_target_id)) in links 1224 .0 1225 .into_iter() 1226 .enumerate() 1227 .filter(|(_, RecordLinkTarget(rpath, target_id))| { 1228 rpath.0 == path_to_other 1229 && (filter_to_target_ids.is_empty() 1230 || filter_to_target_ids.contains(target_id)) 1231 }) 1232 + .skip_while(|(other_link_idx, _)| { 1233 cursor.is_some_and(|c| { 1234 + backlink_idx == c.backlink_idx as usize 1235 + && *other_link_idx <= c.other_link_idx as usize 1236 }) 1237 }) 1238 .take(limit as usize + 1 - items.len()) ··· 1255 collection: collection.0.clone(), 1256 rkey: rkey.0.clone(), 1257 }; 1258 + let item = ManyToManyItem { 1259 + link_record: record_id, 1260 + other_subject: fwd_target_key.0 .0, 1261 + }; 1262 + items.push((backlink_idx, other_link_idx, item)); 1263 } 1264 1265 // page full - eject ··· 1278 // forward_link_idx are skipped. This correctly resumes mid-record when 1279 // a single backlinker has multiple forward links at path_to_other. 1280 let next = (items.len() > limit as usize).then(|| { 1281 + let (b, o, _) = items[limit as usize - 1]; 1282 + format!("{b},{o}") 1283 }); 1284 1285 let items = items 1286 .into_iter() 1287 .take(limit as usize) 1288 + .map(|(_, _, item)| item) 1289 .collect(); 1290 1291 Ok(PagedOrderedCollection { items, next })
+13 -7
constellation/templates/get-many-to-many.html.j2
··· 6 7 {% block content %} 8 9 - {% call try_it::get_many_to_many(query.subject, query.source, query.path_to_other, query.did, query.other_subject, query.limit) %} 10 11 <h2> 12 Many-to-many links to <code>{{ query.subject }}</code> ··· 19 20 <ul> 21 <li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li> 22 </ul> 23 24 <h3>Many-to-many links, most recent first:</h3> 25 26 {% for item in items %} 27 - <pre style="display: block; margin: 1em 2em" class="code"><strong>Subject</strong>: <a href="/links/all?target={{ item.subject|urlencode }}">{{ item.subject }}</a> 28 - <strong>DID</strong>: {{ item.link.did().0 }} 29 - <strong>Collection</strong>: {{ item.link.collection }} 30 - <strong>RKey</strong>: {{ item.link.rkey }} 31 - -> <a href="https://pdsls.dev/at://{{ item.link.did().0 }}/{{ item.link.collection }}/{{ item.link.rkey }}">browse record</a></pre> 32 {% endfor %} 33 34 {% if let Some(c) = cursor %} ··· 36 <input type="hidden" name="subject" value="{{ query.subject }}" /> 37 <input type="hidden" name="source" value="{{ query.source }}" /> 38 <input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" /> 39 - {% for did in query.did %} 40 <input type="hidden" name="did" value="{{ did }}" /> 41 {% endfor %} 42 {% for other in query.other_subject %}
··· 6 7 {% block content %} 8 9 + {% call try_it::get_many_to_many(query.subject, query.source, query.path_to_other, query.link_did, query.other_subject, query.limit) %} 10 11 <h2> 12 Many-to-many links to <code>{{ query.subject }}</code> ··· 19 20 <ul> 21 <li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li> 22 + {# todo: add link to see many-to-many counts #} 23 </ul> 24 25 <h3>Many-to-many links, most recent first:</h3> 26 27 {% for item in items %} 28 + <pre style="display: block; margin: 1em 2em" class="code"><strong>Linking record</strong>: 29 + {%- if let Some(uri) = item.link_record.uri().as_str()|to_browseable %} <a href="{{ uri }}">browse link record</a>{% endif %} 30 + DID: {{ item.link_record.did().0 }} 31 + Collection: {{ item.link_record.collection() }} 32 + RKey: {{ item.link_record.rkey() }} 33 + <strong>Other subject</strong>: {{ item.other_subject }} 34 + {%- if let Some(uri) = item.other_subject.as_str()|to_browseable %} 35 + -> <a href="{{ uri }}">browse subject</a> 36 + {%- endif %} 37 + </pre> 38 {% endfor %} 39 40 {% if let Some(c) = cursor %} ··· 42 <input type="hidden" name="subject" value="{{ query.subject }}" /> 43 <input type="hidden" name="source" value="{{ query.source }}" /> 44 <input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" /> 45 + {% for did in query.link_did %} 46 <input type="hidden" name="did" value="{{ did }}" /> 47 {% endfor %} 48 {% for other in query.other_subject %}
+8 -1
constellation/templates/hello.html.j2
··· 98 </ul> 99 100 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 101 - {% call try_it::get_many_to_many("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", "reply.parent.uri", [""], [""], 16) %} 102 103 <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getDistinct</code></h3> 104
··· 98 </ul> 99 100 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 101 + {% call try_it::get_many_to_many( 102 + "at://did:plc:uyauirpjzk6le4ygqzatcwnq/app.bsky.graph.list/3lzhg33t5bf2h", 103 + "app.bsky.graph.listitem:list", 104 + "subject", 105 + [""], 106 + [""], 107 + 16, 108 + ) %} 109 110 <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getDistinct</code></h3> 111
+9 -7
lexicons/blue.microcosm/links/getManyToMany.json
··· 4 "defs": { 5 "main": { 6 "type": "query", 7 - "description": "Get records that link to a primary subject along with the secondary subjects they also reference", 8 "parameters": { 9 "type": "params", 10 "required": ["subject", "source", "pathToOther"], ··· 22 "type": "string", 23 "description": "path to the secondary link in the many-to-many record (e.g., 'otherThing.uri')" 24 }, 25 - "did": { 26 "type": "array", 27 - "description": "filter links to those from specific users", 28 "items": { 29 "type": "string", 30 "format": "did" ··· 68 }, 69 "item": { 70 "type": "object", 71 - "required": ["link", "subject"], 72 "properties": { 73 - "link": { 74 "type": "ref", 75 - "ref": "#linkRecord" 76 }, 77 - "subject": { 78 "type": "string" 79 } 80 }
··· 4 "defs": { 5 "main": { 6 "type": "query", 7 + "description": "Get records that link out to both a primary and secondary subject", 8 "parameters": { 9 "type": "params", 10 "required": ["subject", "source", "pathToOther"], ··· 22 "type": "string", 23 "description": "path to the secondary link in the many-to-many record (e.g., 'otherThing.uri')" 24 }, 25 + "linkDid": { 26 "type": "array", 27 + "description": "filter linking records from specific users", 28 "items": { 29 "type": "string", 30 "format": "did" ··· 68 }, 69 "item": { 70 "type": "object", 71 + "required": ["linkRecord", "otherSubject"], 72 "properties": { 73 + "linkRecord": { 74 "type": "ref", 75 + "ref": "#linkRecord", 76 + "description": "reference to the link record itself" 77 }, 78 + "otherSubject": { 79 + "description": "the secondary subject from the link record", 80 "type": "string" 81 } 82 }

History

5 rounds 0 comments
sign up or login to add to the discussion
6 commits
expand
clean up dervive traits, specific cursor name
bikeshed did filter name for m2m queries
remove some prints
ManyToManyItem and set default "try-it": listitems
update lexicon
add _idx suffix for clarity (thanks maxh!) +fwd->o
expand 0 comments
pull request successfully merged
5 commits
expand
clean up dervive traits, specific cursor name
bikeshed did filter name for m2m queries
remove some prints
ManyToManyItem and set default "try-it": listitems
update lexicon
expand 0 comments
5 commits
expand
clean up dervive traits, specific cursor name
bikeshed did filter name for m2m queries
remove some prints
ManyToManyItem and set default "try-it": listitems
update lexicon
expand 0 comments
4 commits
expand
clean up dervive traits, specific cursor name
bikeshed did filter name for m2m queries
remove some prints
ManyToManyItem and set default "try-it": listitems
expand 0 comments
2 commits
expand
clean up dervive traits, specific cursor name
bikeshed did filter name for m2m queries
expand 0 comments