Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Fix get_many_to_many pagination with composite cursor

The previous cursor was a plain subject string, which broke pagination
when multiple records shared the same secondary subject. Depending on
the comparison used in skip_while, this caused either duplicates (items
returned on both pages) or skipped items (items returned on neither
page). These edge cases are now covered by the newly added pagination
tests.

The root cause is that a subject-only cursor cannot uniquely identify
a position in the result set when multiple (RecordId, subject) pairs
share the same subject value. Five issues were fixed:

1. Composite sort: items are now sorted by (subject, RecordId) instead
of subject alone, establishing a deterministic total order.

2. Composite cursor: the cursor is now a base64-encoded "did|rkey|subject"
string. Shared encode/decode helpers in storage/mod.rs ensure both
MemStorage and RocksDB produce and consume the same format. DID and
rkey are placed first in the encoding so that splitn(3, '|') correctly
handles subjects that may contain the delimiter.

3. Correct skip direction: skip_while now uses <= (skip items at or
before the cursor), fixing the comparison that previously caused
items to be re-included or over-skipped.

4. Post-filter cursor computation: the next cursor is now derived from
the last item after skip+take, not from the unfiltered list.

5. Fetch N+1: take limit+1 items, only emit a cursor when more than
limit items exist, then truncate. This avoids emitting a false cursor
that leads to an empty final page when the item count exactly equals limit.

authored by seoul.systems and committed by tangled.org 3d4f4b50 b3146a05

