Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Fix rocks-store `get_many_to_many` pagination to match mem-store's composite cursor (backlink index, forward-link index)

authored by seoul.systems and committed by tangled.org 7db6c6fc 627eaa85

+162 -106
+105 -23
constellation/src/storage/mod.rs
··· 1768 1768 }); 1769 1769 1770 1770 test_each_storage!(get_m2m_single, |storage| { 1771 + // One record linking to a.com (backward), with two forward links at 1772 + // the same path_to_other (.def.uri) pointing to b.com and c.com. 1773 + // Both forward targets must appear in the output. 1771 1774 storage.push( 1772 1775 &ActionableEvent::CreateLinks { 1773 1776 record_id: RecordId { ··· 1785 1788 path: ".def.uri".into(), 1786 1789 }, 1787 1790 CollectedLink { 1788 - target: Link::Uri("b.com".into()), 1789 - path: ".ghi.uri".into(), 1791 + target: Link::Uri("c.com".into()), 1792 + path: ".def.uri".into(), 1790 1793 }, 1791 1794 ], 1792 1795 }, 1793 1796 0, 1794 1797 )?; 1798 + let result = storage.get_many_to_many( 1799 + "a.com", 1800 + "app.t.c", 1801 + ".abc.uri", 1802 + ".def.uri", 1803 + 10, 1804 + None, 1805 + &HashSet::new(), 1806 + &HashSet::new(), 1807 + )?; 1795 1808 assert_eq!( 1796 - storage.get_many_to_many( 1797 - "a.com", 1798 - "app.t.c", 1799 - ".abc.uri", 1800 - ".def.uri", 1801 - 10, 1802 - None, 1803 - &HashSet::new(), 1804 - &HashSet::new(), 1805 - )?, 1806 - PagedOrderedCollection { 1807 - items: vec![( 1808 - RecordId { 1809 - did: "did:plc:asdf".into(), 1810 - collection: "app.t.c".into(), 1811 - rkey: "asdf".into(), 1812 - }, 1813 - "b.com".to_string(), 1814 - )], 1815 - next: None, 1816 - } 1809 + result.items.len(), 1810 + 2, 1811 + "both forward links at path_to_other should be emitted" 1817 1812 ); 1813 + let mut targets: Vec<_> = result.items.iter().map(|(_, t)| t.as_str()).collect(); 1814 + targets.sort(); 1815 + assert_eq!(targets, vec!["b.com", "c.com"]); 1816 + assert!(result 1817 + .items 1818 + .iter() 1819 + .all(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1820 + assert_eq!(result.next, None); 1818 1821 }); 1819 1822 1820 1823 test_each_storage!(get_m2m_filters, |storage| { ··· 2048 2051 all_rkeys, 2049 2052 vec!["asdf", "asdf2", "fdsa", "fdsa2"], 2050 2053 "should have all 4 records across both pages" 2054 + ); 
2055 + }); 2056 + 2057 + // Pagination that splits across forward links within a single backlinker. 2058 + // The cursor should correctly resume mid-record on the next page. 2059 + test_each_storage!(get_m2m_paginate_within_forward_links, |storage| { 2060 + // Record with 1 backward link and 3 forward links at the same path 2061 + storage.push( 2062 + &ActionableEvent::CreateLinks { 2063 + record_id: RecordId { 2064 + did: "did:plc:lister".into(), 2065 + collection: "app.t.c".into(), 2066 + rkey: "list1".into(), 2067 + }, 2068 + links: vec![ 2069 + CollectedLink { 2070 + target: Link::Uri("a.com".into()), 2071 + path: ".subject.uri".into(), 2072 + }, 2073 + CollectedLink { 2074 + target: Link::Uri("x.com".into()), 2075 + path: ".items[].uri".into(), 2076 + }, 2077 + CollectedLink { 2078 + target: Link::Uri("y.com".into()), 2079 + path: ".items[].uri".into(), 2080 + }, 2081 + CollectedLink { 2082 + target: Link::Uri("z.com".into()), 2083 + path: ".items[].uri".into(), 2084 + }, 2085 + ], 2086 + }, 2087 + 0, 2088 + )?; 2089 + 2090 + // Page 1: limit=2, should get 2 of the 3 forward links 2091 + let page1 = storage.get_many_to_many( 2092 + "a.com", 2093 + "app.t.c", 2094 + ".subject.uri", 2095 + ".items[].uri", 2096 + 2, 2097 + None, 2098 + &HashSet::new(), 2099 + &HashSet::new(), 2100 + )?; 2101 + assert_eq!(page1.items.len(), 2, "first page should have 2 items"); 2102 + assert!( 2103 + page1.next.is_some(), 2104 + "should have a next cursor for remaining item" 2105 + ); 2106 + 2107 + // Page 2: should get the remaining 1 forward link 2108 + let page2 = storage.get_many_to_many( 2109 + "a.com", 2110 + "app.t.c", 2111 + ".subject.uri", 2112 + ".items[].uri", 2113 + 2, 2114 + page1.next, 2115 + &HashSet::new(), 2116 + &HashSet::new(), 2117 + )?; 2118 + assert_eq!(page2.items.len(), 1, "second page should have 1 item"); 2119 + assert_eq!(page2.next, None, "no more pages"); 2120 + 2121 + // Verify all 3 targets appear across pages with no duplicates 2122 + let mut 
all_targets: Vec<_> = page1 2123 + .items 2124 + .iter() 2125 + .chain(page2.items.iter()) 2126 + .map(|(_, t)| t.clone()) 2127 + .collect(); 2128 + all_targets.sort(); 2129 + assert_eq!( 2130 + all_targets, 2131 + vec!["x.com", "y.com", "z.com"], 2132 + "all forward targets should appear exactly once across pages" 2051 2133 ); 2052 2134 }); 2053 2135 }
+57 -83
constellation/src/storage/rocks_store.rs
··· 1199 1199 eprintln!("Target not found for {target_key:?}"); 1200 1200 return Ok(PagedOrderedCollection::empty()); 1201 1201 }; 1202 - 1203 1202 let linkers = self.get_target_linkers(&target_id)?; 1204 1203 1205 - eprintln!("linkers: {:#?}", linkers); 1206 - 1207 - let mut items: Vec<(usize, TargetId, RecordId)> = Vec::new(); 1204 + let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 1208 1205 1209 1206 // iterate backwards (who linked to the target?) 1210 1207 for (linker_idx, (did_id, rkey)) in ··· 1215 1212 }) 1216 1213 }) 1217 1214 { 1218 - // filter target did 1219 1215 if did_id.is_empty() 1220 - || (!filter_did_ids.is_empty() && filter_did_ids.get(&did_id).is_none()) 1216 + || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id)) 1221 1217 { 1222 1218 continue; 1223 1219 } 1224 1220 1225 - eprintln!("did_did: {:#?}", did_id); 1226 - 1227 - let Some(targets) = self.get_record_link_targets(&RecordLinkKey( 1221 + let Some(links) = self.get_record_link_targets(&RecordLinkKey( 1228 1222 *did_id, 1229 1223 collection.clone(), 1230 1224 rkey.clone(), ··· 1232 1226 else { 1233 1227 continue; 1234 1228 }; 1235 - let Some(fwd_target_id) = 1236 - targets 1237 - .0 1238 - .into_iter() 1239 - .find_map(|RecordLinkTarget(rpath, target_id)| { 1240 - eprintln!("rpath.0: {} vs. 
path_to_other: {path_to_other}", rpath.0); 1241 - if rpath.0 == path_to_other 1242 - && (filter_to_target_ids.is_empty() 1243 - || filter_to_target_ids.contains(&target_id)) 1244 - { 1245 - Some(target_id) 1246 - } else { 1247 - None 1248 - } 1249 - }) 1250 - else { 1251 - continue; 1252 - }; 1253 1229 1254 - if backward_idx.is_some_and(|bl_idx| { 1255 - linker_idx == bl_idx as usize 1256 - && forward_idx.is_some_and(|fwd_idx| fwd_target_id.0 <= fwd_idx) 1257 - }) { 1258 - continue; 1259 - } 1260 - 1261 - let page_is_full = items.len() as u64 >= limit; 1262 - 1263 - eprintln!( 1264 - "page_is_full: {page_is_full} for items.len(): {}", 1265 - items.len() 1266 - ); 1267 - 1268 - if page_is_full { 1269 - let current_max = items.iter().next_back().unwrap().1; 1270 - if fwd_target_id > current_max { 1230 + // iterate forward (which of these links point to the __other__ target?) 1231 + for (link_idx, RecordLinkTarget(_, fwd_target_id)) in links 1232 + .0 1233 + .into_iter() 1234 + .enumerate() 1235 + .filter(|(_, RecordLinkTarget(rpath, target_id))| { 1236 + eprintln!("rpath.0: {} vs. 
path_to_other: {path_to_other}", rpath.0); 1237 + rpath.0 == path_to_other 1238 + && (filter_to_target_ids.is_empty() 1239 + || filter_to_target_ids.contains(target_id)) 1240 + }) 1241 + .skip_while(|(link_idx, _)| { 1242 + backward_idx.is_some_and(|bl_idx| { 1243 + linker_idx == bl_idx as usize 1244 + && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize) 1245 + }) 1246 + }) 1247 + .take(limit as usize + 1 - items.len()) 1248 + { 1249 + // extract forward target did (target that links to the __other__ target) 1250 + let Some(did) = resolve_active_did(did_id) else { 1271 1251 continue; 1272 - } 1273 - } 1274 - 1275 - // extract forward target did (target that links to the __other__ target) 1276 - let Some(did) = resolve_active_did(did_id) else { 1277 - continue; 1278 - }; 1279 - 1280 - // link to be added 1281 - let record_id = RecordId { 1282 - did, 1283 - collection: collection.0.clone(), 1284 - rkey: rkey.0.clone(), 1285 - }; 1286 - items.push((linker_idx, fwd_target_id, record_id)); 1287 - } 1288 - 1289 - let mut backward_idx = None; 1290 - let mut forward_idx = None; 1291 - let mut items: Vec<_> = items 1292 - .iter() 1293 - .filter_map(|(b_idx, fwd_target_id, record)| { 1294 - let Some(target_key) = self 1252 + }; 1253 + // resolve to target string 1254 + let Some(fwd_target_key) = self 1295 1255 .target_id_table 1296 1256 .get_val_from_id(&self.db, fwd_target_id.0) 1297 - .ok()? 
1257 + .ok() 1258 + .flatten() 1298 1259 else { 1299 - eprintln!("failed to look up target from target_id {fwd_target_id:?}"); 1300 - return None; 1260 + continue; 1301 1261 }; 1302 1262 1303 - backward_idx = Some(b_idx); 1304 - forward_idx = Some(fwd_target_id.0 - 1); 1263 + // link to be added 1264 + let record_id = RecordId { 1265 + did, 1266 + collection: collection.0.clone(), 1267 + rkey: rkey.0.clone(), 1268 + }; 1269 + items.push((linker_idx, link_idx, record_id, fwd_target_key.0 .0)); 1270 + } 1305 1271 1306 - Some((record.clone(), target_key.0 .0)) 1307 - }) 1308 - .collect(); 1272 + // page full - eject 1273 + if items.len() > limit as usize { 1274 + break; 1275 + } 1276 + } 1309 1277 1310 - // Build new cursor from last the item, if needed 1278 + // We collect up to limit + 1 fully-resolved items. If we got more than 1279 + // limit, there are more results beyond this page. We truncate to limit 1280 + // items (the actual page) and build a composite cursor from the last 1281 + // item on the page — a base64-encoded pair of (backlink_vec_idx, 1282 + // forward_link_idx). On the next request, skip_while advances past 1283 + // this position: backlinks before backlink_vec_idx are skipped entirely, 1284 + // and at backlink_vec_idx itself, forward links at or before 1285 + // forward_link_idx are skipped. This correctly resumes mid-record when 1286 + // a single backlinker has multiple forward links at path_to_other. 
1311 1287 let next = if items.len() as u64 > limit { 1312 1288 items.truncate(limit as usize); 1313 - items.last().and_then(|_| { 1314 - Some(b64::URL_SAFE.encode(format!( 1315 - "{},{}", 1316 - backward_idx?.to_string(), 1317 - forward_idx?.to_string() 1318 - ))) 1319 - }) 1289 + items 1290 + .last() 1291 + .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64))) 1320 1292 } else { 1321 1293 None 1322 1294 }; 1295 + 1296 + let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect(); 1323 1297 1324 1298 Ok(PagedOrderedCollection { items, next }) 1325 1299 }