Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Replace boolean reverse parameter with Order enum

Refactor link query APIs to use an explicit Order enum
(OldestToNewest, NewestToOldest) instead of a boolean
reverse flag for better clarity and maintainability.

+120 -82
+22 -4
constellation/src/server/mod.rs
··· 17 use tokio::task::spawn_blocking; 18 use tokio_util::sync::CancellationToken; 19 20 - use crate::storage::{LinkReader, StorageStats}; 21 use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; ··· 298 299 let path_to_other = format!(".{}", query.path_to_other); 300 301 let paged = store 302 .get_many_to_many_counts( 303 &query.subject, 304 collection, 305 &path, 306 &path_to_other, 307 - query.reverse, 308 limit, 309 cursor_key, 310 &filter_dids, ··· 461 }; 462 let path = format!(".{path}"); 463 464 let paged = store 465 .get_links( 466 &query.subject, 467 collection, 468 &path, 469 - query.reverse, 470 limit, 471 until, 472 &filter_dids, ··· 566 } 567 } 568 569 let paged = store 570 .get_links( 571 &query.target, 572 &query.collection, 573 &query.path, 574 - query.reverse, 575 limit, 576 until, 577 &filter_dids,
··· 17 use tokio::task::spawn_blocking; 18 use tokio_util::sync::CancellationToken; 19 20 + use crate::storage::{LinkReader, Order, StorageStats}; 21 use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; ··· 298 299 let path_to_other = format!(".{}", query.path_to_other); 300 301 + let order = if query.reverse { 302 + Order::OldestToNewest 303 + } else { 304 + Order::NewestToOldest 305 + }; 306 + 307 let paged = store 308 .get_many_to_many_counts( 309 &query.subject, 310 collection, 311 &path, 312 &path_to_other, 313 + order, 314 limit, 315 cursor_key, 316 &filter_dids, ··· 467 }; 468 let path = format!(".{path}"); 469 470 + let order = if query.reverse { 471 + Order::OldestToNewest 472 + } else { 473 + Order::NewestToOldest 474 + }; 475 + 476 let paged = store 477 .get_links( 478 &query.subject, 479 collection, 480 &path, 481 + order, 482 limit, 483 until, 484 &filter_dids, ··· 578 } 579 } 580 581 + let order = if query.reverse { 582 + Order::OldestToNewest 583 + } else { 584 + Order::NewestToOldest 585 + }; 586 + 587 let paged = store 588 .get_links( 589 &query.target, 590 &query.collection, 591 &query.path, 592 + order, 593 limit, 594 until, 595 &filter_dids,
+29 -23
constellation/src/storage/mem_store.rs
··· 1 use super::{ 2 - LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 use anyhow::Result; ··· 140 collection: &str, 141 path: &str, 142 path_to_other: &str, 143 - reverse: bool, 144 limit: u64, 145 after: Option<String>, 146 filter_dids: &HashSet<Did>, ··· 199 .iter() 200 .map(|(k, (n, u, _))| (k.0.clone(), *n, u.len() as u64)) 201 .collect(); 202 - // sort in reverse order to show entries from oldest to newest 203 - if reverse { 204 - items.sort_by(|a, b| b.cmp(a)); 205 - } else { 206 - items.sort(); 207 } 208 items = items 209 .into_iter() ··· 250 target: &str, 251 collection: &str, 252 path: &str, 253 - reverse: bool, 254 limit: u64, 255 until: Option<u64>, 256 filter_dids: &HashSet<Did>, ··· 293 let end: usize; 294 let next: Option<u64>; 295 296 - if reverse { 297 - begin = until.map(|u| (u) as usize).unwrap_or(0); 298 - end = std::cmp::min(begin + limit as usize, total); 299 300 - next = if end < total { 301 - Some(end as u64 + 1) 302 - } else { 303 - None 304 - }; 305 - } else { 306 - end = until 307 - .map(|u| std::cmp::min(u as usize, total)) 308 - .unwrap_or(total); 309 - begin = end.saturating_sub(limit as usize); 310 - next = if begin == 0 { None } else { Some(begin as u64) }; 311 } 312 313 let alive = did_rkeys.iter().flatten().count(); ··· 325 }) 326 .collect(); 327 328 - if reverse { 329 items.reverse(); 330 } 331
··· 1 use super::{ 2 + LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, 3 + StorageStats, 4 }; 5 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 6 use anyhow::Result; ··· 141 collection: &str, 142 path: &str, 143 path_to_other: &str, 144 + order: Order, 145 limit: u64, 146 after: Option<String>, 147 filter_dids: &HashSet<Did>, ··· 200 .iter() 201 .map(|(k, (n, u, _))| (k.0.clone(), *n, u.len() as u64)) 202 .collect(); 203 + // Sort based on order: OldestToNewest uses descending order, NewestToOldest uses ascending 204 + match order { 205 + Order::OldestToNewest => items.sort_by(|a, b| b.cmp(a)), 206 + Order::NewestToOldest => items.sort(), 207 } 208 items = items 209 .into_iter() ··· 250 target: &str, 251 collection: &str, 252 path: &str, 253 + order: Order, 254 limit: u64, 255 until: Option<u64>, 256 filter_dids: &HashSet<Did>, ··· 293 let end: usize; 294 let next: Option<u64>; 295 296 + match order { 297 + // OldestToNewest: start from the beginning, paginate forward 298 + Order::OldestToNewest => { 299 + begin = until.map(|u| (u) as usize).unwrap_or(0); 300 + end = std::cmp::min(begin + limit as usize, total); 301 302 + next = if end < total { 303 + Some(end as u64 + 1) 304 + } else { 305 + None 306 + }; 307 + } 308 + // NewestToOldest: start from the end, paginate backward 309 + Order::NewestToOldest => { 310 + end = until 311 + .map(|u| std::cmp::min(u as usize, total)) 312 + .unwrap_or(total); 313 + begin = end.saturating_sub(limit as usize); 314 + next = if begin == 0 { None } else { Some(begin as u64) }; 315 + } 316 } 317 318 let alive = did_rkeys.iter().flatten().count(); ··· 330 }) 331 .collect(); 332 333 + // For OldestToNewest, reverse the items to maintain forward chronological order 334 + if order == Order::OldestToNewest { 335 items.reverse(); 336 } 337
+43 -34
constellation/src/storage/mod.rs
··· 11 #[cfg(feature = "rocks")] 12 pub use rocks_store::RocksStorage; 13 14 #[derive(Debug, PartialEq)] 15 pub struct PagedAppendingCollection<T> { 16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" ··· 72 collection: &str, 73 path: &str, 74 path_to_other: &str, 75 - reverse: bool, 76 limit: u64, 77 after: Option<String>, 78 filter_dids: &HashSet<Did>, ··· 88 target: &str, 89 collection: &str, 90 path: &str, 91 - reverse: bool, 92 limit: u64, 93 until: Option<u64>, 94 filter_dids: &HashSet<Did>, ··· 182 "a.com", 183 "app.t.c", 184 ".abc.uri", 185 - false, 186 100, 187 None, 188 &HashSet::default() ··· 686 "a.com", 687 "app.t.c", 688 ".abc.uri", 689 - false, 690 100, 691 None, 692 &HashSet::default() ··· 735 "a.com", 736 "app.t.c", 737 ".abc.uri", 738 - false, 739 2, 740 None, 741 &HashSet::default(), ··· 774 "a.com", 775 "app.t.c", 776 ".abc.uri", 777 - false, 778 2, 779 links.next, 780 &HashSet::default(), ··· 813 "a.com", 814 "app.t.c", 815 ".abc.uri", 816 - false, 817 2, 818 links.next, 819 &HashSet::default(), ··· 862 )?; 863 } 864 865 - // Test reverse: true (oldest first) 866 let links = storage.get_links( 867 "a.com", 868 "app.t.c", 869 ".abc.uri", 870 - true, 871 2, 872 None, 873 &HashSet::default(), ··· 892 total: 5, 893 } 894 ); 895 - // Test reverse: false (newest first) 896 let links = storage.get_links( 897 "a.com", 898 "app.t.c", 899 ".abc.uri", 900 - false, 901 2, 902 None, 903 &HashSet::default(), ··· 930 "a.com", 931 "app.t.c", 932 ".abc.uri", 933 - false, 934 2, 935 None, 936 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 964 "a.com", 965 "app.t.c", 966 ".abc.uri", 967 - false, 968 2, 969 None, 970 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 987 "a.com", 988 "app.t.c", 989 ".abc.uri", 990 - false, 991 2, 992 None, 993 &HashSet::from([Did("did:plc:someone-else".to_string())]), ··· 1035 "a.com", 1036 "app.t.c", 1037 ".abc.uri", 1038 - false, 1039 2, 1040 None, 1041 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 1065 "a.com", 1066 "app.t.c", 1067 ".abc.uri", 1068 - false, 1069 2, 1070 None, 1071 &HashSet::from([ ··· 1098 "a.com", 1099 "app.t.c", 1100 ".abc.uri", 1101 - false, 1102 2, 1103 None, 1104 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), ··· 1135 "a.com", 1136 "app.t.c", 1137 ".abc.uri", 1138 - false, 1139 2, 1140 None, 1141 &HashSet::default(), ··· 1164 "a.com", 1165 "app.t.c", 1166 ".abc.uri", 1167 - false, 1168 2, 1169 links.next, 1170 &HashSet::default(), ··· 1213 "a.com", 1214 "app.t.c", 1215 ".abc.uri", 1216 - false, 1217 2, 1218 None, 1219 &HashSet::default(), ··· 1256 "a.com", 1257 "app.t.c", 1258 ".abc.uri", 1259 - false, 1260 2, 1261 links.next, 1262 &HashSet::default(), ··· 1305 "a.com", 1306 "app.t.c", 1307 ".abc.uri", 1308 - false, 1309 2, 1310 None, 1311 &HashSet::default(), ··· 1342 "a.com", 1343 "app.t.c", 1344 ".abc.uri", 1345 - false, 1346 2, 1347 links.next, 1348 &HashSet::default(), ··· 1384 "a.com", 1385 "app.t.c", 1386 ".abc.uri", 1387 - false, 1388 2, 1389 None, 1390 &HashSet::default(), ··· 1417 "a.com", 1418 "app.t.c", 1419 ".abc.uri", 1420 - false, 1421 2, 1422 links.next, 1423 &HashSet::default(), ··· 1499 "a.b.c", 1500 ".d.e", 1501 ".f.g", 1502 - false, 1503 10, 1504 None, 1505 &HashSet::new(), ··· 1543 "app.t.c", 1544 ".abc.uri", 1545 ".def.uri", 1546 - false, 1547 10, 1548 None, 1549 &HashSet::new(), ··· 1643 "app.t.c", 1644 ".abc.uri", 1645 ".def.uri", 1646 - false, 1647 10, 1648 None, 1649 &HashSet::new(), ··· 1660 "app.t.c", 1661 ".abc.uri", 1662 ".def.uri", 1663 - false, 1664 10, 1665 None, 1666 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), ··· 1677 "app.t.c", 1678 ".abc.uri", 1679 ".def.uri", 1680 - false, 1681 10, 1682 None, 1683 &HashSet::new(), ··· 1753 2, 1754 )?; 1755 1756 - // Test reverse: false (default order - by target ascending) 1757 let counts = storage.get_many_to_many_counts( 1758 "a.com", 1759 "app.t.c", 1760 ".abc.uri", 1761 ".def.uri", 1762 - false, 1763 10, 1764 None, 1765 &HashSet::new(), ··· 1771 assert_eq!(counts.items[1].0, "c.com"); 1772 assert_eq!(counts.items[2].0, "d.com"); 1773 1774 - // Test reverse: true (descending order - by target descending) 1775 let counts = storage.get_many_to_many_counts( 1776 "a.com", 1777 "app.t.c", 1778 ".abc.uri", 1779 ".def.uri", 1780 - true, 1781 10, 1782 None, 1783 &HashSet::new(),
··· 11 #[cfg(feature = "rocks")] 12 pub use rocks_store::RocksStorage; 13 14 + /// Ordering for paginated link queries 15 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 16 + pub enum Order { 17 + /// Newest links first (default) 18 + NewestToOldest, 19 + /// Oldest links first 20 + OldestToNewest, 21 + } 22 + 23 #[derive(Debug, PartialEq)] 24 pub struct PagedAppendingCollection<T> { 25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" ··· 81 collection: &str, 82 path: &str, 83 path_to_other: &str, 84 + order: Order, 85 limit: u64, 86 after: Option<String>, 87 filter_dids: &HashSet<Did>, ··· 97 target: &str, 98 collection: &str, 99 path: &str, 100 + order: Order, 101 limit: u64, 102 until: Option<u64>, 103 filter_dids: &HashSet<Did>, ··· 191 "a.com", 192 "app.t.c", 193 ".abc.uri", 194 + Order::NewestToOldest, 195 100, 196 None, 197 &HashSet::default() ··· 695 "a.com", 696 "app.t.c", 697 ".abc.uri", 698 + Order::NewestToOldest, 699 100, 700 None, 701 &HashSet::default() ··· 744 "a.com", 745 "app.t.c", 746 ".abc.uri", 747 + Order::NewestToOldest, 748 2, 749 None, 750 &HashSet::default(), ··· 783 "a.com", 784 "app.t.c", 785 ".abc.uri", 786 + Order::NewestToOldest, 787 2, 788 links.next, 789 &HashSet::default(), ··· 822 "a.com", 823 "app.t.c", 824 ".abc.uri", 825 + Order::NewestToOldest, 826 2, 827 links.next, 828 &HashSet::default(), ··· 871 )?; 872 } 873 874 + // Test OldestToNewest order (oldest first) 875 let links = storage.get_links( 876 "a.com", 877 "app.t.c", 878 ".abc.uri", 879 + Order::OldestToNewest, 880 2, 881 None, 882 &HashSet::default(), ··· 901 total: 5, 902 } 903 ); 904 + // Test NewestToOldest order (newest first) 905 let links = storage.get_links( 906 "a.com", 907 "app.t.c", 908 ".abc.uri", 909 + Order::NewestToOldest, 910 2, 911 None, 912 &HashSet::default(), ··· 939 "a.com", 940 "app.t.c", 941 ".abc.uri", 942 + Order::NewestToOldest, 943 2, 944 None, 945 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 973 "a.com", 974 "app.t.c", 975 ".abc.uri", 976 + Order::NewestToOldest, 977 2, 978 None, 979 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 996 "a.com", 997 "app.t.c", 998 ".abc.uri", 999 + Order::NewestToOldest, 1000 2, 1001 None, 1002 &HashSet::from([Did("did:plc:someone-else".to_string())]), ··· 1044 "a.com", 1045 "app.t.c", 1046 ".abc.uri", 1047 + Order::NewestToOldest, 1048 2, 1049 None, 1050 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 1074 "a.com", 1075 "app.t.c", 1076 ".abc.uri", 1077 + Order::NewestToOldest, 1078 2, 1079 None, 1080 &HashSet::from([ ··· 1107 "a.com", 1108 "app.t.c", 1109 ".abc.uri", 1110 + Order::NewestToOldest, 1111 2, 1112 None, 1113 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), ··· 1144 "a.com", 1145 "app.t.c", 1146 ".abc.uri", 1147 + Order::NewestToOldest, 1148 2, 1149 None, 1150 &HashSet::default(), ··· 1173 "a.com", 1174 "app.t.c", 1175 ".abc.uri", 1176 + Order::NewestToOldest, 1177 2, 1178 links.next, 1179 &HashSet::default(), ··· 1222 "a.com", 1223 "app.t.c", 1224 ".abc.uri", 1225 + Order::NewestToOldest, 1226 2, 1227 None, 1228 &HashSet::default(), ··· 1265 "a.com", 1266 "app.t.c", 1267 ".abc.uri", 1268 + Order::NewestToOldest, 1269 2, 1270 links.next, 1271 &HashSet::default(), ··· 1314 "a.com", 1315 "app.t.c", 1316 ".abc.uri", 1317 + Order::NewestToOldest, 1318 2, 1319 None, 1320 &HashSet::default(), ··· 1351 "a.com", 1352 "app.t.c", 1353 ".abc.uri", 1354 + Order::NewestToOldest, 1355 2, 1356 links.next, 1357 &HashSet::default(), ··· 1393 "a.com", 1394 "app.t.c", 1395 ".abc.uri", 1396 + Order::NewestToOldest, 1397 2, 1398 None, 1399 &HashSet::default(), ··· 1426 "a.com", 1427 "app.t.c", 1428 ".abc.uri", 1429 + Order::NewestToOldest, 1430 2, 1431 links.next, 1432 &HashSet::default(), ··· 1508 "a.b.c", 1509 ".d.e", 1510 ".f.g", 1511 + Order::NewestToOldest, 1512 10, 1513 None, 1514 &HashSet::new(), ··· 1552 "app.t.c", 1553 ".abc.uri", 1554 ".def.uri", 1555 + Order::NewestToOldest, 1556 10, 1557 None, 1558 &HashSet::new(), ··· 1652 "app.t.c", 1653 ".abc.uri", 1654 ".def.uri", 1655 + Order::NewestToOldest, 1656 10, 1657 None, 1658 &HashSet::new(), ··· 1669 "app.t.c", 1670 ".abc.uri", 1671 ".def.uri", 1672 + Order::NewestToOldest, 1673 10, 1674 None, 1675 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), ··· 1686 "app.t.c", 1687 ".abc.uri", 1688 ".def.uri", 1689 + Order::NewestToOldest, 1690 10, 1691 None, 1692 &HashSet::new(), ··· 1762 2, 1763 )?; 1764 1765 + // Test NewestToOldest order (default order - by target ascending) 1766 let counts = storage.get_many_to_many_counts( 1767 "a.com", 1768 "app.t.c", 1769 ".abc.uri", 1770 ".def.uri", 1771 + Order::NewestToOldest, 1772 10, 1773 None, 1774 &HashSet::new(), ··· 1780 assert_eq!(counts.items[1].0, "c.com"); 1781 assert_eq!(counts.items[2].0, "d.com"); 1782 1783 + // Test OldestToNewest order (descending order - by target descending) 1784 let counts = storage.get_many_to_many_counts( 1785 "a.com", 1786 "app.t.c", 1787 ".abc.uri", 1788 ".def.uri", 1789 + Order::OldestToNewest, 1790 10, 1791 None, 1792 &HashSet::new(),
+26 -21
constellation/src/storage/rocks_store.rs
··· 1 use super::{ 2 - ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, 3 - StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 use anyhow::{bail, Result}; ··· 941 collection: &str, 942 path: &str, 943 path_to_other: &str, 944 - reverse: bool, 945 limit: u64, 946 after: Option<String>, 947 filter_dids: &HashSet<Did>, ··· 1084 items.push((target.0 .0, *n, dids.len() as u64)); 1085 } 1086 1087 - // Sort in desired direction 1088 - if reverse { 1089 - items.sort_by(|a, b| b.cmp(a)); // descending 1090 - } else { 1091 - items.sort(); // ascending 1092 } 1093 1094 let next = if grouped_counts.len() as u64 >= limit { ··· 1136 target: &str, 1137 collection: &str, 1138 path: &str, 1139 - reverse: bool, 1140 limit: u64, 1141 until: Option<u64>, 1142 filter_dids: &HashSet<Did>, ··· 1182 let begin: usize; 1183 let next: Option<u64>; 1184 1185 - if reverse { 1186 - begin = until.map(|u| (u - 1) as usize).unwrap_or(0); 1187 - end = std::cmp::min(begin + limit as usize, total as usize); 1188 1189 - next = if end < total as usize { 1190 - Some(end as u64 + 1) 1191 - } else { 1192 - None 1193 } 1194 - } else { 1195 - end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1196 - begin = end.saturating_sub(limit as usize); 1197 - next = if begin == 0 { None } else { Some(begin as u64) }; 1198 } 1199 1200 let mut did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1201 1202 - if reverse { 1203 did_id_rkeys.reverse(); 1204 } 1205
··· 1 use super::{ 2 + ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 + PagedOrderedCollection, StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 use anyhow::{bail, Result}; ··· 941 collection: &str, 942 path: &str, 943 path_to_other: &str, 944 + order: Order, 945 limit: u64, 946 after: Option<String>, 947 filter_dids: &HashSet<Did>, ··· 1084 items.push((target.0 .0, *n, dids.len() as u64)); 1085 } 1086 1087 + // Sort based on order: OldestToNewest uses descending order, NewestToOldest uses ascending 1088 + match order { 1089 + Order::OldestToNewest => items.sort_by(|a, b| b.cmp(a)), // descending 1090 + Order::NewestToOldest => items.sort(), // ascending 1091 } 1092 1093 let next = if grouped_counts.len() as u64 >= limit { ··· 1135 target: &str, 1136 collection: &str, 1137 path: &str, 1138 + order: Order, 1139 limit: u64, 1140 until: Option<u64>, 1141 filter_dids: &HashSet<Did>, ··· 1181 let begin: usize; 1182 let next: Option<u64>; 1183 1184 + match order { 1185 + // OldestToNewest: start from the beginning, paginate forward 1186 + Order::OldestToNewest => { 1187 + begin = until.map(|u| (u - 1) as usize).unwrap_or(0); 1188 + end = std::cmp::min(begin + limit as usize, total as usize); 1189 1190 + next = if end < total as usize { 1191 + Some(end as u64 + 1) 1192 + } else { 1193 + None 1194 + } 1195 } 1196 + // NewestToOldest: start from the end, paginate backward 1197 + Order::NewestToOldest => { 1198 + end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1199 + begin = end.saturating_sub(limit as usize); 1200 + next = if begin == 0 { None } else { Some(begin as u64) }; 1201 + } 1202 } 1203 1204 let mut did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1205 1206 + // For OldestToNewest, reverse the items to maintain forward chronological order 1207 + if order == Order::OldestToNewest { 1208 did_id_rkeys.reverse(); 1209 } 1210