+207 -24
+1
Cargo.lock
··· 1058 1058 "axum", 1059 1059 "axum-extra", 1060 1060 "axum-metrics", 1061 + "base64 0.22.1", 1061 1062 "bincode 1.3.3", 1062 1063 "clap", 1063 1064 "ctrlc",
+1
constellation/Cargo.toml
··· 10 10 axum = "0.8.1" 11 11 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 12 12 axum-metrics = "0.2" 13 + base64 = "0.22.1" 13 14 bincode = "1.3.3" 14 15 clap = { workspace = true } 15 16 ctrlc = "3.4.5"
+2 -2
constellation/src/lib.rs
··· 22 22 DeleteAccount(Did), 23 23 } 24 24 25 - #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] 25 + #[derive(Debug, Hash, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 26 26 pub struct Did(pub String); 27 27 28 28 impl<T: Into<String>> From<T> for Did { ··· 31 31 } 32 32 } 33 33 34 - #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] 34 + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)] 35 35 pub struct RecordId { 36 36 pub did: Did, 37 37 pub collection: String,
+38 -10
constellation/src/storage/mem_store.rs
··· 1 1 use super::{ 2 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 3 }; 4 + use crate::storage::{decode_m2m_cursor, encode_m2m_cursor}; 4 5 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 6 use anyhow::Result; 6 7 use links::CollectedLink; ··· 250 251 next: None, 251 252 }); 252 253 253 - // struct MemStorageData { 254 - // dids: HashMap<Did, bool>, 255 - // targets: HashMap<Target, HashMap<Source, Linkers>>, 256 - // links: HashMap<Did, HashMap<RepoId, Vec<(RecordPath, Target)>>>, 257 - // } 258 254 let data = self.0.lock().unwrap(); 259 255 260 256 let Some(sources) = data.targets.get(&Target::new(target)) else { ··· 315 311 }) 316 312 .collect::<Vec<_>>(); 317 313 318 - items.sort_by(|a: &(RecordId, String), b| a.1.cmp(&b.1)); 314 + // first try to sort by subject, then by did, collection and finally rkey 315 + items.sort_by(|a, b| { 316 + if a.1 == b.1 { 317 + a.0.cmp(&b.0) 318 + } else { 319 + a.1.cmp(&b.1) 320 + } 321 + }); 322 + 323 + // Parse cursor if provided (malformed cursor silently ignored) 324 + let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok()); 319 325 326 + // Apply cursor: skip everything up to and including the cursor position 320 327 items = items 321 328 .into_iter() 322 - .skip_while(|item| after.as_ref().map(|a| &item.1 <= a).unwrap_or(false)) 323 - .take(limit as usize) 329 + .skip_while(|item| { 330 + let Some((after_did, after_rkey, after_subject)) = &after_cursor else { 331 + return false; 332 + }; 333 + 334 + if &item.1 == after_subject { 335 + // Same subject — compare by RecordId to find our position 336 + let cursor_id = RecordId { 337 + did: Did(after_did.clone()), 338 + collection: collection.to_string(), 339 + rkey: after_rkey.clone(), 340 + }; 341 + item.0.cmp(&cursor_id).is_le() 342 + } else { 343 + // Different subject — compare subjects directly 344 + item.1.cmp(after_subject).is_le() 345 + } 346 + }) 347 + .take(limit as usize + 1) 324 348 
.collect(); 325 349 326 - let next = if items.len() as u64 >= limit { 327 - items.last().map(|item| item.1.clone()) 350 + // Build the new cursor from last item, if needed 351 + let next = if items.len() as u64 > limit { 352 + items.truncate(limit as usize); 353 + items 354 + .last() 355 + .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1)) 328 356 } else { 329 357 None 330 358 };
+115
constellation/src/storage/mod.rs
··· 6 6 pub mod mem_store; 7 7 pub use mem_store::MemStorage; 8 8 9 + use anyhow::anyhow; 10 + 11 + use base64::engine::general_purpose as b64; 12 + use base64::Engine as _; 13 + 9 14 #[cfg(feature = "rocks")] 10 15 pub mod rocks_store; 11 16 #[cfg(feature = "rocks")] ··· 154 159 155 160 /// assume all stats are estimates, since exact counts are very challenging for LSMs 156 161 fn get_stats(&self) -> Result<StorageStats>; 162 + } 163 + 164 + // Shared helpers 165 + 166 + /// Decode a base64 cursor into its component parts (did, rkey, subject). 167 + /// The subject is placed last because it may contain '|' characters. 168 + pub(crate) fn decode_m2m_cursor(cursor: &str) -> Result<(String, String, String)> { 169 + let decoded = String::from_utf8(b64::URL_SAFE.decode(cursor)?)?; 170 + let mut parts = decoded.splitn(3, '|').map(String::from); 171 + 172 + // Using .next() to pull each part out of the iterator in order. 173 + // This avoids collecting into a Vec just to index and clone back out. 174 + let did = parts 175 + .next() 176 + .ok_or_else(|| anyhow!("missing did in cursor"))?; 177 + let rkey = parts 178 + .next() 179 + .ok_or_else(|| anyhow!("missing rkey in cursor"))?; 180 + let subject = parts 181 + .next() 182 + .ok_or_else(|| anyhow!("missing subject in cursor"))?; 183 + 184 + Ok((did, rkey, subject)) 185 + } 186 + 187 + /// Encode cursor components into a base64 string. 
188 + pub(crate) fn encode_m2m_cursor(did: &str, rkey: &str, subject: &str) -> String { 189 + let raw = format!("{did}|{rkey}|{subject}"); 190 + b64::URL_SAFE.encode(&raw) 157 191 } 158 192 159 193 #[cfg(test)] ··· 1904 1938 assert_eq!(result.items.len(), 2); 1905 1939 assert!(result.items.iter().all(|(_, subject)| subject == "b.com")); 1906 1940 assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:asdf")); 1941 + 1942 + // Pagination edge cases: we have 4 flat items 1943 + 1944 + // Case 1: limit > items (limit=10, items=4) -> next should be None 1945 + let result = storage.get_many_to_many( 1946 + "a.com", 1947 + "app.t.c", 1948 + ".abc.uri", 1949 + ".def.uri", 1950 + 10, 1951 + None, 1952 + &HashSet::new(), 1953 + &HashSet::new(), 1954 + )?; 1955 + assert_eq!(result.items.len(), 4); 1956 + assert_eq!(result.next, None, "next should be None when items < limit"); 1957 + 1958 + // Case 2: limit == items (limit=4, items=4) -> next should be None 1959 + let result = storage.get_many_to_many( 1960 + "a.com", 1961 + "app.t.c", 1962 + ".abc.uri", 1963 + ".def.uri", 1964 + 4, 1965 + None, 1966 + &HashSet::new(), 1967 + &HashSet::new(), 1968 + )?; 1969 + assert_eq!(result.items.len(), 4); 1970 + assert_eq!( 1971 + result.next, None, 1972 + "next should be None when items == limit (no more pages)" 1973 + ); 1974 + 1975 + // Case 3: limit < items (limit=3, items=4) -> next should be Some 1976 + let result = storage.get_many_to_many( 1977 + "a.com", 1978 + "app.t.c", 1979 + ".abc.uri", 1980 + ".def.uri", 1981 + 3, 1982 + None, 1983 + &HashSet::new(), 1984 + &HashSet::new(), 1985 + )?; 1986 + assert_eq!(result.items.len(), 3); 1987 + assert!( 1988 + result.next.is_some(), 1989 + "next should be Some when items > limit" 1990 + ); 1991 + 1992 + // Verify second page returns remaining item with no cursor. 
1993 + // This now works correctly because we use a composite cursor that includes 1994 + // (target, did, rkey), allowing pagination even when multiple records share 1995 + // the same target string. 1996 + let result2 = storage.get_many_to_many( 1997 + "a.com", 1998 + "app.t.c", 1999 + ".abc.uri", 2000 + ".def.uri", 2001 + 3, 2002 + result.next, 2003 + &HashSet::new(), 2004 + &HashSet::new(), 2005 + )?; 2006 + assert_eq!( 2007 + result2.items.len(), 2008 + 1, 2009 + "second page should have 1 remaining item" 2010 + ); 2011 + assert_eq!(result2.next, None, "next should be None on final page"); 2012 + 2013 + // Verify we got all 4 unique items across both pages (no duplicates, no gaps) 2014 + let mut all_rkeys: Vec<_> = result.items.iter().map(|(r, _)| r.rkey.clone()).collect(); 2015 + all_rkeys.extend(result2.items.iter().map(|(r, _)| r.rkey.clone())); 2016 + all_rkeys.sort(); 2017 + assert_eq!( 2018 + all_rkeys, 2019 + vec!["asdf", "asdf2", "fdsa", "fdsa2"], 2020 + "should have all 4 records across both pages" 2021 + ); 1907 2022 }); 1908 2023 }
+50 -12
constellation/src/storage/rocks_store.rs
··· 2 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 3 PagedOrderedCollection, StorageStats, 4 4 }; 5 + use crate::storage::{decode_m2m_cursor, encode_m2m_cursor}; 5 6 use crate::{CountsByCount, Did, RecordId}; 6 7 use anyhow::{bail, Result}; 7 8 use bincode::Options as BincodeOptions; ··· 1138 1139 1139 1140 let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1140 1141 1141 - let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?; 1142 + // Parse cursor if provided (malformed cursor silently ignored) 1143 + let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok()); 1142 1144 1143 1145 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1144 1146 eprintln!("Target not found for {target_key:?}"); ··· 1212 1214 continue; 1213 1215 }; 1214 1216 1215 - // pagination logic mirrors what is currently done in get_many_to_many_counts 1216 - if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) { 1217 - continue; 1218 - } 1219 1217 let page_is_full = grouped_links.len() as u64 >= limit; 1220 1218 if page_is_full { 1221 1219 let current_max = grouped_links.keys().next_back().unwrap(); ··· 1224 1222 } 1225 1223 } 1226 1224 1225 + // link to be added 1226 + let record_id = RecordId { 1227 + did, 1228 + collection: collection.0.clone(), 1229 + rkey: rkey.0, 1230 + }; 1231 + 1232 + // pagination: 1233 + if after_cursor.is_some() { 1234 + // extract composite-cursor parts 1235 + let Some((after_did, after_rkey, after_subject)) = &after_cursor else { 1236 + continue; 1237 + }; 1238 + 1239 + let Some(fwd_target_key) = self 1240 + .target_id_table 1241 + .get_val_from_id(&self.db, fwd_target.0)? 
1242 + else { 1243 + eprintln!("failed to look up target from target_id {fwd_target:?}"); 1244 + continue; 1245 + }; 1246 + 1247 + // first try and compare by subject only 1248 + if &fwd_target_key.0 .0 != after_subject 1249 + && fwd_target_key.0 .0.cmp(after_subject).is_le() 1250 + { 1251 + continue; 1252 + } 1253 + 1254 + // then, if needed, we compare by record id 1255 + let cursor_id = RecordId { 1256 + did: Did(after_did.clone()), 1257 + collection: collection.0.clone(), 1258 + rkey: after_rkey.clone(), 1259 + }; 1260 + if record_id.cmp(&cursor_id).is_le() { 1261 + continue; 1262 + } 1263 + } 1264 + 1227 1265 // pagination, continued 1228 1266 let mut should_evict = false; 1229 1267 let entry = grouped_links.entry(fwd_target.clone()).or_insert_with(|| { 1230 1268 should_evict = page_is_full; 1231 1269 Vec::default() 1232 1270 }); 1233 - entry.push(RecordId { 1234 - did, 1235 - collection: collection.0.clone(), 1236 - rkey: rkey.0, 1237 - }); 1271 + entry.push(record_id); 1238 1272 1239 1273 if should_evict { 1240 1274 grouped_links.pop_last(); ··· 1258 1292 .for_each(|r| items.push((r.clone(), target_string.clone()))); 1259 1293 } 1260 1294 1261 - let next = if grouped_links.len() as u64 >= limit { 1262 - grouped_links.keys().next_back().map(|k| format!("{}", k.0)) 1295 + // Build new cursor from last the item, if needed 1296 + let next = if items.len() as u64 > limit { 1297 + items.truncate(limit as usize); 1298 + items 1299 + .last() 1300 + .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1)) 1263 1301 } else { 1264 1302 None 1265 1303 };