Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

paging refactor + cleanups

Slice the links list via Iterator::skip and ::take. I changed some of the bounds setup too; not sure if it's clearer, or if I just have a better understanding of it at this specific moment.

Add an `empty` associated function to some paging types, for less-verbose early returns and tests.

Make the get_links_paged test a bit clearer.

+137 -194
+47 -72
constellation/src/storage/mem_store.rs
··· 147 147 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 148 148 let data = self.0.lock().unwrap(); 149 149 let Some(paths) = data.targets.get(&Target::new(target)) else { 150 - return Ok(PagedOrderedCollection::default()); 150 + return Ok(PagedOrderedCollection::empty()); 151 151 }; 152 152 let Some(linkers) = paths.get(&Source::new(collection, path)) else { 153 - return Ok(PagedOrderedCollection::default()); 153 + return Ok(PagedOrderedCollection::empty()); 154 154 }; 155 155 156 156 let path_to_other = RecordPath::new(path_to_other); ··· 241 241 path: &str, 242 242 order: Order, 243 243 limit: u64, 244 - until: Option<u64>, 244 + until: Option<u64>, // paged iteration endpoint 245 245 filter_dids: &HashSet<Did>, 246 246 ) -> Result<PagedAppendingCollection<RecordId>> { 247 247 let data = self.0.lock().unwrap(); 248 248 let Some(paths) = data.targets.get(&Target::new(target)) else { 249 - return Ok(PagedAppendingCollection { 250 - version: (0, 0), 251 - items: Vec::new(), 252 - next: None, 253 - total: 0, 254 - }); 249 + return Ok(PagedAppendingCollection::empty()); 255 250 }; 256 251 let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 257 - return Ok(PagedAppendingCollection { 258 - version: (0, 0), 259 - items: Vec::new(), 260 - next: None, 261 - total: 0, 262 - }); 252 + return Ok(PagedAppendingCollection::empty()); 263 253 }; 264 254 265 255 let did_rkeys: Vec<_> = if !filter_dids.is_empty() { ··· 276 266 did_rkeys.to_vec() 277 267 }; 278 268 279 - let total = did_rkeys.len(); 280 - 281 - let begin: usize; 282 - let end: usize; 283 - let next: Option<u64>; 269 + let total = did_rkeys.len() as u64; 284 270 285 - match order { 286 - // OldestToNewest: start from the beginning, paginate forward 271 + // backlinks are stored oldest-to-newest (ascending index with increasing age) 272 + let (start, take, next_until) = match order { 287 273 Order::OldestToNewest => { 288 - begin = until.map(|u| (u) as usize).unwrap_or(0); 289 
- end = std::cmp::min(begin + limit as usize, total); 290 - 291 - next = if end < total { 292 - Some(end as u64 + 1) 293 - } else { 294 - None 295 - }; 274 + let start = until.unwrap_or(0); 275 + let next = start + limit + 1; 276 + let next_until = if next < total { Some(next) } else { None }; 277 + (start, limit, next_until) 296 278 } 297 - // NewestToOldest: start from the end, paginate backward 298 279 Order::NewestToOldest => { 299 - end = until 300 - .map(|u| std::cmp::min(u as usize, total)) 301 - .unwrap_or(total); 302 - begin = end.saturating_sub(limit as usize); 303 - next = if begin == 0 { None } else { Some(begin as u64) }; 280 + let until = until.unwrap_or(total); 281 + match until.checked_sub(limit) { 282 + Some(s) if s > 0 => (s, limit, Some(s)), 283 + Some(s) => (s, limit, None), 284 + None => (0, until, None), 285 + } 304 286 } 305 - } 287 + }; 306 288 307 - let alive = did_rkeys.iter().flatten().count(); 289 + let alive = did_rkeys.iter().flatten().count() as u64; 308 290 let gone = total - alive; 309 291 310 - let mut items: Vec<_> = did_rkeys[begin..end] 292 + let items = did_rkeys 311 293 .iter() 312 - .rev() 294 + .skip(start as usize) 295 + .take(take as usize) 313 296 .flatten() 314 297 .filter(|(did, _)| *data.dids.get(did).expect("did must be in dids")) 315 298 .map(|(did, rkey)| RecordId { 316 299 did: did.clone(), 317 300 rkey: rkey.0.clone(), 318 301 collection: collection.to_string(), 319 - }) 320 - .collect(); 302 + }); 321 303 322 - // For OldestToNewest, reverse the items to maintain forward chronological order 323 - if order == Order::OldestToNewest { 324 - items.reverse(); 325 - } 304 + let items: Vec<_> = match order { 305 + Order::OldestToNewest => items.collect(), // links are stored oldest first 306 + Order::NewestToOldest => items.rev().collect(), 307 + }; 326 308 327 309 Ok(PagedAppendingCollection { 328 - version: (total as u64, gone as u64), 310 + version: (total, gone), 329 311 items, 330 - next, 331 - total: alive as u64, 
312 + next: next_until, 313 + total: alive, 332 314 }) 333 315 } 334 316 ··· 342 324 ) -> Result<PagedAppendingCollection<Did>> { 343 325 let data = self.0.lock().unwrap(); 344 326 let Some(paths) = data.targets.get(&Target::new(target)) else { 345 - return Ok(PagedAppendingCollection { 346 - version: (0, 0), 347 - items: Vec::new(), 348 - next: None, 349 - total: 0, 350 - }); 327 + return Ok(PagedAppendingCollection::empty()); 351 328 }; 352 329 let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 353 - return Ok(PagedAppendingCollection { 354 - version: (0, 0), 355 - items: Vec::new(), 356 - next: None, 357 - total: 0, 358 - }); 330 + return Ok(PagedAppendingCollection::empty()); 359 331 }; 360 332 361 333 let dids: Vec<Option<Did>> = { ··· 375 347 .collect() 376 348 }; 377 349 378 - let total = dids.len(); 379 - let end = until 380 - .map(|u| std::cmp::min(u as usize, total)) 381 - .unwrap_or(total); 382 - let begin = end.saturating_sub(limit as usize); 383 - let next = if begin == 0 { None } else { Some(begin as u64) }; 350 + let total = dids.len() as u64; 351 + let until = until.unwrap_or(total); 352 + let (start, take, next_until) = match until.checked_sub(limit) { 353 + Some(s) if s > 0 => (s, limit, Some(s)), 354 + Some(s) => (s, limit, None), 355 + None => (0, until, None), 356 + }; 384 357 385 - let alive = dids.iter().flatten().count(); 358 + let alive = dids.iter().flatten().count() as u64; 386 359 let gone = total - alive; 387 360 388 - let items: Vec<Did> = dids[begin..end] 361 + let items: Vec<Did> = dids 389 362 .iter() 363 + .skip(start as usize) 364 + .take(take as usize) 390 365 .rev() 391 366 .flatten() 392 367 .filter(|did| *data.dids.get(did).expect("did must be in dids")) ··· 394 369 .collect(); 395 370 396 371 Ok(PagedAppendingCollection { 397 - version: (total as u64, gone as u64), 372 + version: (total, gone), 398 373 items, 399 - next, 400 - total: alive as u64, 374 + next: next_until, 375 + total: alive, 401 376 }) 402 
377 } 403 378
+59 -80
constellation/src/storage/mod.rs
··· 20 20 OldestToNewest, 21 21 } 22 22 23 - #[derive(Debug, PartialEq)] 23 + #[derive(Debug, Default, PartialEq)] 24 24 pub struct PagedAppendingCollection<T> { 25 25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 26 26 pub items: Vec<T>, ··· 28 28 pub total: u64, 29 29 } 30 30 31 + impl<T> PagedAppendingCollection<T> { 32 + pub(crate) fn empty() -> Self { 33 + Self { 34 + version: (0, 0), 35 + items: Vec::new(), 36 + next: None, 37 + total: 0, 38 + } 39 + } 40 + } 41 + 31 42 /// A paged collection whose keys are sorted instead of indexed 32 43 /// 33 44 /// this has weaker guarantees than PagedAppendingCollection: it might 34 45 /// return a totally consistent snapshot. but it should avoid duplicates 35 46 /// and each page should at least be internally consistent. 36 - #[derive(Debug, PartialEq, Default)] 47 + #[derive(Debug, PartialEq)] 37 48 pub struct PagedOrderedCollection<T, K: Ord> { 38 49 pub items: Vec<T>, 39 50 pub next: Option<K>, 51 + } 52 + 53 + impl<T, K: Ord> PagedOrderedCollection<T, K> { 54 + pub(crate) fn empty() -> Self { 55 + Self { 56 + items: Vec::new(), 57 + next: None, 58 + } 59 + } 40 60 } 41 61 42 62 #[derive(Debug, Deserialize, Serialize, PartialEq)] ··· 91 111 92 112 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 93 113 114 + #[allow(clippy::too_many_arguments)] 94 115 fn get_links( 95 116 &self, 96 117 target: &str, ··· 195 216 None, 196 217 &HashSet::default() 197 218 )?, 198 - PagedAppendingCollection { 199 - version: (0, 0), 200 - items: vec![], 201 - next: None, 202 - total: 0, 203 - } 219 + PagedAppendingCollection::empty() 204 220 ); 205 221 assert_eq!( 206 222 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 207 - PagedAppendingCollection { 208 - version: (0, 0), 209 - items: vec![], 210 - next: None, 211 - total: 0, 212 - } 223 + PagedAppendingCollection::empty() 213 224 ); 
214 225 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 215 226 assert_eq!( ··· 739 750 0, 740 751 )?; 741 752 } 742 - let links = storage.get_links( 743 - "a.com", 744 - "app.t.c", 745 - ".abc.uri", 746 - Order::NewestToOldest, 747 - 2, 748 - None, 749 - &HashSet::default(), 750 - )?; 751 - let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?; 753 + 754 + let sub = "a.com"; 755 + let col = "app.t.c"; 756 + let path = ".abc.uri"; 757 + let order = Order::NewestToOldest; 758 + let dids_filter = HashSet::new(); 759 + 760 + // --- --- round one! --- --- // 761 + // all backlinks 762 + let links = storage.get_links(sub, col, path, order, 2, None, &dids_filter)?; 752 763 assert_eq!( 753 764 links, 754 765 PagedAppendingCollection { ··· 756 767 items: vec![ 757 768 RecordId { 758 769 did: "did:plc:asdf-5".into(), 759 - collection: "app.t.c".into(), 770 + collection: col.into(), 760 771 rkey: "asdf".into(), 761 772 }, 762 773 RecordId { 763 774 did: "did:plc:asdf-4".into(), 764 - collection: "app.t.c".into(), 775 + collection: col.into(), 765 776 rkey: "asdf".into(), 766 777 }, 767 778 ], ··· 769 780 total: 5, 770 781 } 771 782 ); 783 + // distinct dids 784 + let dids = storage.get_distinct_dids(sub, col, path, 2, None)?; 772 785 assert_eq!( 773 786 dids, 774 787 PagedAppendingCollection { ··· 778 791 total: 5, 779 792 } 780 793 ); 781 - let links = storage.get_links( 782 - "a.com", 783 - "app.t.c", 784 - ".abc.uri", 785 - Order::NewestToOldest, 786 - 2, 787 - links.next, 788 - &HashSet::default(), 789 - )?; 790 - let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 794 + 795 + // --- --- round two! 
--- --- // 796 + // all backlinks 797 + let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 791 798 assert_eq!( 792 799 links, 793 800 PagedAppendingCollection { ··· 795 802 items: vec![ 796 803 RecordId { 797 804 did: "did:plc:asdf-3".into(), 798 - collection: "app.t.c".into(), 805 + collection: col.into(), 799 806 rkey: "asdf".into(), 800 807 }, 801 808 RecordId { 802 809 did: "did:plc:asdf-2".into(), 803 - collection: "app.t.c".into(), 810 + collection: col.into(), 804 811 rkey: "asdf".into(), 805 812 }, 806 813 ], ··· 808 815 total: 5, 809 816 } 810 817 ); 818 + // distinct dids 819 + let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 811 820 assert_eq!( 812 821 dids, 813 822 PagedAppendingCollection { ··· 817 826 total: 5, 818 827 } 819 828 ); 820 - let links = storage.get_links( 821 - "a.com", 822 - "app.t.c", 823 - ".abc.uri", 824 - Order::NewestToOldest, 825 - 2, 826 - links.next, 827 - &HashSet::default(), 828 - )?; 829 - let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 829 + 830 + // --- --- round three! 
--- --- // 831 + // all backlinks 832 + let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 830 833 assert_eq!( 831 834 links, 832 835 PagedAppendingCollection { 833 836 version: (5, 0), 834 837 items: vec![RecordId { 835 838 did: "did:plc:asdf-1".into(), 836 - collection: "app.t.c".into(), 839 + collection: col.into(), 837 840 rkey: "asdf".into(), 838 841 },], 839 842 next: None, 840 843 total: 5, 841 844 } 842 845 ); 846 + // distinct dids 847 + let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 843 848 assert_eq!( 844 849 dids, 845 850 PagedAppendingCollection { ··· 849 854 total: 5, 850 855 } 851 856 ); 857 + 852 858 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 853 859 }); 854 860 ··· 943 949 None, 944 950 &HashSet::from([Did("did:plc:linker".to_string())]), 945 951 )?; 946 - assert_eq!( 947 - links, 948 - PagedAppendingCollection { 949 - version: (0, 0), 950 - items: vec![], 951 - next: None, 952 - total: 0, 953 - } 954 - ); 952 + assert_eq!(links, PagedAppendingCollection::empty()); 955 953 956 954 storage.push( 957 955 &ActionableEvent::CreateLinks { ··· 1000 998 None, 1001 999 &HashSet::from([Did("did:plc:someone-else".to_string())]), 1002 1000 )?; 1003 - assert_eq!( 1004 - links, 1005 - PagedAppendingCollection { 1006 - version: (0, 0), 1007 - items: vec![], 1008 - next: None, 1009 - total: 0, 1010 - } 1011 - ); 1001 + assert_eq!(links, PagedAppendingCollection::empty()); 1012 1002 1013 1003 storage.push( 1014 1004 &ActionableEvent::CreateLinks { ··· 1111 1101 None, 1112 1102 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 1113 1103 )?; 1114 - assert_eq!( 1115 - links, 1116 - PagedAppendingCollection { 1117 - version: (0, 0), 1118 - items: vec![], 1119 - next: None, 1120 - total: 0, 1121 - } 1122 - ); 1104 + assert_eq!(links, PagedAppendingCollection::empty()); 1123 1105 }); 1124 1106 1125 1107 test_each_storage!(get_links_exact_multiple, |storage| { ··· 1512 1494 &HashSet::new(), 
1513 1495 &HashSet::new(), 1514 1496 )?, 1515 - PagedOrderedCollection { 1516 - items: vec![], 1517 - next: None, 1518 - } 1497 + PagedOrderedCollection::empty() 1519 1498 ); 1520 1499 }); 1521 1500
+31 -42
constellation/src/storage/rocks_store.rs
··· 960 960 961 961 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 962 962 eprintln!("nothin doin for this target, {target_key:?}"); 963 - return Ok(Default::default()); 963 + return Ok(PagedOrderedCollection::empty()); 964 964 }; 965 965 966 966 let filter_did_ids: HashMap<DidId, bool> = filter_dids ··· 1139 1139 ); 1140 1140 1141 1141 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1142 - return Ok(PagedAppendingCollection { 1143 - version: (0, 0), 1144 - items: Vec::new(), 1145 - next: None, 1146 - total: 0, 1147 - }); 1142 + return Ok(PagedAppendingCollection::empty()); 1148 1143 }; 1149 1144 1150 1145 let mut linkers = self.get_target_linkers(&target_id)?; ··· 1169 1164 let (alive, gone) = linkers.count(); 1170 1165 let total = alive + gone; 1171 1166 1172 - let end: usize; 1173 - let begin: usize; 1174 - let next: Option<u64>; 1175 - 1176 - match order { 1167 + let (start, take, next_until) = match order { 1177 1168 // OldestToNewest: start from the beginning, paginate forward 1178 1169 Order::OldestToNewest => { 1179 - begin = until.map(|u| (u - 1) as usize).unwrap_or(0); 1180 - end = std::cmp::min(begin + limit as usize, total as usize); 1181 - 1182 - next = if end < total as usize { 1183 - Some(end as u64 + 1) 1184 - } else { 1185 - None 1186 - } 1170 + let start = until.unwrap_or(0); 1171 + let next = start + limit + 1; 1172 + let next_until = if next < total { Some(next) } else { None }; 1173 + (start, limit, next_until) 1187 1174 } 1188 1175 // NewestToOldest: start from the end, paginate backward 1189 1176 Order::NewestToOldest => { 1190 - end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1191 - begin = end.saturating_sub(limit as usize); 1192 - next = if begin == 0 { None } else { Some(begin as u64) }; 1177 + let until = until.unwrap_or(total); 1178 + match until.checked_sub(limit) { 1179 + Some(s) if s > 0 => (s, limit, Some(s)), 1180 + Some(s) => (s, 
limit, None), 1181 + None => (0, until, None), 1182 + } 1193 1183 } 1194 - } 1184 + }; 1195 1185 1196 - let mut did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1197 - 1198 - // For OldestToNewest, reverse the items to maintain forward chronological order 1199 - if order == Order::OldestToNewest { 1200 - did_id_rkeys.reverse(); 1201 - } 1186 + let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize); 1187 + let did_id_rkeys: Vec<_> = match order { 1188 + Order::OldestToNewest => did_id_rkeys.collect(), 1189 + Order::NewestToOldest => did_id_rkeys.rev().collect(), 1190 + }; 1202 1191 1203 1192 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1204 1193 // TODO: use get-many (or multi-get or whatever it's called) ··· 1228 1217 Ok(PagedAppendingCollection { 1229 1218 version: (total, gone), 1230 1219 items, 1231 - next, 1220 + next: next_until, 1232 1221 total: alive, 1233 1222 }) 1234 1223 } ··· 1248 1237 ); 1249 1238 1250 1239 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
else { 1251 - return Ok(PagedAppendingCollection { 1252 - version: (0, 0), 1253 - items: Vec::new(), 1254 - next: None, 1255 - total: 0, 1256 - }); 1240 + return Ok(PagedAppendingCollection::empty()); 1257 1241 }; 1258 1242 1259 1243 let linkers = self.get_distinct_target_linkers(&target_id)?; 1260 1244 1261 1245 let (alive, gone) = linkers.count(); 1262 1246 let total = alive + gone; 1263 - let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1264 - let begin = end.saturating_sub(limit as usize); 1265 - let next = if begin == 0 { None } else { Some(begin as u64) }; 1247 + 1248 + let until = until.unwrap_or(total); 1249 + let (start, take, next_until) = match until.checked_sub(limit) { 1250 + Some(s) if s > 0 => (s, limit, Some(s)), 1251 + Some(s) => (s, limit, None), 1252 + None => (0, until, None), 1253 + }; 1266 1254 1267 - let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1255 + let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize); 1256 + let did_id_rkeys: Vec<_> = did_id_rkeys.rev().collect(); 1268 1257 1269 1258 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1270 1259 // TODO: use get-many (or multi-get or whatever it's called) ··· 1290 1279 Ok(PagedAppendingCollection { 1291 1280 version: (total, gone), 1292 1281 items, 1293 - next, 1282 + next: next_until, 1294 1283 total: alive, 1295 1284 }) 1296 1285 }