Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Add new get_many_to_many XRPC endpoint #7

merged opened by seoul.systems targeting main from seoul.systems/microcosm-rs: xrpc_many2many

Added a new XRPC API endpoint to fetch joined record URIs, termed get_many_to_many (we talked about this briefly on Discord already). It is implemented and functions almost identically to the existing get_many_to_many_counts endpoint and handler. Some of its possible flaws, such as the two-step lookup to verify that a matching DID is indeed active, are duplicated as well. On the plus side, this should make the PR pretty straightforward to review and make it easier to modify both endpoints later on, once a more efficient way to validate the status of DIDs becomes possible.

If you have comments, remarks, etc., I am happy to rework some parts.

Labels

None yet.

Participants 2
AT URI
at://did:plc:53wellrw53o7sw4zlpfenvuh/sh.tangled.repo.pull/3mbkyehqooh22
+286 -260
Interdiff #5 → #6
Cargo.lock

This file has not been changed.

constellation/Cargo.toml

This file has not been changed.

constellation/src/lib.rs

This file has not been changed.

constellation/src/server/mod.rs

This file has not been changed.

+75 -88
constellation/src/storage/mem_store.rs
··· 1 use super::{ 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 - use crate::storage::{decode_m2m_cursor, encode_m2m_cursor}; 5 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 6 - use anyhow::Result; 7 use links::CollectedLink; 8 use std::collections::{HashMap, HashSet}; 9 use std::sync::{Arc, Mutex}; 10 ··· 245 limit: u64, 246 after: Option<String>, 247 filter_dids: &HashSet<Did>, 248 - filter_to_targets: &HashSet<String>, 249 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 250 - let empty_res = Ok(PagedOrderedCollection { 251 - items: Vec::new(), 252 - next: None, 253 - }); 254 255 - let data = self.0.lock().unwrap(); 256 257 let Some(sources) = data.targets.get(&Target::new(target)) else { 258 - return empty_res; 259 }; 260 let Some(linkers) = sources.get(&Source::new(collection, path)) else { 261 - return empty_res; 262 }; 263 - let path_to_other = RecordPath::new(path_to_other); 264 265 - // Convert filter_to_targets to Target objects for comparison 266 - let filter_to_target_objs: HashSet<Target> = 267 - HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s))); 268 269 - let mut grouped_links: HashMap<Target, Vec<RecordId>> = HashMap::new(); 270 - for (did, rkey) in linkers.iter().flatten().cloned() { 271 - // Filter by DID if filter is provided 272 - if !filter_dids.is_empty() && !filter_dids.contains(&did) { 273 - continue; 274 - } 275 - if let Some(fwd_target) = data 276 - .links 277 - .get(&did) 278 - .unwrap_or(&HashMap::new()) 279 - .get(&RepoId { 280 collection: collection.to_string(), 281 rkey: rkey.clone(), 282 }) 283 - .unwrap_or(&Vec::new()) 284 .iter() 285 - .find_map(|(path, target)| { 286 - if *path == path_to_other 287 - && (filter_to_target_objs.is_empty() 288 - || filter_to_target_objs.contains(target)) 289 - { 290 - Some(target) 291 - } else { 292 - None 293 - } 294 }) 295 { 296 - let record_ids = 
grouped_links.entry(fwd_target.clone()).or_default(); 297 - record_ids.push(RecordId { 298 - did, 299 - collection: collection.to_string(), 300 - rkey: rkey.0, 301 - }); 302 } 303 - } 304 305 - let mut items = grouped_links 306 - .into_iter() 307 - .flat_map(|(target, records)| { 308 - records 309 - .iter() 310 - .map(move |r| (r.clone(), target.0.clone())) 311 - .collect::<Vec<_>>() 312 - }) 313 - .collect::<Vec<_>>(); 314 - 315 - // first try to sort by subject, then by did, collection and finally rkey 316 - items.sort_by(|a, b| { 317 - if a.1 == b.1 { 318 - a.0.cmp(&b.0) 319 - } else { 320 - a.1.cmp(&b.1) 321 } 322 - }); 323 324 - // Parse cursor if provided (malformed cursor silently ignored) 325 - let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok()); 326 - 327 - // Apply cursor: skip everything up to and including the cursor position 328 - items = items 329 - .into_iter() 330 - .skip_while(|item| { 331 - let Some((after_did, after_rkey, after_subject)) = &after_cursor else { 332 - return false; 333 - }; 334 - 335 - if &item.1 == after_subject { 336 - // Same subject - compare by RecordId to find our position 337 - let cursor_id = RecordId { 338 - did: Did(after_did.clone()), 339 - collection: collection.to_string(), 340 - rkey: after_rkey.clone(), 341 - }; 342 - item.0.cmp(&cursor_id).is_le() 343 - } else { 344 - // Different subject - compare subjects directly 345 - item.1.cmp(after_subject).is_le() 346 - } 347 - }) 348 - .take(limit as usize + 1) 349 - .collect(); 350 - 351 - // Build the new cursor from last item, if needed 352 let next = if items.len() as u64 > limit { 353 items.truncate(limit as usize); 354 items 355 .last() 356 - .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1)) 357 } else { 358 None 359 }; 360 361 Ok(PagedOrderedCollection { items, next }) 362 } 363
··· 1 use super::{ 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 + 6 + use anyhow::{anyhow, Result}; 7 + use base64::engine::general_purpose as b64; 8 + use base64::Engine as _; 9 use links::CollectedLink; 10 + 11 use std::collections::{HashMap, HashSet}; 12 use std::sync::{Arc, Mutex}; 13 ··· 248 limit: u64, 249 after: Option<String>, 250 filter_dids: &HashSet<Did>, 251 + filter_targets: &HashSet<String>, 252 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 253 + // setup variables that we need later 254 + let path_to_other = RecordPath(path_to_other.to_string()); 255 + let filter_targets: HashSet<Target> = 256 + HashSet::from_iter(filter_targets.iter().map(|s| Target::new(s))); 257 258 + // extract parts form composite cursor 259 + let (backward_idx, forward_idx) = match after { 260 + Some(a) => { 261 + let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?; 262 + let (b, f) = after_str 263 + .split_once(',') 264 + .ok_or_else(|| anyhow!("invalid cursor format"))?; 265 + ( 266 + (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?, 267 + (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?, 268 + ) 269 + } 270 + None => (None, None), 271 + }; 272 273 + let data = self.0.lock().unwrap(); 274 let Some(sources) = data.targets.get(&Target::new(target)) else { 275 + return Ok(PagedOrderedCollection::empty()); 276 }; 277 let Some(linkers) = sources.get(&Source::new(collection, path)) else { 278 + return Ok(PagedOrderedCollection::empty()); 279 }; 280 281 + let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 282 283 + // iterate backwards (who linked to the target?) 
284 + for (linker_idx, (did, rkey)) in linkers 285 + .iter() 286 + .enumerate() 287 + .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 288 + .skip_while(|(linker_idx, _)| { 289 + backward_idx.is_some_and(|idx| match forward_idx { 290 + Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping 291 + None => *linker_idx <= idx as usize, // exclusive: skip right here 292 + }) 293 + }) 294 + .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(&did)) 295 + { 296 + let Some(links) = data.links.get(&did).and_then(|m| { 297 + m.get(&RepoId { 298 collection: collection.to_string(), 299 rkey: rkey.clone(), 300 }) 301 + }) else { 302 + continue; 303 + }; 304 + 305 + // iterate forward (which of these links point to the __other__ target?) 306 + for (link_idx, (_, fwd_target)) in links 307 .iter() 308 + .enumerate() 309 + .filter(|(_, (p, t))| { 310 + *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 311 }) 312 + .skip_while(|(link_idx, _)| { 313 + backward_idx.is_some_and(|bl_idx| { 314 + linker_idx == bl_idx as usize 315 + && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize) 316 + }) 317 + }) 318 + .take(limit as usize + 1 - items.len()) 319 { 320 + items.push(( 321 + linker_idx, 322 + link_idx, 323 + RecordId { 324 + did: did.clone(), 325 + collection: collection.to_string(), 326 + rkey: rkey.0.clone(), 327 + }, 328 + fwd_target.0.clone(), 329 + )); 330 } 331 332 + // page full - eject 333 + if items.len() > limit as usize { 334 + break; 335 } 336 + } 337 338 let next = if items.len() as u64 > limit { 339 items.truncate(limit as usize); 340 items 341 .last() 342 + .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64))) 343 } else { 344 None 345 }; 346 347 + let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect(); 348 Ok(PagedOrderedCollection { items, next }) 349 } 350
+105 -57
constellation/src/storage/mod.rs
··· 6 pub mod mem_store; 7 pub use mem_store::MemStorage; 8 9 - use anyhow::anyhow; 10 - 11 - use base64::engine::general_purpose as b64; 12 - use base64::Engine as _; 13 - 14 #[cfg(feature = "rocks")] 15 pub mod rocks_store; 16 #[cfg(feature = "rocks")] ··· 161 fn get_stats(&self) -> Result<StorageStats>; 162 } 163 164 - // Shared helpers 165 - 166 - /// Decode a base64 cursor into its component parts (did, rkey, subject). 167 - /// The subject is placed last because it may contain '|' characters. 168 - pub(crate) fn decode_m2m_cursor(cursor: &str) -> Result<(String, String, String)> { 169 - let decoded = String::from_utf8(b64::URL_SAFE.decode(cursor)?)?; 170 - let mut parts = decoded.splitn(3, '|').map(String::from); 171 - 172 - // Using .next() to pull each part out of the iterator in order. 173 - // This avoids collecting into a Vec just to index and clone back out. 174 - let did = parts 175 - .next() 176 - .ok_or_else(|| anyhow!("missing did in cursor"))?; 177 - let rkey = parts 178 - .next() 179 - .ok_or_else(|| anyhow!("missing rkey in cursor"))?; 180 - let subject = parts 181 - .next() 182 - .ok_or_else(|| anyhow!("missing subject in cursor"))?; 183 - 184 - Ok((did, rkey, subject)) 185 - } 186 - 187 - /// Encode cursor components into a base64 string. 
188 - pub(crate) fn encode_m2m_cursor(did: &str, rkey: &str, subject: &str) -> String { 189 - let raw = format!("{did}|{rkey}|{subject}"); 190 - b64::URL_SAFE.encode(&raw) 191 - } 192 - 193 #[cfg(test)] 194 195 ··· 1802 }); 1803 1804 test_each_storage!(get_m2m_single, |storage| { 1805 storage.push( 1806 &ActionableEvent::CreateLinks { 1807 record_id: RecordId { ··· 1819 path: ".def.uri".into(), 1820 }, 1821 CollectedLink { 1822 - target: Link::Uri("b.com".into()), 1823 - path: ".ghi.uri".into(), 1824 }, 1825 ], 1826 }, 1827 0, 1828 )?; 1829 assert_eq!( 1830 - storage.get_many_to_many( 1831 - "a.com", 1832 - "app.t.c", 1833 - ".abc.uri", 1834 - ".def.uri", 1835 - 10, 1836 - None, 1837 - &HashSet::new(), 1838 - &HashSet::new(), 1839 - )?, 1840 - PagedOrderedCollection { 1841 - items: vec![( 1842 - RecordId { 1843 - did: "did:plc:asdf".into(), 1844 - collection: "app.t.c".into(), 1845 - rkey: "asdf".into(), 1846 - }, 1847 - "b.com".to_string(), 1848 - )], 1849 - next: None, 1850 - } 1851 ); 1852 }); 1853 1854 test_each_storage!(get_m2m_filters, |storage| { ··· 2082 all_rkeys, 2083 vec!["asdf", "asdf2", "fdsa", "fdsa2"], 2084 "should have all 4 records across both pages" 2085 ); 2086 }); 2087 }
··· 6 pub mod mem_store; 7 pub use mem_store::MemStorage; 8 9 #[cfg(feature = "rocks")] 10 pub mod rocks_store; 11 #[cfg(feature = "rocks")] ··· 156 fn get_stats(&self) -> Result<StorageStats>; 157 } 158 159 #[cfg(test)] 160 161 ··· 1768 }); 1769 1770 test_each_storage!(get_m2m_single, |storage| { 1771 + // One record linking to a.com (backward), with two forward links at 1772 + // the same path_to_other (.def.uri) pointing to b.com and c.com. 1773 + // Both forward targets must appear in the output. 1774 storage.push( 1775 &ActionableEvent::CreateLinks { 1776 record_id: RecordId { ··· 1788 path: ".def.uri".into(), 1789 }, 1790 CollectedLink { 1791 + target: Link::Uri("c.com".into()), 1792 + path: ".def.uri".into(), 1793 }, 1794 ], 1795 }, 1796 0, 1797 )?; 1798 + let result = storage.get_many_to_many( 1799 + "a.com", 1800 + "app.t.c", 1801 + ".abc.uri", 1802 + ".def.uri", 1803 + 10, 1804 + None, 1805 + &HashSet::new(), 1806 + &HashSet::new(), 1807 + )?; 1808 assert_eq!( 1809 + result.items.len(), 1810 + 2, 1811 + "both forward links at path_to_other should be emitted" 1812 ); 1813 + let mut targets: Vec<_> = result.items.iter().map(|(_, t)| t.as_str()).collect(); 1814 + targets.sort(); 1815 + assert_eq!(targets, vec!["b.com", "c.com"]); 1816 + assert!(result 1817 + .items 1818 + .iter() 1819 + .all(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1820 + assert_eq!(result.next, None); 1821 }); 1822 1823 test_each_storage!(get_m2m_filters, |storage| { ··· 2051 all_rkeys, 2052 vec!["asdf", "asdf2", "fdsa", "fdsa2"], 2053 "should have all 4 records across both pages" 2054 + ); 2055 + }); 2056 + 2057 + // Pagination that splits across forward links within a single backlinker. 2058 + // The cursor should correctly resume mid-record on the next page. 
2059 + test_each_storage!(get_m2m_paginate_within_forward_links, |storage| { 2060 + // Record with 1 backward link and 3 forward links at the same path 2061 + storage.push( 2062 + &ActionableEvent::CreateLinks { 2063 + record_id: RecordId { 2064 + did: "did:plc:lister".into(), 2065 + collection: "app.t.c".into(), 2066 + rkey: "list1".into(), 2067 + }, 2068 + links: vec![ 2069 + CollectedLink { 2070 + target: Link::Uri("a.com".into()), 2071 + path: ".subject.uri".into(), 2072 + }, 2073 + CollectedLink { 2074 + target: Link::Uri("x.com".into()), 2075 + path: ".items[].uri".into(), 2076 + }, 2077 + CollectedLink { 2078 + target: Link::Uri("y.com".into()), 2079 + path: ".items[].uri".into(), 2080 + }, 2081 + CollectedLink { 2082 + target: Link::Uri("z.com".into()), 2083 + path: ".items[].uri".into(), 2084 + }, 2085 + ], 2086 + }, 2087 + 0, 2088 + )?; 2089 + 2090 + // Page 1: limit=2, should get 2 of the 3 forward links 2091 + let page1 = storage.get_many_to_many( 2092 + "a.com", 2093 + "app.t.c", 2094 + ".subject.uri", 2095 + ".items[].uri", 2096 + 2, 2097 + None, 2098 + &HashSet::new(), 2099 + &HashSet::new(), 2100 + )?; 2101 + assert_eq!(page1.items.len(), 2, "first page should have 2 items"); 2102 + assert!( 2103 + page1.next.is_some(), 2104 + "should have a next cursor for remaining item" 2105 + ); 2106 + 2107 + // Page 2: should get the remaining 1 forward link 2108 + let page2 = storage.get_many_to_many( 2109 + "a.com", 2110 + "app.t.c", 2111 + ".subject.uri", 2112 + ".items[].uri", 2113 + 2, 2114 + page1.next, 2115 + &HashSet::new(), 2116 + &HashSet::new(), 2117 + )?; 2118 + assert_eq!(page2.items.len(), 1, "second page should have 1 item"); 2119 + assert_eq!(page2.next, None, "no more pages"); 2120 + 2121 + // Verify all 3 targets appear across pages with no duplicates 2122 + let mut all_targets: Vec<_> = page1 2123 + .items 2124 + .iter() 2125 + .chain(page2.items.iter()) 2126 + .map(|(_, t)| t.clone()) 2127 + .collect(); 2128 + all_targets.sort(); 2129 + 
assert_eq!( 2130 + all_targets, 2131 + vec!["x.com", "y.com", "z.com"], 2132 + "all forward targets should appear exactly once across pages" 2133 ); 2134 }); 2135 }
+106 -115
constellation/src/storage/rocks_store.rs
··· 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 PagedOrderedCollection, StorageStats, 4 }; 5 - use crate::storage::{decode_m2m_cursor, encode_m2m_cursor}; 6 use crate::{CountsByCount, Did, RecordId}; 7 - use anyhow::{bail, Result}; 8 use bincode::Options as BincodeOptions; 9 use links::CollectedLink; 10 use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; ··· 15 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 16 }; 17 use serde::{Deserialize, Serialize}; 18 use std::collections::{BTreeMap, HashMap, HashSet}; 19 use std::io::Read; 20 use std::marker::PhantomData; ··· 25 }; 26 use std::thread; 27 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 28 - use tokio_util::sync::CancellationToken; 29 30 static DID_IDS_CF: &str = "did_ids"; 31 static TARGET_IDS_CF: &str = "target_ids"; ··· 1136 filter_dids: &HashSet<Did>, 1137 filter_to_targets: &HashSet<String>, 1138 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1139 let collection = Collection(collection.to_string()); 1140 let path = RPath(path.to_string()); 1141 1142 - let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1143 1144 - // Parse cursor if provided (malformed cursor silently ignored) 1145 - let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok()); 1146 1147 - let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
else { 1148 - eprintln!("Target not found for {target_key:?}"); 1149 - return Ok(PagedOrderedCollection::empty()); 1150 }; 1151 1152 let filter_did_ids: HashMap<DidId, bool> = filter_dids 1153 .iter() 1154 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) ··· 1156 .into_iter() 1157 .map(|DidIdValue(id, active)| (id, active)) 1158 .collect(); 1159 - 1160 let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 1161 for t in filter_to_targets { 1162 for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { ··· 1164 } 1165 } 1166 1167 let linkers = self.get_target_linkers(&target_id)?; 1168 1169 - // we want to provide many to many which effectively means that we want to show a specific 1170 - // list of reords that is linked to by a specific number of linkers 1171 - let mut grouped_links: BTreeMap<TargetId, Vec<RecordId>> = BTreeMap::new(); 1172 - for (did_id, rkey) in linkers.0 { 1173 - if did_id.is_empty() { 1174 - continue; 1175 - } 1176 1177 - if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) { 1178 continue; 1179 } 1180 1181 - // Make sure the current did is active 1182 - let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { 1183 - eprintln!("failed to look up did from did_id {did_id:?}"); 1184 continue; 1185 }; 1186 - let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { 1187 - eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1188 - continue; 1189 - }; 1190 - if !active { 1191 - continue; 1192 - } 1193 1194 - let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey.clone()); 1195 - let Some(targets) = self.get_record_link_targets(&record_link_key)? 
else { 1196 - continue; 1197 - }; 1198 - 1199 - let Some(fwd_target) = targets 1200 .0 1201 .into_iter() 1202 - .filter_map(|RecordLinkTarget(rpath, target_id)| { 1203 - if rpath.0 == path_to_other 1204 && (filter_to_target_ids.is_empty() 1205 - || filter_to_target_ids.contains(&target_id)) 1206 - { 1207 - Some(target_id) 1208 - } else { 1209 - None 1210 - } 1211 }) 1212 - .take(1) 1213 - .next() 1214 - else { 1215 - eprintln!("no forward match found."); 1216 - continue; 1217 - }; 1218 - 1219 - let page_is_full = grouped_links.len() as u64 >= limit; 1220 - if page_is_full { 1221 - let current_max = grouped_links.keys().next_back().unwrap(); 1222 - if fwd_target > *current_max { 1223 continue; 1224 - } 1225 - } 1226 - 1227 - // link to be added 1228 - let record_id = RecordId { 1229 - did, 1230 - collection: collection.0.clone(), 1231 - rkey: rkey.0, 1232 - }; 1233 - 1234 - // pagination: 1235 - if after_cursor.is_some() { 1236 - // extract composite-cursor parts 1237 - let Some((after_did, after_rkey, after_subject)) = &after_cursor else { 1238 - continue; 1239 }; 1240 - 1241 let Some(fwd_target_key) = self 1242 .target_id_table 1243 - .get_val_from_id(&self.db, fwd_target.0)? 
1244 else { 1245 - eprintln!("failed to look up target from target_id {fwd_target:?}"); 1246 continue; 1247 }; 1248 1249 - // first try and compare by subject only 1250 - if &fwd_target_key.0 .0 != after_subject 1251 - && fwd_target_key.0 .0.cmp(after_subject).is_le() 1252 - { 1253 - continue; 1254 - } 1255 - 1256 - // then, if needed, we compare by record id 1257 - let cursor_id = RecordId { 1258 - did: Did(after_did.clone()), 1259 collection: collection.0.clone(), 1260 - rkey: after_rkey.clone(), 1261 }; 1262 - if record_id.cmp(&cursor_id).is_le() { 1263 - continue; 1264 - } 1265 } 1266 1267 - // pagination, continued 1268 - let mut should_evict = false; 1269 - let entry = grouped_links.entry(fwd_target.clone()).or_insert_with(|| { 1270 - should_evict = page_is_full; 1271 - Vec::default() 1272 - }); 1273 - entry.push(record_id); 1274 - 1275 - if should_evict { 1276 - grouped_links.pop_last(); 1277 } 1278 } 1279 1280 - let mut items: Vec<(RecordId, String)> = Vec::with_capacity(grouped_links.len()); 1281 - for (fwd_target_id, records) in &grouped_links { 1282 - let Some(target_key) = self 1283 - .target_id_table 1284 - .get_val_from_id(&self.db, fwd_target_id.0)? 
1285 - else { 1286 - eprintln!("failed to look up target from target_id {fwd_target_id:?}"); 1287 - continue; 1288 - }; 1289 - 1290 - let target_string = target_key.0 .0; 1291 - 1292 - records 1293 - .iter() 1294 - .for_each(|r| items.push((r.clone(), target_string.clone()))); 1295 - } 1296 - 1297 - // Build new cursor from last the item, if needed 1298 let next = if items.len() as u64 > limit { 1299 items.truncate(limit as usize); 1300 items 1301 .last() 1302 - .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1)) 1303 } else { 1304 None 1305 }; 1306 1307 Ok(PagedOrderedCollection { items, next }) 1308 } 1309 ··· 1645 } 1646 1647 // target ids 1648 - #[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)] 1649 struct TargetId(u64); // key 1650 1651 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
··· 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 PagedOrderedCollection, StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 + 7 + use anyhow::{anyhow, bail, Result}; 8 + use base64::engine::general_purpose as b64; 9 + use base64::Engine as _; 10 use bincode::Options as BincodeOptions; 11 use links::CollectedLink; 12 use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; ··· 17 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 18 }; 19 use serde::{Deserialize, Serialize}; 20 + use tokio_util::sync::CancellationToken; 21 + 22 use std::collections::{BTreeMap, HashMap, HashSet}; 23 use std::io::Read; 24 use std::marker::PhantomData; ··· 29 }; 30 use std::thread; 31 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 32 33 static DID_IDS_CF: &str = "did_ids"; 34 static TARGET_IDS_CF: &str = "target_ids"; ··· 1139 filter_dids: &HashSet<Did>, 1140 filter_to_targets: &HashSet<String>, 1141 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1142 + // helper to resolve dids 1143 + let resolve_active_did = |did_id: &DidId| -> Option<Did> { 1144 + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0).ok()? else { 1145 + eprintln!("failed to look up did from did_id {did_id:?}"); 1146 + return None; 1147 + }; 1148 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did).ok()? 
1149 + else { 1150 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1151 + return None; 1152 + }; 1153 + active.then_some(did) 1154 + }; 1155 + 1156 + // setup variables that we need later 1157 let collection = Collection(collection.to_string()); 1158 let path = RPath(path.to_string()); 1159 1160 + // extract parts form composite cursor 1161 + let (backward_idx, forward_idx) = match after { 1162 + Some(a) => { 1163 + eprintln!("a: {:#?}", a); 1164 1165 + let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?; 1166 1167 + eprintln!("after_str: {:#?}", after_str); 1168 + let (b, f) = after_str 1169 + .split_once(',') 1170 + .ok_or_else(|| anyhow!("invalid cursor format"))?; 1171 + ( 1172 + (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?, 1173 + (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?, 1174 + ) 1175 + } 1176 + None => (None, None), 1177 }; 1178 1179 + eprintln!("backward_idx: {:#?}", backward_idx); 1180 + eprintln!("forward_idx: {:#?}", forward_idx); 1181 + 1182 + // (__active__) did ids and filter targets 1183 let filter_did_ids: HashMap<DidId, bool> = filter_dids 1184 .iter() 1185 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) ··· 1187 .into_iter() 1188 .map(|DidIdValue(id, active)| (id, active)) 1189 .collect(); 1190 let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 1191 for t in filter_to_targets { 1192 for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { ··· 1194 } 1195 } 1196 1197 + let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1198 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
else { 1199 + eprintln!("Target not found for {target_key:?}"); 1200 + return Ok(PagedOrderedCollection::empty()); 1201 + }; 1202 let linkers = self.get_target_linkers(&target_id)?; 1203 1204 + let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 1205 1206 + // iterate backwards (who linked to the target?) 1207 + for (linker_idx, (did_id, rkey)) in 1208 + linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| { 1209 + backward_idx.is_some_and(|idx| match forward_idx { 1210 + Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping 1211 + None => *linker_idx <= idx as usize, // exclusive: skip right here 1212 + }) 1213 + }) 1214 + { 1215 + if did_id.is_empty() 1216 + || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id)) 1217 + { 1218 continue; 1219 } 1220 1221 + let Some(links) = self.get_record_link_targets(&RecordLinkKey( 1222 + *did_id, 1223 + collection.clone(), 1224 + rkey.clone(), 1225 + ))? 1226 + else { 1227 continue; 1228 }; 1229 1230 + // iterate forward (which of these links point to the __other__ target?) 1231 + for (link_idx, RecordLinkTarget(_, fwd_target_id)) in links 1232 .0 1233 .into_iter() 1234 + .enumerate() 1235 + .filter(|(_, RecordLinkTarget(rpath, target_id))| { 1236 + eprintln!("rpath.0: {} vs. 
path_to_other: {path_to_other}", rpath.0); 1237 + rpath.0 == path_to_other 1238 && (filter_to_target_ids.is_empty() 1239 + || filter_to_target_ids.contains(target_id)) 1240 }) 1241 + .skip_while(|(link_idx, _)| { 1242 + backward_idx.is_some_and(|bl_idx| { 1243 + linker_idx == bl_idx as usize 1244 + && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize) 1245 + }) 1246 + }) 1247 + .take(limit as usize + 1 - items.len()) 1248 + { 1249 + // extract forward target did (target that links to the __other__ target) 1250 + let Some(did) = resolve_active_did(did_id) else { 1251 + continue; 1252 + }; 1253 + // resolve to target string 1254 + let Some(fwd_target_key) = self 1255 + .target_id_table 1256 + .get_val_from_id(&self.db, fwd_target_id.0) 1257 + .ok() 1258 + .flatten() 1259 else { 1260 continue; 1261 }; 1262 1263 + // link to be added 1264 + let record_id = RecordId { 1265 + did, 1266 collection: collection.0.clone(), 1267 + rkey: rkey.0.clone(), 1268 }; 1269 + items.push((linker_idx, link_idx, record_id, fwd_target_key.0 .0)); 1270 } 1271 1272 + // page full - eject 1273 + if items.len() > limit as usize { 1274 + break; 1275 } 1276 } 1277 1278 + // We collect up to limit + 1 fully-resolved items. If we got more than 1279 + // limit, there are more results beyond this page. We truncate to limit 1280 + // items (the actual page) and build a composite cursor from the last 1281 + // item on the page - a base64-encoded pair of (backlink_vec_idx, 1282 + // forward_link_idx). On the next request, skip_while advances past 1283 + // this position: backlinks before backlink_vec_idx are skipped entirely, 1284 + // and at backlink_vec_idx itself, forward links at or before 1285 + // forward_link_idx are skipped. This correctly resumes mid-record when 1286 + // a single backlinker has multiple forward links at path_to_other.
1287 let next = if items.len() as u64 > limit { 1288 items.truncate(limit as usize); 1289 items 1290 .last() 1291 + .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64))) 1292 } else { 1293 None 1294 }; 1295 1296 + let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect(); 1297 + 1298 Ok(PagedOrderedCollection { items, next }) 1299 } 1300 ··· 1636 } 1637 1638 // target ids 1639 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)] 1640 struct TargetId(u64); // key 1641 1642 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
constellation/templates/get-many-to-many.html.j2

This file has not been changed.

constellation/templates/hello.html.j2

This file has not been changed.

constellation/templates/try-it-macros.html.j2

This file has not been changed.

lexicons/blue.microcosm/links/getManyToMany.json

This file has not been changed.

History

8 rounds 13 comments
11 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
wip
Fix rocks-store to match mem-store composite cursor
Address feedback from fig
pull request successfully merged
10 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
wip
Fix rocks-store to match mem-store composite cursor
8 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1

Okay. I wrapped my head around the composite cursor you proposed and am working on refactoring both storage implementations towards that. I think I might re-submit another round tomorrow :)

6 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many

Found a bug in how we handle some of the pagination logic in cases where the number of items and the user-selected limit are identical or very close to each other (already working on a fix)
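The boundary case mentioned above (item count equal to or just around the limit) is exactly what the later "fetch N+1" commit addresses: request one extra item so the "is there a next page?" decision is based on real data rather than on `items.len() == limit`. A minimal sketch of that pattern, with illustrative names and a simple offset cursor standing in for the PR's composite cursor:

```rust
// Hedged sketch of fetch-N+1 pagination; `paginate` and the offset cursor
// are illustrative, not the actual microcosm API.
fn paginate(all: &[u32], limit: usize, offset: usize) -> (Vec<u32>, Option<usize>) {
    // Fetch limit + 1 candidates starting at the offset.
    let mut page: Vec<u32> = all.iter().skip(offset).take(limit + 1).copied().collect();
    let next = if page.len() > limit {
        page.truncate(limit); // drop the sentinel extra item
        Some(offset + limit) // the extra item proves more data exists
    } else {
        None // fewer than limit + 1 found: this is the last page
    };
    (page, next)
}

fn main() {
    let data = [1, 2, 3, 4];
    // total == limit: without N+1 this case tends to emit a bogus empty page
    assert_eq!(paginate(&data, 4, 0), (vec![1, 2, 3, 4], None));
    // total > limit: cursor points at the next offset
    assert_eq!(paginate(&data, 3, 0), (vec![1, 2, 3], Some(3)));
    assert_eq!(paginate(&data, 3, 3), (vec![4], None));
    println!("ok");
}
```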

thanks for the rebase! i tried to write things in the tiny text box but ended up needing to make a diagram: https://bsky.app/profile/did:plc:hdhoaan3xa3jiuq4fg4mefid/post/3mejuq44twc2t

key thing is that where the focus of getManyToManyCounts was the other subject (aggregation was against that, so grouping happened with it),

i think the focus of disagreggated many-to-many is on the linking records themselves

to me that takes me toward a few things

  • i don't think we should need to group the links by target (does the current code build up the full aggregation on every requested page? we should be able to avoid doing that)

  • i think the order of the response should actually be based on the linking record itself (since we have a row in the output), not the other subject, unlike with the aggregated/count version. this means you get eg. list items in order they were added instead of the order of the listed things being created. (i haven't fully wrapped my head around the grouping/ordering code here yet)

  • since any linking record can have a path_to_other with multiple links, i think a composite cursor could work here:

a 2-tuple of (backlink_vec_idx, forward_vec_idx).

for normal cases where the many-to-many record points to exactly one other subject, it would just be advancing backlink_vec_idx like normal backlinks

for cases where the many-to-many record actually has multiple forward links at the given path_to_other, the second part of the tuple would track progress through that list

i think that allows us to hold the necessary state between calls without needing to reconstruct too much in memory each time?

(also it's hard to write in this tiny tiny textbox and have a sense of whether what i'm saying makes sense)
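The proposed composite cursor could be sketched roughly like this (the crate does import `encode_m2m_cursor`/`decode_m2m_cursor` helpers, but the string encoding below is an assumed illustration, not their actual format):

```rust
// Illustrative sketch of the proposed composite cursor: a 2-tuple of
// (backlink_vec_idx, forward_vec_idx) serialized into an opaque string.
// The ':' delimiter is an assumption for this sketch.
fn encode_m2m_cursor(backlink_idx: u64, forward_idx: u64) -> String {
    format!("{backlink_idx}:{forward_idx}")
}

fn decode_m2m_cursor(cursor: &str) -> Option<(u64, u64)> {
    // Reject anything that isn't two well-formed integers.
    let (b, f) = cursor.split_once(':')?;
    Some((b.parse().ok()?, f.parse().ok()?))
}
```

In the common single-forward-link case only the first index advances; the second index only moves when a linking record carries several forward links at the given path_to_other.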

Interesting approach! I have to think through this for a bit to be honest. Maybe I tried to follow the existing counts implementation too closely

Having said that, I added a new composite cursor to fix a couple of bugs that would arise when hitting some possible edge cases in the pagination logic. This affects both the new get-many-to-many endpoint as well as the existing get-many-to-many-counts endpoint. As the changes are split over two distinct commits, things should be straightforward to review.

Your assumption is still correct in the sense that we do indeed have to build up the aggregation again for every request. I have to double-check the get-backlinks endpoint to get a better sense of where you're going with this.

Finally, I agree that the interface here doesn't necessarily make the whole thing easier to understand, unfortunately

6 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many

i think something got funky with a rebase or the way tangled is showing it -- some of my changes on main seem to be getting shown (reverted) in the diff.

i don't mind sorting it locally but will mostly get to it tomorrow, in case you want to see what's up before i do.

That one's on me, sorry! Rebased again on main and now everything seems fine

5 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main

Rebased on main. As we discussed in the PR for the order query parameter, I didn't include this here as it's not a particularly sensible fit.

i need to get into the code properly but my initial thought is that this endpoint should return a flat list of results, like

{
  "items": [
    {
      "link": { did, collection, rkey }, // the m2m link record
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "b.com"
    }
  ]
}

this will require a bit of tricks in the cursor to track pages across half-finished groups of links

(also this isn't an immediate change request, just getting it down for discussion!)

(and separately, i've also been wondering about moving more toward returning at-uris instead of broken-out did/collection/rkey objects. which isn't specifically about this PR, but if that happens then switching before releasing it is nice)
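For discussion's sake, the flat shape sketched above could be modeled with plain Rust structs along these lines (field names mirror the JSON sketch; the actual handler types, and any serde derives they would presumably carry, are assumptions):

```rust
// Sketch of the flat response shape. In a real handler these would
// likely derive serde's Serialize; omitted here to stay self-contained.
struct LinkRecord {
    did: String,
    collection: String,
    rkey: String,
}

struct ManyToManyItem {
    link: LinkRecord, // the m2m linking record itself
    subject: String,  // the "other" subject this record points at
}

struct ManyToManyResponse {
    items: Vec<ManyToManyItem>,
    cursor: Option<String>, // opaque cursor; None on the last page
}
```

Because the subject is repeated on every item, a cursor can legitimately fall in the middle of a subject's group without losing any information.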

Hmm, I wonder how this would then work with the path_to_other parameter. Currently we have this nested grouping in order to show and disambiguate different relationships between different links.

For instance, take the following query and its results:

http://localhost:6789/xrpc/blue.microcosm.links.getManyToMany?subject=at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y&source=app.bsky.feed.post:reply.root.uri&pathToOther=reply.parent.uri&limit=16

This query asks: "Show me all posts in this thread, grouped by who they're responding to."

A flat list would just give us all the posts in the thread. The nested structure answers a richer question: who's talking to whom? Some posts are direct responses to the original article. Others are replies to other commenters, forming side conversations that branch off from the main thread.

The pathToOther grouping preserves that distinction. Without it, we'd lose the information about who's talking to whom.

{
  "linking_records": [
    {
      "subject": "at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y",
      "records": [
        {
          "did": "did:plc:lznqwrsbnyf6fdxohikqj6h3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd27pja7s2y"
        },
        {
          "did": "did:plc:uffx77au6hoauuuumkbuvqdr",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2tt5efc2a"
        },
        {
          "did": "did:plc:y7qyxzo7dns5m54dlq3youu3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2wtjxgc2d"
        },
        {
          "did": "did:plc:yaakslxyqydb76ybgkhrr4jk",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd35hyads22"
        },
        {
          "did": "did:plc:fia7w2kbnrdjwp6zvxywt7qv",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd37j3ldk2m"
        },
        {
          "did": "did:plc:xtecipifublblkomwau5x2ok",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd3dbtbz22n"
        },
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd42hpw7c2e"
        },
        {
          "did": "did:plc:fgquypfh32pewivn3bcmzseb",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd46jteoc2m"
        }
      ]
    },
    {
      "subject": "at://did:plc:3rhjxwwui6wwfokh4at3q2dl/app.bsky.feed.post/3mdczc7c4gk2i",
      "records": [
        {
          "did": "did:plc:3rhjxwwui6wwfokh4at3q2dl",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdczt7cwhk2i"
        }
      ]
    },
    {
      "subject": "at://did:plc:6buibzhkqr4vkqu75ezr2uv2/app.bsky.feed.post/3mdby25hbbk2v",
      "records": [
        {
          "did": "did:plc:fgeie2bmzlmx37iglj3xbzuj",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd26ulf4k2j"
        }
      ]
    },
    {
      "subject": "at://did:plc:lwgvv5oqh5stzb6dxa5d7z3n/app.bsky.feed.post/3mdcxqbkkfk2i",
      "records": [
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd45u56sk2e"
        }
      ]
    }
  ],
  "cursor": null
}

Correct me if I'm somehow wrong here!

Regarding returning at-uris: I think this might be a nice idea, as users can split these up themselves when they feel the need to anyway, and it feels conceptually more complete. But it might be easier to do this in a separate PR covering all existing XRPC endpoints. That would allow us to add this new endpoint already while working on the updated return values in the meantime. I'd like to avoid doing too much distinct stuff in one PR. :)

at-uris: totally fair, holding off for a follow-up.

flat list: i might have messed it up in my example but i think what i meant is actually equivalent to the grouped version: flattened, with the subject ("group by") included with every item in the flattened list.

clients can collect the flat list and group on subject to get back to your structured example, if they want.
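A tiny sketch of that client-side regrouping, assuming the flat response is reduced to `(subject, record_uri)` pairs (all names here are hypothetical):

```rust
use std::collections::BTreeMap;

// Hypothetical client-side regrouping: collapse a flat (subject, record)
// list back into the nested per-subject structure. BTreeMap gives a
// stable subject order for the example; a real client might prefer
// first-seen order instead.
fn group_by_subject(items: Vec<(String, String)>) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (subject, record_uri) in items {
        groups.entry(subject).or_default().push(record_uri);
    }
    groups
}
```

This is a one-pass, order-preserving fold within each group, so nothing is lost relative to the nested response shape.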

my motivations are probably part sql-brain, part flat-list-enjoyer, and part cursor-related. i'm trying to disregard the first two, and i'm curious about your thoughts about how to handle the cursor:

with a flat list it's easy (from the client perspective at least) -- just keep chasing the cursor for as much of the data as you need. (cursors can happen in the middle of a subject)

with nested results grouped by subject it's less obvious to me. correct me if i'm wrong (need another block of time to actually get into the code) but i think the grouped item sub-list is unbounded in size in the proposed code here? so cursors are only limiting the number of groups.

if we go with the grouped nested response, i think maybe we'd want something like:

  • a cursor at the end for fetching more groups, and
  • a cursor for each group-list that lets you fetch more items from just that group-list.

(i think this kind of nested paging is pretty neat!)

Interesting. Now that you mention it I feel I kinda get where you're going with this!

I think the whole nested-cursor thing, albeit certainly possible, kinda creates more unnecessary complexity, so I'll probably go with your suggestion.

It seems easier for most users to create custom groupings on their own (having more freedom is always great), and from an ergonomic perspective the two cursors might create more friction.

4 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct

Added the missing lexicon entry for the new endpoint and changed the return type as well. Mistakenly commented this on the other PR I was working on. Sorry about that lol.

3 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test

I think the existing get_many_to_many_counts handler and the new get_many_to_many handler are similar enough that we might extract the bulk of their logic into a shared helper. Maybe a method that takes the existing identical function parameters plus an additional callback parameter (that decides what to do with found matches, i.e. calculate counts or join URIs) could be one way to go.

I am not too sure yet though if this is indeed the right thing to do as the new shared implementation might be a bit complicated. But given the strong similarities between the two I think it's worth at least considering.