Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Add new get_many_to_many XRPC endpoint #7

Merged · opened by seoul.systems, targeting main from seoul.systems/microcosm-rs: xrpc_many2many

Added a new XRPC API endpoint, termed get_many_to_many, to fetch joined record URIs (we talked about this briefly on Discord already). It is implemented and functions almost identically to the existing get_many_to_many_counts endpoint and handler. Some of its possible flaws, like the two-step lookup to verify that a matching DID is indeed active, are duplicated as well. On the plus side, this should make the PR straightforward to review, and it should make it easier to modify both endpoints later on, once a more efficient way to validate the status of DIDs is possible.

If you have comments, remarks, etc., I am happy to rework some parts.
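For context, the two-step shape described above can be sketched roughly like this. This is a minimal illustration, not the actual handler: the names, signatures, and in-memory maps standing in for the real link storage and DID-status lookups are all hypothetical.

```rust
use std::collections::HashMap;

// Hypothetical sketch of the two-step lookup: step 1 fetches candidate
// linking-record URIs for a target, step 2 filters them by whether the
// linking DID is still active (the duplicated second lookup).
fn get_many_to_many(
    links: &HashMap<&str, Vec<&str>>, // target URI -> linking record URIs
    did_active: &HashMap<&str, bool>, // DID -> active status (second lookup)
    target: &str,
) -> Vec<String> {
    // Step 1: fetch candidate record URIs pointing at the target.
    let candidates = links.get(target).cloned().unwrap_or_default();
    // Step 2: keep only records whose DID is verified active.
    candidates
        .into_iter()
        .filter(|uri| {
            // at://<did>/<collection>/<rkey> -> extract the DID segment
            uri.strip_prefix("at://")
                .and_then(|rest| rest.split('/').next())
                .map(|did| *did_active.get(did).unwrap_or(&false))
                .unwrap_or(false)
        })
        .map(String::from)
        .collect()
}

fn main() {
    let mut links = HashMap::new();
    links.insert(
        "at://did:plc:target/app.bsky.feed.post/1",
        vec![
            "at://did:plc:alice/app.bsky.feed.like/1",
            "at://did:plc:bob/app.bsky.feed.like/2",
        ],
    );
    let mut did_active = HashMap::new();
    did_active.insert("did:plc:alice", true);
    did_active.insert("did:plc:bob", false); // deactivated account filtered out
    let uris = get_many_to_many(&links, &did_active, "at://did:plc:target/app.bsky.feed.post/1");
    assert_eq!(uris, vec!["at://did:plc:alice/app.bsky.feed.like/1".to_string()]);
    println!("{uris:?}");
}
```

The counts endpoint would differ only in its final step, returning `candidates.len()` after the same filter instead of the URIs themselves — which is why the two handlers can share the eventual fix for the DID-status check.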


Participants 2
AT URI
at://did:plc:53wellrw53o7sw4zlpfenvuh/sh.tangled.repo.pull/3mbkyehqooh22
+895 -608
Interdiff #3 → #4
+1
.gitignore
 /target
 local/
+rocks.test
+4
.prettierrc
+{
+  "tabWidth": 2,
+  "useTabs": false
+}
+175 -352
Cargo.lock
(lockfile interdiff collapsed; the recoverable changes:)
- foyer 0.18.0 → 0.22.3 (foyer, foyer-common, foyer-memory, foyer-storage): the madsim/madsim-tokio simulation runtime is dropped in favor of a new foyer-tokio crate, pulling in core_affinity, fastant, io-uring, mea, num_cpus, and small_ctor, and removing arc-swap, async-channel, async-task, auto_enums, derive_utils, downcast-rs, nanorand, naive-timer, ordered_hash_map, and panic-message
- duplicate dependency versions collapsed: syn now only 2.0.106 (1.0.109 removed), darling only 0.20.11 (0.14.4 removed), strsim only 0.11.1 (0.10.0 removed), toml only 0.9.7 (0.8.23 removed), serde_spanned only 1.0.2 (0.6.9 removed); itertools 0.13.0 removed (bindgen now on 0.12.1); hashbrown 0.13.2 and toml_write removed
- new coexisting versions added: foldhash 0.2.0 and hashbrown 0.16.1 alongside the existing ones
- version bumps: bytes 1.10.1 → 1.11.1, clap 4.5.48 → 4.5.56 (clap_builder 4.5.48 → 4.5.56, clap_derive 4.5.47 → 4.5.55), num-bigint-dig 0.8.4 → 0.8.6, rsa 0.9.8 → 0.9.10, slab 0.4.9 → 0.4.12
+3
Cargo.toml
··· 13 13 "pocket", 14 14 "reflector", 15 15 ] 16 + 17 + [workspace.dependencies] 18 + clap = { version = "4.5.56", features = ["derive", "env"] }
+1 -1
constellation/Cargo.toml
··· 11 11 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 12 12 axum-metrics = "0.2" 13 13 bincode = "1.3.3" 14 + clap = { workspace = true } 14 - clap = { version = "4.5.26", features = ["derive"] } 15 15 ctrlc = "3.4.5" 16 16 flume = { version = "0.11.1", default-features = false } 17 17 fs4 = { version = "0.12.0", features = ["sync"] }
+54 -49
constellation/src/bin/main.rs
··· 26 26 #[arg(long)] 27 27 #[clap(default_value = "0.0.0.0:6789")] 28 28 bind: SocketAddr, 29 + /// enable metrics collection and serving 30 + #[arg(long, action)] 29 - /// optionally disable the metrics server 30 - #[arg(long)] 31 - #[clap(default_value_t = false)] 32 31 collect_metrics: bool, 33 32 /// metrics server's listen address 33 + #[arg(long, requires("collect_metrics"))] 34 - #[arg(long)] 35 34 #[clap(default_value = "0.0.0.0:8765")] 36 35 bind_metrics: SocketAddr, 37 36 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value: ··· 46 45 #[arg(short, long)] 47 46 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 48 47 backend: StorageBackend, 48 + /// Serve a did:web document for this domain 49 + #[arg(long)] 50 + did_web_domain: Option<String>, 49 51 /// Initiate a database backup into this dir, if supported by the storage 50 52 #[arg(long)] 51 53 backup: Option<PathBuf>, ··· 104 106 MemStorage::new(), 105 107 fixture, 106 108 None, 109 + args.did_web_domain, 107 110 stream, 108 111 bind, 109 112 metrics_bind, ··· 139 142 rocks, 140 143 fixture, 141 144 args.data, 145 + args.did_web_domain, 142 146 stream, 143 147 bind, 144 148 metrics_bind, ··· 160 164 mut storage: impl LinkStorage, 161 165 fixture: Option<PathBuf>, 162 166 data_dir: Option<PathBuf>, 167 + did_web_domain: Option<String>, 163 168 stream: String, 164 169 bind: SocketAddr, 165 170 metrics_bind: SocketAddr, ··· 212 217 if collect_metrics { 213 218 install_metrics_server(metrics_bind)?; 214 219 } 220 + serve(readable, bind, did_web_domain, staying_alive).await 215 - serve(readable, bind, staying_alive).await 216 221 }) 217 222 .unwrap(); 218 223 stay_alive.drop_guard(); ··· 222 227 // only spawn monitoring thread if the metrics server is running 223 228 if collect_metrics { 224 229 s.spawn(move || { // monitor thread 230 + let stay_alive = stay_alive.clone(); 231 + let check_alive = stay_alive.clone(); 225 - let stay_alive
= stay_alive.clone(); 226 - let check_alive = stay_alive.clone(); 227 232 233 + let process_collector = metrics_process::Collector::default(); 234 + process_collector.describe(); 235 + metrics::describe_gauge!( 236 + "storage_available", 237 + metrics::Unit::Bytes, 238 + "available to be allocated" 239 + ); 240 + metrics::describe_gauge!( 241 + "storage_free", 242 + metrics::Unit::Bytes, 243 + "unused bytes in filesystem" 244 + ); 245 + if let Some(ref p) = data_dir { 246 + if let Err(e) = fs4::available_space(p) { 247 + eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 248 + } else { 249 + println!("disk space monitoring should work, watching at {p:?}"); 228 - let process_collector = metrics_process::Collector::default(); 229 - process_collector.describe(); 230 - metrics::describe_gauge!( 231 - "storage_available", 232 - metrics::Unit::Bytes, 233 - "available to be allocated" 234 - ); 235 - metrics::describe_gauge!( 236 - "storage_free", 237 - metrics::Unit::Bytes, 238 - "unused bytes in filesystem" 239 - ); 240 - if let Some(ref p) = data_dir { 241 - if let Err(e) = fs4::available_space(p) { 242 - eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 243 - } else { 244 - println!("disk space monitoring should work, watching at {p:?}"); 245 - } 246 - } 247 - 248 - 'monitor: loop { 249 - match readable.get_stats() { 250 - Ok(StorageStats { dids, targetables, linking_records, .. }) => { 251 - metrics::gauge!("storage.stats.dids").set(dids as f64); 252 - metrics::gauge!("storage.stats.targetables").set(targetables as f64); 253 - metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 254 250 } 255 - Err(e) => eprintln!("failed to get stats: {e:?}"), 256 251 } 257 252 253 + 'monitor: loop { 254 + match readable.get_stats() { 255 + Ok(StorageStats { dids, targetables, linking_records, .. 
}) => { 256 + metrics::gauge!("storage.stats.dids").set(dids as f64); 257 + metrics::gauge!("storage.stats.targetables").set(targetables as f64); 258 + metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 259 + } 260 + Err(e) => eprintln!("failed to get stats: {e:?}"), 258 - process_collector.collect(); 259 - if let Some(ref p) = data_dir { 260 - if let Ok(avail) = fs4::available_space(p) { 261 - metrics::gauge!("storage.available").set(avail as f64); 262 261 } 262 + 263 + process_collector.collect(); 264 + if let Some(ref p) = data_dir { 265 + if let Ok(avail) = fs4::available_space(p) { 266 + metrics::gauge!("storage.available").set(avail as f64); 267 + } 268 + if let Ok(free) = fs4::free_space(p) { 269 + metrics::gauge!("storage.free").set(free as f64); 270 + } 263 - if let Ok(free) = fs4::free_space(p) { 264 - metrics::gauge!("storage.free").set(free as f64); 265 271 } 272 + let wait = time::Instant::now(); 273 + while wait.elapsed() < MONITOR_INTERVAL { 274 + thread::sleep(time::Duration::from_millis(100)); 275 + if check_alive.is_cancelled() { 276 + break 'monitor 277 + } 266 - } 267 - let wait = time::Instant::now(); 268 - while wait.elapsed() < MONITOR_INTERVAL { 269 - thread::sleep(time::Duration::from_millis(100)); 270 - if check_alive.is_cancelled() { 271 - break 'monitor 272 278 } 273 279 } 280 + stay_alive.drop_guard(); 281 + }); 274 - } 275 - stay_alive.drop_guard(); 276 - }); 277 282 } 278 283 }); 279 284
constellation/src/lib.rs

This file has not been changed.

+78 -7
constellation/src/server/mod.rs
··· 3 3 extract::{Query, Request}, 4 4 http::{self, header}, 5 5 middleware::{self, Next}, 6 - response::{IntoResponse, Response}, 6 + response::{IntoResponse, Json, Response}, 7 7 routing::get, 8 8 Router, 9 9 }; ··· 37 37 http::StatusCode::INTERNAL_SERVER_ERROR 38 38 } 39 39 40 - pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 41 - where 42 - S: LinkReader, 43 - A: ToSocketAddrs, 44 - { 45 - let app = Router::new() 40 + pub async fn serve<S: LinkReader, A: ToSocketAddrs>( 41 + store: S, 42 + addr: A, 43 + did_web_domain: Option<String>, 44 + stay_alive: CancellationToken, 45 + ) -> anyhow::Result<()> { 46 + let mut app = Router::new(); 47 + 48 + if let Some(d) = did_web_domain { 49 + app = app.route( 50 + "/.well-known/did.json", 51 + get({ 52 + let domain = d.clone(); 53 + move || did_web(domain) 54 + }), 55 + ) 56 + } 57 + 58 + let app = app 46 59 .route("/robots.txt", get(robots)) 47 60 .route( 48 61 "/", ··· 66 79 } 67 80 }), 68 81 ) 82 + // deprecated 69 83 .route( 70 84 "/links/count", 71 85 get({ ··· 78 92 }), 79 93 ) 80 94 .route( 95 + "/xrpc/blue.microcosm.links.getBacklinksCount", 96 + get({ 97 + let store = store.clone(); 98 + move |accept, query| async { 99 + spawn_blocking(|| get_backlink_counts(accept, query, store)) 100 + .await 101 + .map_err(to500)? 
102 + } 103 + }), 104 + ) 105 + .route( 81 106 "/links/count/distinct-dids", 82 107 get({ 83 108 let store = store.clone(); ··· 203 228 User-agent: * 204 229 Disallow: /links 205 230 Disallow: /links/ 231 + Disallow: /xrpc/ 206 232 " 207 233 } 208 234 235 + async fn did_web(domain: String) -> impl IntoResponse { 236 + Json(serde_json::json!({ 237 + "id": format!("did:web:{domain}"), 238 + "service": [{ 239 + "id": "#constellation", 240 + "type": "ConstellationGraphService", 241 + "serviceEndpoint": format!("https://{domain}") 242 + }] 243 + })) 244 + } 245 + 209 246 #[derive(Template, Serialize, Deserialize)] 210 247 #[template(path = "hello.html.j2")] 211 248 struct HelloReponse { ··· 365 402 Ok(acceptable( 366 403 accept, 367 404 GetLinksCountResponse { 405 + total, 406 + query: (*query).clone(), 407 + }, 408 + )) 409 + } 410 + 411 + #[derive(Clone, Deserialize)] 412 + struct GetItemsCountQuery { 413 + subject: String, 414 + source: String, 415 + } 416 + #[derive(Template, Serialize)] 417 + #[template(path = "get-backlinks-count.html.j2")] 418 + struct GetItemsCountResponse { 419 + total: u64, 420 + #[serde(skip_serializing)] 421 + query: GetItemsCountQuery, 422 + } 423 + fn get_backlink_counts( 424 + accept: ExtractAccept, 425 + query: axum_extra::extract::Query<GetItemsCountQuery>, 426 + store: impl LinkReader, 427 + ) -> Result<impl IntoResponse, http::StatusCode> { 428 + let Some((collection, path)) = query.source.split_once(':') else { 429 + return Err(http::StatusCode::BAD_REQUEST); 430 + }; 431 + let path = format!(".{path}"); 432 + let total = store 433 + .get_count(&query.subject, collection, &path) 434 + .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 435 + 436 + Ok(acceptable( 437 + accept, 438 + GetItemsCountResponse { 368 439 total, 369 440 query: (*query).clone(), 370 441 },
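The new handler parses its `source` query parameter by splitting on the first `:` and re-adding the leading dot the storage layer expects. A small sketch of just that parsing step (the helper name is illustrative, not from the PR):

```rust
/// split "collection:path" into (collection, ".path"); a missing ':' maps to a 400
fn parse_source(source: &str) -> Option<(&str, String)> {
    let (collection, path) = source.split_once(':')?;
    Some((collection, format!(".{path}")))
}

fn main() {
    let (collection, path) = parse_source("app.bsky.feed.like:subject.uri").unwrap();
    assert_eq!(collection, "app.bsky.feed.like");
    assert_eq!(path, ".subject.uri");
    // no colon at all is rejected
    assert!(parse_source("no-colon-here").is_none());
}
```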
constellation/src/storage/mem_store.rs

This file has not been changed.

constellation/src/storage/mod.rs

This file has not been changed.

constellation/src/storage/rocks_store.rs

This file has not been changed.

constellation/templates/get-many-to-many.html.j2

This file has not been changed.

+14 -1
constellation/templates/hello.html.j2
··· 140 140 {% call try_it::dids("at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r", "app.bsky.feed.like", ".subject.uri") %} 141 141 142 142 143 - <h3 class="route"><code>GET /links/count</code></h3> 143 + <h3 class="route deprecated"><code>[deprecated] GET /links/count</code></h3> 144 144 145 145 <p>The total number of links pointing at a given target.</p> 146 146 ··· 156 156 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 157 157 {% call try_it::links_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block", ".subject") %} 158 158 159 + <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getBacklinksCount</code></h3> 160 + 161 + <p>The total number of links pointing at a given target.</p> 162 + 163 + <h4>Query parameters:</h4> 164 + 165 + <ul> 166 + <li><code>subject</code>: required, must url-encode. The target being linked to. Example: <code>did:plc:vc7f4oafdgxsihk4cry2xpze</code> or <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></li> 167 + <li><code>source</code>: required. Collection and path specification for the primary link. Example: <code>app.bsky.feed.like:subject.uri</code></li> 168 + </ul> 169 + 170 + <p style="margin-bottom: 0"><strong>Try it:</strong></p> 171 + {% call try_it::get_backlinks_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block:subject") %} 159 172 160 173 <h3 class="route"><code>GET /links/count/distinct-dids</code></h3> 161 174
+7
constellation/templates/try-it-macros.html.j2
··· 134 134 </form> 135 135 {% endmacro %} 136 136 137 + {% macro get_backlinks_count(subject, source) %} 138 + <form method="get" action="/xrpc/blue.microcosm.links.getBacklinksCount"> 139 + <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinksCount 140 + ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="subject" /> 141 + &source= <input type="text" name="source" value="{{ source }}" placeholder="source" /> <button type="submit">get links count</button></pre> 142 + </form> 143 + {% endmacro %} 137 144 138 145 {% macro links_count(target, collection, path) %} 139 146 <form method="get" action="/links/count">
+38
lexicons/blue.microcosm/links/getBacklinksCount.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getBacklinksCount", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "count records that link to another record", 8 + "parameters": { 9 + "type": "params", 10 + "required": ["subject", "source"], 11 + "properties": { 12 + "subject": { 13 + "type": "string", 14 + "format": "uri", 15 + "description": "the primary target being linked to (at-uri, did, or uri)" 16 + }, 17 + "source": { 18 + "type": "string", 19 + "description": "collection and path specification for the primary link" 20 + } 21 + } 22 + }, 23 + "output": { 24 + "encoding": "application/json", 25 + "schema": { 26 + "type": "object", 27 + "required": ["total"], 28 + "properties": { 29 + "total": { 30 + "type": "integer", 31 + "description": "total number of matching links" 32 + } 33 + } 34 + } 35 + } 36 + } 37 + } 38 + }
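Per the lexicon, `subject` is a URI, so it must be url-encoded when placed in the query string (as the hello page docs also note). A rough sketch of building a request URL; the host is a placeholder, and `encode` is a deliberately minimal stand-in for a real percent-encoder such as the `percent-encoding` crate:

```rust
/// minimal percent-encoding covering only the ':' and '/' found in dids and at-uris
fn encode(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            ':' => "%3A".to_string(),
            '/' => "%2F".to_string(),
            c => c.to_string(),
        })
        .collect()
}

fn main() {
    let subject = "at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r";
    let source = "app.bsky.feed.like:subject.uri";
    // placeholder host; substitute a real constellation instance
    let url = format!(
        "https://example.test/xrpc/blue.microcosm.links.getBacklinksCount?subject={}&source={}",
        encode(subject),
        encode(source),
    );
    assert!(url.contains("subject=at%3A%2F%2Fdid%3Aplc"));
    assert!(url.contains("source=app.bsky.feed.like%3Asubject.uri"));
}
```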
lexicons/blue.microcosm/links/getManyToMany.json

This file has not been changed.

+1 -1
pocket/Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 atrium-crypto = "0.1.2" 8 + clap = { workspace = true } 8 - clap = { version = "4.5.41", features = ["derive"] } 9 9 jwt-compact = { git = "https://github.com/fatfingers23/jwt-compact.git", features = ["es256k"] } 10 10 log = "0.4.27" 11 11 poem = { version = "3.1.12", features = ["acme", "static-files"] }
+1 -1
quasar/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 + clap = { workspace = true } 7 - clap = { version = "4.5.46", features = ["derive"] } 8 8 fjall = "2.11.2"
+1 -1
readme.md
··· 10 10 Tutorials, how-to guides, and client SDK libraries are all in the works for gentler on-ramps, but are not quite ready yet. But don't let that stop you! Hop in the [microcosm discord](https://discord.gg/tcDfe4PGVB), or post questions and tag [@bad-example.com](https://bsky.app/profile/bad-example.com) on Bluesky if you get stuck anywhere. 11 11 12 12 > [!tip] 13 + > This repository's primary home is moving to tangled: [@microcosm.blue/microcosm-rs](https://tangled.org/microcosm.blue/microcosm-rs). It will continue to be mirrored on [github](https://github.com/at-microcosm/microcosm-rs) for the foreseeable future, and it's fine to open issues or pulls in either place! 13 - > This repository's primary home is moving to tangled: [@microcosm.blue/microcosm-rs](https://tangled.sh/@microcosm.blue/microcosm-rs). It will continue to be mirrored on [github](https://github.com/at-microcosm/microcosm-rs) for the foreseeable future, and it's fine to open issues or pulls in either place! 14 14 15 15 16 16 ๐ŸŒŒ [Constellation](./constellation/)
+1 -1
reflector/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 + clap = { workspace = true } 7 - clap = { version = "4.5.47", features = ["derive"] } 8 8 log = "0.4.28" 9 9 poem = "3.1.12" 10 10 serde = { version = "1.0.219", features = ["derive"] }
+2 -2
slingshot/Cargo.toml
··· 8 8 atrium-common = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 9 9 atrium-identity = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 10 10 atrium-oauth = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 11 + clap = { workspace = true } 11 - clap = { version = "4.5.41", features = ["derive"] } 12 12 ctrlc = "3.4.7" 13 + foyer = { version = "0.22.3", features = ["serde"] } 13 - foyer = { version = "0.18.0", features = ["serde"] } 14 14 hickory-resolver = "0.25.2" 15 15 jetstream = { path = "../jetstream", features = ["metrics"] } 16 16 links = { path = "../links" }
+2 -2
slingshot/api-description.md
··· 1 1 _A [gravitational slingshot](https://en.wikipedia.org/wiki/Gravity_assist) makes use of the gravity and relative movements of celestial bodies to accelerate a spacecraft and change its trajectory._ 2 2 3 3 4 + # Slingshot: edge record and identity cache 4 - # Slingshot: edge record cache 5 5 6 6 Applications in [ATProtocol](https://atproto.com/) store data in users' own [PDS](https://atproto.com/guides/self-hosting) (Personal Data Server), which are distributed across thousands of independently-run servers all over the world. Trying to access this data poses challenges for client applications: 7 7 ··· 90 90 - [๐ŸŽ‡ Spacedust](https://spacedust.microcosm.blue/), a firehose of all social interactions 91 91 92 92 > [!success] 93 + > All microcosm projects are [open source](https://tangled.org/bad-example.com/microcosm-links). **You can help sustain Slingshot** and all of microcosm by becoming a [Github sponsor](https://github.com/sponsors/uniphil/) or a [Ko-fi supporter](https://ko-fi.com/bad_example)! 93 - > All microcosm projects are [open source](https://tangled.sh/@bad-example.com/microcosm-links). **You can help sustain Slingshot** and all of microcosm by becoming a [Github sponsor](https://github.com/sponsors/uniphil/) or a [Ko-fi supporter](https://ko-fi.com/bad_example)!
+34
slingshot/readme.md
··· 5 5 ```bash 6 6 RUST_LOG=info,slingshot=trace ulimit -n 4096 && RUST_LOG=info cargo run -- --jetstream us-east-1 --cache-dir ./foyer 7 7 ``` 8 + 9 + the identity cache uses a lot of files so you probably need to bump ulimit 10 + 11 + on macos: 12 + 13 + ```bash 14 + ulimit -n 4096 15 + ``` 16 + 17 + ## prod deploy 18 + 19 + you **must** setcap the binary to run it on apollo!!!! 20 + 21 + ```bash 22 + sudo setcap CAP_NET_BIND_SERVICE=+eip ../target/release/slingshot 23 + ``` 24 + 25 + then run with 26 + 27 + ```bash 28 + RUST_BACKTRACE=1 RUST_LOG=info,slingshot=trace /home/ubuntu/links/target/release/slingshot \ 29 + --jetstream wss://jetstream1.us-east.fire.hose.cam/subscribe \ 30 + --healthcheck https://hc-ping.com/[REDACTED] \ 31 + --cache-dir ./foyer \ 32 + --record-cache-memory-mb 2048 \ 33 + --record-cache-disk-gb 32 \ 34 + --identity-cache-memory-mb 1024 \ 35 + --identity-cache-disk-gb 8 \ 36 + --collect-metrics \ 37 + --acme-ipv6 \ 38 + --acme-domain slingshot.microcosm.blue \ 39 + --acme-contact phil@bad-example.com \ 40 + --acme-cache-path /home/ubuntu/certs 41 + ```
+41 -25
slingshot/src/consumer.rs
··· 1 - use crate::CachedRecord; 2 1 use crate::error::ConsumerError; 2 + use crate::{CachedRecord, Identity, IdentityKey}; 3 3 use foyer::HybridCache; 4 4 use jetstream::{ 5 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 11 11 jetstream_endpoint: String, 12 12 cursor: Option<Cursor>, 13 13 no_zstd: bool, 14 + identity: Identity, 14 15 shutdown: CancellationToken, 15 16 cache: HybridCache<String, CachedRecord>, 16 17 ) -> Result<(), ConsumerError> { ··· 46 47 break; 47 48 }; 48 49 50 + match event.kind { 51 + EventKind::Commit => { 52 + let Some(ref mut commit) = event.commit else { 53 + log::warn!("consumer: commit event missing commit data, ignoring"); 54 + continue; 55 + }; 49 - if event.kind != EventKind::Commit { 50 - continue; 51 - } 52 - let Some(ref mut commit) = event.commit else { 53 - log::warn!("consumer: commit event missing commit data, ignoring"); 54 - continue; 55 - }; 56 56 57 + // TODO: something a bit more robust 58 + let at_uri = format!( 59 + "at://{}/{}/{}", 60 + &*event.did, &*commit.collection, &*commit.rkey 61 + ); 57 - // TODO: something a bit more robust 58 - let at_uri = format!( 59 - "at://{}/{}/{}", 60 - &*event.did, &*commit.collection, &*commit.rkey 61 - ); 62 62 63 + if commit.operation == CommitOp::Delete { 64 + cache.insert(at_uri, CachedRecord::Deleted); 65 + } else { 66 + let Some(record) = commit.record.take() else { 67 + log::warn!("consumer: commit insert or update missing record, ignoring"); 68 + continue; 69 + }; 70 + let Some(cid) = commit.cid.take() else { 71 + log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 + continue; 73 + }; 63 - if commit.operation == CommitOp::Delete { 64 - cache.insert(at_uri, CachedRecord::Deleted); 65 - } else { 66 - let Some(record) = commit.record.take() else { 67 - log::warn!("consumer: commit insert or update missing record, ignoring"); 68 - continue; 69 - }; 70 - let Some(cid) = commit.cid.take() else { 71 - 
log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 - continue; 73 - }; 74 74 75 + cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 + } 77 + } 78 + EventKind::Identity => { 79 + let Some(ident) = event.identity else { 80 + log::warn!("consumer: identity event missing identity data, ignoring"); 81 + continue; 82 + }; 83 + if let Some(handle) = ident.handle { 84 + metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1); 85 + identity.queue_refresh(IdentityKey::Handle(handle)).await; 86 + } 87 + metrics::counter!("identity_did_refresh_queued", "reason" => "identity event") 88 + .increment(1); 89 + identity.queue_refresh(IdentityKey::Did(ident.did)).await; 90 + } 91 + EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete) 75 - cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 92 } 77 93 } 78 94
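The consumer keys its record cache by a naively formatted at-uri (the code itself flags this with a TODO). The shape of that key, sketched:

```rust
// mirrors the consumer's format! call; no validation, per the TODO in the diff
fn at_uri(did: &str, collection: &str, rkey: &str) -> String {
    format!("at://{did}/{collection}/{rkey}")
}

fn main() {
    let uri = at_uri(
        "did:plc:vc7f4oafdgxsihk4cry2xpze",
        "app.bsky.feed.post",
        "3lgwdn7vd722r",
    );
    assert_eq!(
        uri,
        "at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r"
    );
}
```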
+21 -9
slingshot/src/firehose_cache.rs
··· 1 1 use crate::CachedRecord; 2 + use foyer::{ 3 + BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder, 4 + PsyncIoEngineConfig, 5 + }; 2 - use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; 3 6 use std::path::Path; 4 7 5 8 pub async fn firehose_cache( 6 9 cache_dir: impl AsRef<Path>, 10 + memory_mb: usize, 11 + disk_gb: usize, 7 12 ) -> Result<HybridCache<String, CachedRecord>, String> { 13 + let device = FsDeviceBuilder::new(cache_dir) 14 + .with_capacity(disk_gb * 2_usize.pow(30)) 15 + .build() 16 + .map_err(|e| format!("foyer device setup error: {e}"))?; 17 + 18 + let engine = BlockEngineConfig::new(device).with_block_size(16 * 2_usize.pow(20)); // note: this does limit the max cached item size 19 + 8 20 let cache = HybridCacheBuilder::new() 9 21 .with_name("firehose") 22 + .memory(memory_mb * 2_usize.pow(20)) 23 + .with_weighter(|k: &String, v: &CachedRecord| { 24 + std::mem::size_of_val(k.as_str()) + v.weight() 25 + }) 26 + .storage() 27 + .with_io_engine_config(PsyncIoEngineConfig::default()) 28 + .with_engine_config(engine) 10 - .memory(64 * 2_usize.pow(20)) 11 - .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 12 - .storage(Engine::large()) 13 - .with_device_options( 14 - DirectFsDeviceOptions::new(cache_dir) 15 - .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 16 - .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 17 - ) 18 29 .build() 19 30 .await 20 31 .map_err(|e| format!("foyer setup error: {e:?}"))?; 32 + 21 33 Ok(cache) 22 34 }
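The interdiff switches the weighter from `std::mem::size_of_val(v)` (which only measures the enum's stack size) to a per-value `weight()`, so heap-allocated record bytes count toward the memory budget. An illustrative version of that idea, with a stand-in `CachedRecord` rather than the real type:

```rust
// stand-in for slingshot's CachedRecord, for illustration only
enum CachedRecord {
    Deleted,
    Found(Vec<u8>), // pretend these are the raw record bytes
}

impl CachedRecord {
    /// stack size of the enum plus any heap payload it owns
    fn weight(&self) -> usize {
        std::mem::size_of::<Self>()
            + match self {
                CachedRecord::Deleted => 0,
                CachedRecord::Found(bytes) => bytes.len(),
            }
    }
}

/// same shape as the closure passed to foyer's with_weighter in the diff
fn weigh(key: &String, value: &CachedRecord) -> usize {
    std::mem::size_of_val(key.as_str()) + value.weight()
}

fn main() {
    let key = "at://did:plc:abc/app.bsky.feed.post/3k".to_string();
    let deleted = CachedRecord::Deleted;
    let found = CachedRecord::Found(vec![0u8; 128]);
    // heap bytes now matter: a found record weighs more than a tombstone
    assert!(weigh(&key, &found) > weigh(&key, &deleted));
}
```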
+111 -49
slingshot/src/identity.rs
··· 11 11 /// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param 12 12 /// 2. DID -> PDS resolution: so we know where to getRecord 13 13 /// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this 14 + use std::time::{Duration, Instant}; 14 - use std::time::Duration; 15 15 use tokio::sync::Mutex; 16 16 use tokio_util::sync::CancellationToken; 17 17 ··· 26 26 handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver}, 27 27 }; 28 28 use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but 29 + use foyer::{ 30 + BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder, 31 + PsyncIoEngineConfig, 32 + }; 29 - use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; 30 33 use serde::{Deserialize, Serialize}; 31 34 use time::UtcDateTime; 32 35 ··· 35 38 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 36 39 37 40 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 41 + pub enum IdentityKey { 38 - enum IdentityKey { 39 42 Handle(Handle), 40 43 Did(Did), 41 44 } 42 45 46 + impl IdentityKey { 47 + fn weight(&self) -> usize { 48 + let s = match self { 49 + IdentityKey::Handle(h) => h.as_str(), 50 + IdentityKey::Did(d) => d.as_str(), 51 + }; 52 + std::mem::size_of::<Self>() + std::mem::size_of_val(s) 53 + } 54 + } 55 + 43 56 #[derive(Debug, Serialize, Deserialize)] 44 57 struct IdentityVal(UtcDateTime, IdentityData); 45 58 ··· 50 63 Doc(PartialMiniDoc), 51 64 } 52 65 66 + impl IdentityVal { 67 + fn weight(&self) -> usize { 68 + let wrapping = std::mem::size_of::<Self>(); 69 + let inner = match &self.1 { 70 + IdentityData::NotFound => 0, 71 + IdentityData::Did(d) => std::mem::size_of_val(d.as_str()), 72 + IdentityData::Doc(d) => { 73 + std::mem::size_of_val(d.unverified_handle.as_str()) 74 + + std::mem::size_of_val(d.pds.as_str()) 75 + + 
std::mem::size_of_val(d.signing_key.as_str()) 76 + } 77 + }; 78 + wrapping + inner 79 + } 80 + } 81 + 53 82 /// partial representation of a com.bad-example.identity mini atproto doc 54 83 /// 55 84 /// partial because the handle is not verified ··· 86 115 let Some(maybe_handle) = aka.strip_prefix("at://") else { 87 116 continue; 88 117 }; 118 + let Ok(valid_handle) = Handle::new(maybe_handle.to_lowercase()) else { 89 - let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else { 90 119 continue; 91 120 }; 92 121 unverified_handle = Some(valid_handle); ··· 157 186 /// multi-producer *single consumer* queue 158 187 refresh_queue: Arc<Mutex<RefreshQueue>>, 159 188 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 189 + refresher_task: Arc<Mutex<()>>, 160 - refresher: Arc<Mutex<()>>, 161 190 } 162 191 163 192 impl Identity { 193 + pub async fn new( 194 + cache_dir: impl AsRef<Path>, 195 + memory_mb: usize, 196 + disk_gb: usize, 197 + ) -> Result<Self, IdentityError> { 164 - pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> { 165 198 let http_client = Arc::new(DefaultHttpClient::default()); 166 199 let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig { 167 200 dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(), ··· 171 204 plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(), 172 205 http_client: http_client.clone(), 173 206 }); 207 + 208 + let device = FsDeviceBuilder::new(cache_dir) 209 + .with_capacity(disk_gb * 2_usize.pow(30)) 210 + .build()?; 211 + let engine = BlockEngineConfig::new(device).with_block_size(2_usize.pow(20)); // note: this does limit the max cached item size 174 212 175 213 let cache = HybridCacheBuilder::new() 176 214 .with_name("identity") 215 + .memory(memory_mb * 2_usize.pow(20)) 216 + .with_weighter(|k: &IdentityKey, v: &IdentityVal| k.weight() + v.weight()) 217 + .storage() 218 + 
.with_io_engine_config(PsyncIoEngineConfig::default()) 219 + .with_engine_config(engine) 177 - .memory(16 * 2_usize.pow(20)) 178 - .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v)) 179 - .storage(Engine::small()) 180 - .with_device_options( 181 - DirectFsDeviceOptions::new(cache_dir) 182 - .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 183 - .with_file_size(2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 184 - ) 185 220 .build() 186 221 .await?; 187 222 ··· 190 225 did_resolver: Arc::new(did_resolver), 191 226 cache, 192 227 refresh_queue: Default::default(), 228 + refresher_task: Default::default(), 193 - refresher: Default::default(), 194 229 }) 195 230 } 196 231 ··· 229 264 handle: &Handle, 230 265 ) -> Result<Option<Did>, IdentityError> { 231 266 let key = IdentityKey::Handle(handle.clone()); 267 + metrics::counter!("slingshot_get_handle").increment(1); 232 268 let entry = self 233 269 .cache 270 + .get_or_fetch(&key, { 234 - .fetch(key.clone(), { 235 271 let handle = handle.clone(); 236 272 let resolver = self.handle_resolver.clone(); 237 273 || async move { 274 + let t0 = Instant::now(); 275 + let (res, success) = match resolver.resolve(&handle).await { 276 + Ok(did) => ( 277 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))), 278 + "true", 279 + ), 280 + Err(atrium_identity::Error::NotFound) => ( 281 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)), 282 + "false", 283 + ), 284 + Err(other) => { 285 + log::debug!("other error resolving handle: {other:?}"); 286 + (Err(IdentityError::ResolutionFailed(other)), "false") 238 - match resolver.resolve(&handle).await { 239 - Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))), 240 - Err(atrium_identity::Error::NotFound) => { 241 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)) 242 287 } 288 + }; 289 + metrics::histogram!("slingshot_fetch_handle", "success" => 
success) 290 + .record(t0.elapsed()); 291 + res 243 - Err(other) => Err(foyer::Error::Other(Box::new({ 244 - log::debug!("other error resolving handle: {other:?}"); 245 - IdentityError::ResolutionFailed(other) 246 - }))), 247 - } 248 292 } 249 293 }) 250 294 .await?; ··· 258 302 } 259 303 IdentityData::NotFound => { 260 304 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 305 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 261 306 self.queue_refresh(key).await; 262 307 } 263 308 Ok(None) 264 309 } 265 310 IdentityData::Did(did) => { 266 311 if (now - *last_fetch) >= MIN_TTL { 312 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 267 313 self.queue_refresh(key).await; 268 314 } 269 315 Ok(Some(did.clone())) ··· 277 323 did: &Did, 278 324 ) -> Result<Option<PartialMiniDoc>, IdentityError> { 279 325 let key = IdentityKey::Did(did.clone()); 326 + metrics::counter!("slingshot_get_did_doc").increment(1); 280 327 let entry = self 281 328 .cache 329 + .get_or_fetch(&key, { 282 - .fetch(key.clone(), { 283 330 let did = did.clone(); 284 331 let resolver = self.did_resolver.clone(); 285 332 || async move { 333 + let t0 = Instant::now(); 334 + let (res, success) = match resolver.resolve(&did).await { 335 + Ok(did_doc) if did_doc.id != did.to_string() => ( 286 - match resolver.resolve(&did).await { 287 - Ok(did_doc) => { 288 336 // TODO: fix in atrium: should verify id is did 337 + Err(IdentityError::BadDidDoc( 338 + "did doc's id did not match did".to_string(), 339 + )), 340 + "false", 341 + ), 342 + Ok(did_doc) => match did_doc.try_into() { 343 + Ok(mini_doc) => ( 344 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc))), 345 + "true", 346 + ), 347 + Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"), 348 + }, 349 + Err(atrium_identity::Error::NotFound) => ( 350 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)), 351 + "false", 352 + ), 
353 + Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"), 354 + }; 355 + metrics::histogram!("slingshot_fetch_did_doc", "success" => success) 356 + .record(t0.elapsed()); 357 + res 289 - if did_doc.id != did.to_string() { 290 - return Err(foyer::Error::other(Box::new( 291 - IdentityError::BadDidDoc( 292 - "did doc's id did not match did".to_string(), 293 - ), 294 - ))); 295 - } 296 - let mini_doc = did_doc.try_into().map_err(|e| { 297 - foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e))) 298 - })?; 299 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc))) 300 - } 301 - Err(atrium_identity::Error::NotFound) => { 302 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)) 303 - } 304 - Err(other) => Err(foyer::Error::Other(Box::new( 305 - IdentityError::ResolutionFailed(other), 306 - ))), 307 - } 308 358 } 309 359 }) 310 360 .await?; ··· 318 368 } 319 369 IdentityData::NotFound => { 320 370 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 371 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 321 372 self.queue_refresh(key).await; 322 373 } 323 374 Ok(None) 324 375 } 325 376 IdentityData::Doc(mini_did) => { 326 377 if (now - *last_fetch) >= MIN_TTL { 378 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 327 379 self.queue_refresh(key).await; 328 380 } 329 381 Ok(Some(mini_did.clone())) ··· 334 386 /// put a refresh task on the queue 335 387 /// 336 388 /// this can be safely called from multiple concurrent tasks 389 + pub async fn queue_refresh(&self, key: IdentityKey) { 337 - async fn queue_refresh(&self, key: IdentityKey) { 338 390 // todo: max queue size 339 391 let mut q = self.refresh_queue.lock().await; 340 392 if !q.items.contains(&key) { ··· 411 463 /// run the refresh queue consumer 412 464 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 413 465 let _guard = self 466 + 
.refresher_task 414 - .refresher 415 467 .try_lock() 416 468 .expect("there to only be one refresher running"); 417 469 loop { ··· 433 485 log::trace!("refreshing handle {handle:?}"); 434 486 match self.handle_resolver.resolve(handle).await { 435 487 Ok(did) => { 488 + metrics::counter!("identity_handle_refresh", "success" => "true") 489 + .increment(1); 436 490 self.cache.insert( 437 491 task_key.clone(), 438 492 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 439 493 ); 440 494 } 441 495 Err(atrium_identity::Error::NotFound) => { 496 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1); 442 497 self.cache.insert( 443 498 task_key.clone(), 444 499 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 445 500 ); 446 501 } 447 502 Err(err) => { 503 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1); 448 504 log::warn!( 449 505 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 450 506 ); ··· 459 515 Ok(did_doc) => { 460 516 // TODO: fix in atrium: should verify id is did 461 517 if did_doc.id != did.to_string() { 518 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1); 462 519 log::warn!( 463 520 "refreshed did doc failed: wrong did doc id. dropping refresh." 464 521 ); ··· 467 524 let mini_doc = match did_doc.try_into() { 468 525 Ok(md) => md, 469 526 Err(e) => { 527 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 470 528 log::warn!( 471 529 "converting mini doc failed: {e:?}. dropping refresh." 
472 530 ); 473 531 continue; 474 532 } 475 533 }; 534 + metrics::counter!("identity_did_refresh", "success" => "true") 535 + .increment(1); 476 536 self.cache.insert( 477 537 task_key.clone(), 478 538 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 479 539 ); 480 540 } 481 541 Err(atrium_identity::Error::NotFound) => { 542 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1); 482 543 self.cache.insert( 483 544 task_key.clone(), 484 545 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 485 546 ); 486 547 } 487 548 Err(err) => { 549 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1); 488 550 log::warn!( 489 551 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 490 552 );
+1 -1
slingshot/src/lib.rs
··· 9 9 pub use consumer::consume; 10 10 pub use firehose_cache::firehose_cache; 11 11 pub use healthcheck::healthcheck; 12 + pub use identity::{Identity, IdentityKey}; 12 - pub use identity::Identity; 13 13 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 14 pub use server::serve;
+86 -31
slingshot/src/main.rs
··· 4 4 use slingshot::{ 5 5 Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 6 6 }; 7 + use std::net::SocketAddr; 7 8 use std::path::PathBuf; 8 9 9 10 use clap::Parser; ··· 15 16 struct Args { 16 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 17 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 19 + #[arg(long, env = "SLINGSHOT_JETSTREAM")] 18 - #[arg(long)] 19 20 jetstream: String, 20 21 /// don't request zstd-compressed jetstream events 21 22 /// 22 23 /// reduces CPU at the expense of more ingress bandwidth 24 + #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")] 23 - #[arg(long, action)] 24 25 jetstream_no_zstd: bool, 25 26 /// where to keep disk caches 27 + #[arg(long, env = "SLINGSHOT_CACHE_DIR")] 26 - #[arg(long)] 27 28 cache_dir: PathBuf, 29 + /// where to listen for incomming requests 30 + /// 31 + /// cannot be used with acme -- if you need ipv6 see --acme-ipv6 32 + #[arg(long, env = "SLINGSHOT_BIND")] 33 + #[clap(default_value = "0.0.0.0:8080")] 34 + bind: SocketAddr, 35 + /// memory cache size in megabytes for records 36 + #[arg(long, env = "SLINGSHOT_RECORD_CACHE_MEMORY_MB")] 37 + #[clap(default_value_t = 64)] 38 + record_cache_memory_mb: usize, 39 + /// disk cache size in gigabytes for records 40 + #[arg(long, env = "SLINGSHOT_RECORD_CACHE_DISK_DB")] 41 + #[clap(default_value_t = 1)] 42 + record_cache_disk_gb: usize, 43 + /// memory cache size in megabytes for identities 44 + #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_MEMORY_MB")] 45 + #[clap(default_value_t = 64)] 46 + identity_cache_memory_mb: usize, 47 + /// disk cache size in gigabytes for identities 48 + #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")] 49 + #[clap(default_value_t = 1)] 50 + identity_cache_disk_gb: usize, 28 51 /// the domain pointing to this server 29 52 /// 30 53 /// if present: 31 54 /// - a did:web document will be served at /.well-known/did.json 32 
55 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt 33 56 /// - TODO: a rate-limiter will be installed 57 + #[arg( 58 + long, 59 + conflicts_with("bind"), 60 + requires("acme_cache_path"), 61 + env = "SLINGSHOT_ACME_DOMAIN" 62 + )] 63 + acme_domain: Option<String>, 34 - #[arg(long)] 35 - domain: Option<String>, 36 64 /// email address for letsencrypt contact 37 65 /// 38 66 /// recommended in production, i guess? 67 + #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")] 39 - #[arg(long)] 40 68 acme_contact: Option<String>, 41 69 /// a location to cache acme https certs 42 70 /// 71 + /// required when (and only used when) --acme-domain is specified. 43 - /// only used if --host is specified. omitting requires re-requesting certs 44 - /// on every restart, and letsencrypt has rate limits that are easy to hit. 45 72 /// 46 73 /// recommended in production, but mind the file permissions. 74 + #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")] 75 + acme_cache_path: Option<PathBuf>, 76 + /// listen for ipv6 when using acme 77 + /// 78 + /// you must also configure the relevant DNS records for this to work 79 + #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")] 80 + acme_ipv6: bool, 47 - #[arg(long)] 48 - certs: Option<PathBuf>, 49 81 /// an web address to send healtcheck pings to every ~51s or so 82 + #[arg(long, env = "SLINGSHOT_HEALTHCHECK")] 50 - #[arg(long)] 51 83 healthcheck: Option<String>, 84 + /// enable metrics collection and serving 85 + #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")] 86 + collect_metrics: bool, 87 + /// metrics server's listen address 88 + #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")] 89 + #[clap(default_value = "[::]:8765")] 90 + bind_metrics: std::net::SocketAddr, 52 91 } 53 92 54 93 #[tokio::main] ··· 62 101 63 102 let args = Args::parse(); 64 103 104 + if args.collect_metrics { 105 + log::trace!("installing metrics 
server..."); 106 + if let Err(e) = install_metrics_server(args.bind_metrics) { 107 + log::error!("failed to install metrics server: {e:?}"); 108 + } else { 109 + log::info!("metrics listening at http://{}", args.bind_metrics); 110 + } 65 - if let Err(e) = install_metrics_server() { 66 - log::error!("failed to install metrics server: {e:?}"); 67 - } else { 68 - log::info!("metrics listening at http://0.0.0.0:8765"); 69 111 } 70 112 71 113 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { ··· 83 125 log::info!("cache dir ready at at {cache_dir:?}."); 84 126 85 127 log::info!("setting up firehose cache..."); 128 + let cache = firehose_cache( 129 + cache_dir.join("./firehose"), 130 + args.record_cache_memory_mb, 131 + args.record_cache_disk_gb, 132 + ) 133 + .await?; 86 - let cache = firehose_cache(cache_dir.join("./firehose")).await?; 87 134 log::info!("firehose cache ready."); 88 135 89 136 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 90 137 91 138 log::info!("starting identity service..."); 139 + let identity = Identity::new( 140 + cache_dir.join("./identity"), 141 + args.identity_cache_memory_mb, 142 + args.identity_cache_disk_gb, 143 + ) 144 + .await 145 + .map_err(|e| format!("identity setup failed: {e:?}"))?; 146 + 92 - let identity = Identity::new(cache_dir.join("./identity")) 93 - .await 94 - .map_err(|e| format!("identity setup failed: {e:?}"))?; 95 147 log::info!("identity service ready."); 96 148 let identity_refresher = identity.clone(); 97 149 let identity_shutdown = shutdown.clone(); ··· 102 154 103 155 let repo = Repo::new(identity.clone()); 104 156 157 + let identity_for_server = identity.clone(); 105 158 let server_shutdown = shutdown.clone(); 106 159 let server_cache_handle = cache.clone(); 160 + let bind = args.bind; 107 161 tasks.spawn(async move { 108 162 serve( 109 163 server_cache_handle, 164 + identity_for_server, 110 - identity, 111 165 repo, 166 + args.acme_domain, 112 - args.domain, 113 
167 args.acme_contact, 168 + args.acme_cache_path, 169 + args.acme_ipv6, 114 - args.certs, 115 170 server_shutdown, 171 + bind, 116 172 ) 117 173 .await?; 118 174 Ok(()) 119 175 }); 120 176 177 + let identity_refreshable = identity.clone(); 121 178 let consumer_shutdown = shutdown.clone(); 122 179 let consumer_cache = cache.clone(); 123 180 tasks.spawn(async move { ··· 125 182 args.jetstream, 126 183 None, 127 184 args.jetstream_no_zstd, 185 + identity_refreshable, 128 186 consumer_shutdown, 129 187 consumer_cache, 130 188 ) ··· 172 230 Ok(()) 173 231 } 174 232 233 + fn install_metrics_server( 234 + bind_metrics: std::net::SocketAddr, 235 + ) -> Result<(), metrics_exporter_prometheus::BuildError> { 175 - fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 176 236 log::info!("installing metrics server..."); 177 - let host = [0, 0, 0, 0]; 178 - let port = 8765; 179 237 PrometheusBuilder::new() 180 238 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 181 239 .set_bucket_duration(std::time::Duration::from_secs(300))? 182 240 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 183 241 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 242 + .with_http_listener(bind_metrics) 184 - .with_http_listener((host, port)) 185 243 .install()?; 186 244 log::info!( 245 + "metrics server installed! listening on http://{}", 246 + bind_metrics 187 - "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 188 - host[0], 189 - host[1], 190 - host[2], 191 - host[3] 192 247 ); 193 248 Ok(()) 194 249 }
+12 -1
slingshot/src/record.rs
··· 42 42 Deleted, 43 43 } 44 44 45 + impl CachedRecord { 46 + pub(crate) fn weight(&self) -> usize { 47 + let wrapping = std::mem::size_of::<Self>(); 48 + let inner = match self { 49 + CachedRecord::Found(RawRecord { record, .. }) => std::mem::size_of_val(record.as_str()), 50 + _ => 0, 51 + }; 52 + wrapping + inner 53 + } 54 + } 55 + 45 56 //////// upstream record fetching 46 57 47 58 #[derive(Deserialize)] ··· 72 83 pub fn new(identity: Identity) -> Self { 73 84 let client = Client::builder() 74 85 .user_agent(format!( 86 + "microcosm slingshot v{} (contact: @bad-example.com)", 75 - "microcosm slingshot v{} (dev: @bad-example.com)", 76 87 env!("CARGO_PKG_VERSION") 77 88 )) 78 89 .no_proxy()
+103 -28
slingshot/src/server.rs
··· 9 9 use std::path::PathBuf; 10 10 use std::str::FromStr; 11 11 use std::sync::Arc; 12 + use std::time::Instant; 12 13 use tokio_util::sync::CancellationToken; 13 14 14 15 use poem::{ 16 + Endpoint, EndpointExt, IntoResponse, Route, Server, 15 - Endpoint, EndpointExt, Route, Server, 16 17 endpoint::{StaticFileEndpoint, make_sync}, 17 18 http::Method, 18 19 listener::{ ··· 288 289 self.get_record_impl(repo, collection, rkey, cid).await 289 290 } 290 291 292 + /// blue.microcosm.repo.getRecordByUri 293 + /// 294 + /// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name 295 + #[oai( 296 + path = "/blue.microcosm.repo.getRecordByUri", 297 + method = "get", 298 + tag = "ApiTags::Custom" 299 + )] 300 + async fn get_record_by_uri( 301 + &self, 302 + /// The at-uri of the record 303 + /// 304 + /// The identifier can be a DID or an atproto handle, and the collection 305 + /// and rkey segments must be present. 306 + #[oai(example = "example_uri")] 307 + Query(at_uri): Query<String>, 308 + /// Optional: the CID of the version of the record. 309 + /// 310 + /// If not specified, then return the most recent version. 311 + /// 312 + /// > [!tip] 313 + /// > If specified and a newer version of the record exists, returns 404 not 314 + /// > found. That is: slingshot only retains the most recent version of a 315 + /// > record. 
316 + Query(cid): Query<Option<String>>, 317 + ) -> GetRecordResponse { 318 + self.get_uri_record(Query(at_uri), Query(cid)).await 319 + } 320 + 291 321 /// com.bad-example.repo.getUriRecord 292 322 /// 293 323 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) ··· 375 405 #[oai(example = "example_handle")] 376 406 Query(handle): Query<String>, 377 407 ) -> JustDidResponse { 408 + let Ok(handle) = Handle::new(handle.to_lowercase()) else { 378 - let Ok(handle) = Handle::new(handle) else { 379 409 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle")); 380 410 }; 381 411 ··· 413 443 })) 414 444 } 415 445 446 + /// blue.microcosm.identity.resolveMiniDoc 447 + /// 448 + /// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name 449 + #[oai( 450 + path = "/blue.microcosm.identity.resolveMiniDoc", 451 + method = "get", 452 + tag = "ApiTags::Custom" 453 + )] 454 + async fn resolve_mini_doc( 455 + &self, 456 + /// Handle or DID to resolve 457 + #[oai(example = "example_handle")] 458 + Query(identifier): Query<String>, 459 + ) -> ResolveMiniIDResponse { 460 + self.resolve_mini_id(Query(identifier)).await 461 + } 462 + 416 463 /// com.bad-example.identity.resolveMiniDoc 417 464 /// 418 465 /// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity) ··· 436 483 let did = match Did::new(identifier.clone()) { 437 484 Ok(did) => did, 438 485 Err(_) => { 486 + let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 439 - let Ok(alleged_handle) = Handle::new(identifier) else { 440 487 return invalid("Identifier was not a valid DID or handle"); 441 488 }; 442 489 ··· 513 560 let did = match Did::new(repo.clone()) { 514 561 Ok(did) => did, 515 562 Err(_) => { 563 + let Ok(handle) = Handle::new(repo.to_lowercase()) else { 516 - let Ok(handle) = Handle::new(repo) else { 517 
564 return GetRecordResponse::BadRequest(xrpc_error( 518 565 "InvalidRequest", 519 566 "Repo was not a valid DID or handle", ··· 563 610 564 611 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey); 565 612 613 + metrics::counter!("slingshot_get_record").increment(1); 566 614 let fr = self 567 615 .cache 616 + .get_or_fetch(&at_uri, { 568 - .fetch(at_uri.clone(), { 569 617 let cid = cid.clone(); 570 618 let repo_api = self.repo.clone(); 571 619 || async move { 620 + let t0 = Instant::now(); 621 + let res = repo_api.get_record(&did, &collection, &rkey, &cid).await; 622 + let success = if res.is_ok() { "true" } else { "false" }; 623 + metrics::histogram!("slingshot_fetch_record", "success" => success) 624 + .record(t0.elapsed()); 625 + res 572 - repo_api 573 - .get_record(&did, &collection, &rkey, &cid) 574 - .await 575 - .map_err(|e| foyer::Error::Other(Box::new(e))) 576 626 } 577 627 }) 578 628 .await; 579 629 580 630 let entry = match fr { 581 631 Ok(e) => e, 632 + Err(e) if e.kind() == foyer::ErrorKind::External => { 633 + let record_error = match e.source().map(|s| s.downcast_ref::<RecordError>()) { 634 + Some(Some(e)) => e, 635 + other => { 636 + if other.is_none() { 637 + log::error!("external error without a source. wat? {e}"); 638 + } else { 639 + log::error!("downcast to RecordError failed...? 
{e}"); 640 + } 582 - Err(foyer::Error::Other(e)) => { 583 - let record_error = match e.downcast::<RecordError>() { 584 - Ok(e) => e, 585 - Err(e) => { 586 - log::error!("error (foyer other) getting cache entry, {e:?}"); 587 641 return GetRecordResponse::ServerError(xrpc_error( 588 642 "ServerError", 589 643 "sorry, something went wrong", 590 644 )); 591 645 } 592 646 }; 647 + let RecordError::UpstreamBadRequest(ErrorResponseObject { 648 + ref error, 649 + ref message, 650 + }) = *record_error 593 - let RecordError::UpstreamBadRequest(ErrorResponseObject { error, message }) = 594 - *record_error 595 651 else { 596 652 log::error!("RecordError getting cache entry, {record_error:?}"); 597 653 return GetRecordResponse::ServerError(xrpc_error( ··· 643 699 } 644 700 645 701 // TODO 646 - // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")] 647 702 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")] 648 703 // but these are both not specified to do bidirectional validation, which is what we want to offer 649 704 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc ··· 652 707 // handle -> verified did + pds url 653 708 // 654 709 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc 710 + // but this will *definitely* cause problems probably 711 + // 712 + // resolveMiniDoc gets most of this well enough. 655 - // but this will *definitely* cause problems because eg. 
we're not currently storing pubkeys and 656 - // those are a little bit important 657 713 } 658 714 659 715 #[derive(Debug, Clone, Serialize)] ··· 687 743 make_sync(move |_| doc.clone()) 688 744 } 689 745 746 + #[allow(clippy::too_many_arguments)] 690 747 pub async fn serve( 691 748 cache: HybridCache<String, CachedRecord>, 692 749 identity: Identity, 693 750 repo: Repo, 751 + acme_domain: Option<String>, 694 - domain: Option<String>, 695 752 acme_contact: Option<String>, 753 + acme_cache_path: Option<PathBuf>, 754 + acme_ipv6: bool, 696 - certs: Option<PathBuf>, 697 755 shutdown: CancellationToken, 756 + bind: std::net::SocketAddr, 698 757 ) -> Result<(), ServerError> { 699 758 let repo = Arc::new(repo); 700 759 let api_service = OpenApiService::new( ··· 706 765 "Slingshot", 707 766 env!("CARGO_PKG_VERSION"), 708 767 ) 768 + .server(if let Some(ref h) = acme_domain { 709 - .server(if let Some(ref h) = domain { 710 769 format!("https://{h}") 711 770 } else { 771 + format!("http://{bind}") // yeah should probably fix this for reverse-proxy scenarios but it's ok for dev for now 712 - "http://localhost:3000".to_string() 713 772 }) 714 773 .url_prefix("/xrpc") 715 774 .contact( ··· 727 786 .nest("/openapi", api_service.spec_endpoint()) 728 787 .nest("/xrpc/", api_service); 729 788 789 + if let Some(domain) = acme_domain { 730 - if let Some(domain) = domain { 731 790 rustls::crypto::aws_lc_rs::default_provider() 732 791 .install_default() 733 792 .expect("alskfjalksdjf"); ··· 740 799 if let Some(contact) = acme_contact { 741 800 auto_cert = auto_cert.contact(contact); 742 801 } 802 + if let Some(cache_path) = acme_cache_path { 803 + auto_cert = auto_cert.cache_path(cache_path); 743 - if let Some(certs) = certs { 744 - auto_cert = auto_cert.cache_path(certs); 745 804 } 746 805 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 747 806 748 807 run( 808 + TcpListener::bind(if acme_ipv6 { "[::]:443" } else { "0.0.0.0:443" }).acme(auto_cert), 749 - 
TcpListener::bind("0.0.0.0:443").acme(auto_cert), 750 809 app, 751 810 shutdown, 752 811 ) 753 812 .await 754 813 } else { 814 + run(TcpListener::bind(bind), app, shutdown).await 755 - run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await 756 815 } 757 816 } 758 817 ··· 768 827 .allow_credentials(false), 769 828 ) 770 829 .with(CatchPanic::new()) 830 + .around(request_counter) 771 831 .with(Tracing); 832 + 772 833 Server::new(listener) 773 834 .name("slingshot") 774 835 .run_with_graceful_shutdown(app, shutdown.cancelled(), None) ··· 776 837 .map_err(ServerError::ServerExited) 777 838 .inspect(|()| log::info!("server ended. goodbye.")) 778 839 } 840 + 841 + async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> { 842 + let t0 = std::time::Instant::now(); 843 + let method = req.method().to_string(); 844 + let path = req.uri().path().to_string(); 845 + let res = next.call(req).await?.into_response(); 846 + metrics::histogram!( 847 + "server_request", 848 + "endpoint" => format!("{method} {path}"), 849 + "status" => res.status().to_string(), 850 + ) 851 + .record(t0.elapsed()); 852 + Ok(res) 853 + }
+2 -2
slingshot/static/index.html
··· 43 43 <body> 44 44 <header class="custom-header scalar-app"> 45 45 <p> 46 + get atproto records and identities faster 46 - TODO: thing 47 47 </p> 48 48 <nav> 49 49 <b>a <a href="https://microcosm.blue">microcosm</a> project</b> 50 50 <a href="https://bsky.app/profile/microcosm.blue">@microcosm.blue</a> 51 + <a href="https://tangled.org/microcosm.blue/microcosm-rs">tangled</a> 51 - <a href="https://github.com/at-microcosm">github</a> 52 52 </nav> 53 53 </header> 54 54
+1 -1
spacedust/Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 async-trait = "0.1.88" 8 + clap = { workspace = true } 8 - clap = { version = "4.5.40", features = ["derive"] } 9 9 ctrlc = "3.4.7" 10 10 dropshot = "0.16.2" 11 11 env_logger = "0.11.8"
+26 -17
spacedust/src/main.rs
··· 16 16 struct Args { 17 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 18 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 19 + #[arg(long, env = "SPACEDUST_JETSTREAM")] 19 - #[arg(long)] 20 20 jetstream: String, 21 21 /// don't request zstd-compressed jetstream events 22 22 /// 23 23 /// reduces CPU at the expense of more ingress bandwidth 24 + #[arg(long, action, env = "SPACEDUST_JETSTREAM_NO_ZSTD")] 24 - #[arg(long, action)] 25 25 jetstream_no_zstd: bool, 26 + /// spacedust server's listen address 27 + #[arg(long, env = "SPACEDUST_BIND")] 28 + #[clap(default_value = "[::]:8080")] 29 + bind: std::net::SocketAddr, 30 + /// enable metrics collection and serving 31 + #[arg(long, action, env = "SPACEDUST_COLLECT_METRICS")] 32 + collect_metrics: bool, 33 + /// metrics server's listen address 34 + #[arg(long, requires("collect_metrics"), env = "SPACEDUST_BIND_METRICS")] 35 + #[clap(default_value = "[::]:8765")] 36 + bind_metrics: std::net::SocketAddr, 26 37 } 27 38 28 39 #[tokio::main] ··· 60 71 61 72 let args = Args::parse(); 62 73 74 + if args.collect_metrics { 75 + log::trace!("installing metrics server..."); 76 + if let Err(e) = install_metrics_server(args.bind_metrics) { 77 + log::error!("failed to install metrics server: {e:?}"); 78 + }; 79 + } 63 - if let Err(e) = install_metrics_server() { 64 - log::error!("failed to install metrics server: {e:?}"); 65 - }; 66 80 67 81 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 68 82 69 83 let server_shutdown = shutdown.clone(); 84 + let bind = args.bind; 70 85 tasks.spawn(async move { 86 + server::serve(b, d, server_shutdown, bind).await?; 71 - server::serve(b, d, server_shutdown).await?; 72 87 Ok(()) 73 88 }); 74 89 ··· 122 137 Ok(()) 123 138 } 124 139 140 + fn install_metrics_server( 141 + bind: std::net::SocketAddr, 142 + ) -> Result<(), metrics_exporter_prometheus::BuildError> { 125 - fn 
install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 126 143 log::info!("installing metrics server..."); 127 - let host = [0, 0, 0, 0]; 128 - let port = 8765; 129 144 PrometheusBuilder::new() 130 145 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 131 146 .set_bucket_duration(std::time::Duration::from_secs(300))? 132 147 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 133 148 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 149 + .with_http_listener(bind) 134 - .with_http_listener((host, port)) 135 150 .install()?; 151 + log::info!("metrics server installed! listening on {bind}"); 136 - log::info!( 137 - "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 138 - host[0], 139 - host[1], 140 - host[2], 141 - host[3] 142 - ); 143 152 Ok(()) 144 153 }
+2 -1
spacedust/src/server.rs
··· 29 29 b: broadcast::Sender<Arc<ClientMessage>>, 30 30 d: broadcast::Sender<Arc<ClientMessage>>, 31 31 shutdown: CancellationToken, 32 + bind: std::net::SocketAddr, 32 33 ) -> Result<(), ServerError> { 33 34 let config_logging = ConfigLogging::StderrTerminal { 34 35 level: ConfigLoggingLevel::Info, ··· 72 73 73 74 let server = ServerBuilder::new(api, ctx, log) 74 75 .config(ConfigDropshot { 76 + bind_address: bind, 75 - bind_address: "0.0.0.0:9998".parse().unwrap(), 76 77 ..Default::default() 77 78 }) 78 79 .start()?;
+1 -1
ufos/Cargo.toml
··· 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 11 cardinality-estimator-safe = { version = "4.0.2", features = ["with_serde", "with_digest"] } 12 12 chrono = { version = "0.4.41", features = ["serde"] } 13 + clap = { workspace = true } 13 - clap = { version = "4.5.31", features = ["derive"] } 14 14 dropshot = "0.16.0" 15 15 env_logger = "0.11.7" 16 16 fjall = { git = "https://github.com/fjall-rs/fjall.git", rev = "fb229572bb7d1d6966a596994dc1708e47ec57d8", features = ["lz4"] }
+27 -21
ufos/src/main.rs
··· 26 26 struct Args { 27 27 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 28 28 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 29 + #[arg(long, env = "UFOS_JETSTREAM")] 29 - #[arg(long)] 30 30 jetstream: String, 31 31 /// allow changing jetstream endpoints 32 + #[arg(long, action, env = "UFOS_JETSTREAM_FORCE")] 32 - #[arg(long, action)] 33 33 jetstream_force: bool, 34 34 /// don't request zstd-compressed jetstream events 35 35 /// 36 36 /// reduces CPU at the expense of more ingress bandwidth 37 + #[arg(long, action, env = "UFOS_JETSTREAM_NO_ZSTD")] 37 - #[arg(long, action)] 38 38 jetstream_no_zstd: bool, 39 + /// ufos server's listen address 40 + #[arg(long, env = "UFOS_BIND")] 41 + #[clap(default_value = "0.0.0.0:9990")] 42 + bind: std::net::SocketAddr, 39 43 /// Location to store persist data to disk 44 + #[arg(long, env = "UFOS_DATA")] 40 - #[arg(long)] 41 45 data: PathBuf, 42 46 /// DEBUG: don't start the jetstream consumer or its write loop 47 + #[arg(long, action, env = "UFOS_PAUSE_WRITER")] 43 - #[arg(long, action)] 44 48 pause_writer: bool, 45 49 /// Adjust runtime settings like background task intervals for efficient backfill 50 + #[arg(long, action, env = "UFOS_BACKFILL_MODE")] 46 - #[arg(long, action)] 47 51 backfill: bool, 48 52 /// DEBUG: force the rw loop to fall behind by pausing it 49 53 /// todo: restore this 50 54 #[arg(long, action)] 51 55 pause_rw: bool, 52 56 /// reset the rollup cursor, scrape through missed things in the past (backfill) 57 + #[arg(long, action, env = "UFOS_REROLL")] 53 - #[arg(long, action)] 54 58 reroll: bool, 55 59 /// DEBUG: interpret jetstream as a file fixture 60 + #[arg(long, action, env = "UFOS_JETSTREAM_FIXTURE")] 56 - #[arg(long, action)] 57 61 jetstream_fixture: bool, 62 + /// enable metrics collection and serving 63 + #[arg(long, action, env = "UFOS_COLLECT_METRICS")] 64 + collect_metrics: bool, 65 + /// metrics server's listen address 
66 + #[arg(long, env = "UFOS_BIND_METRICS")] 67 + #[clap(default_value = "0.0.0.0:8765")] 68 + bind_metrics: std::net::SocketAddr, 58 69 } 59 70 60 71 #[tokio::main] ··· 84 95 let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new(); 85 96 86 97 println!("starting server with storage..."); 98 + let serving = server::serve(read_store.clone(), args.bind); 87 - let serving = server::serve(read_store.clone()); 88 99 whatever_tasks.spawn(async move { 89 100 serving.await.map_err(|e| { 90 101 log::warn!("server ended: {e}"); ··· 137 148 Ok(()) 138 149 }); 139 150 151 + if args.collect_metrics { 152 + log::trace!("installing metrics server..."); 153 + install_metrics_server(args.bind_metrics)?; 154 + } 140 - install_metrics_server()?; 141 155 142 156 for (i, t) in consumer_tasks.join_all().await.iter().enumerate() { 143 157 log::warn!("task {i} done: {t:?}"); ··· 151 165 Ok(()) 152 166 } 153 167 168 + fn install_metrics_server(bind: std::net::SocketAddr) -> anyhow::Result<()> { 154 - fn install_metrics_server() -> anyhow::Result<()> { 155 169 log::info!("installing metrics server..."); 156 - let host = [0, 0, 0, 0]; 157 - let port = 8765; 158 170 PrometheusBuilder::new() 159 171 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 160 172 .set_bucket_duration(Duration::from_secs(60))? 161 173 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here. 162 174 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 175 + .with_http_listener(bind) 163 - .with_http_listener((host, port)) 164 176 .install()?; 177 + log::info!("metrics server installed! listening on {bind}"); 165 - log::info!( 166 - "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 167 - host[0], 168 - host[1], 169 - host[2], 170 - host[3] 171 - ); 172 178 Ok(()) 173 179 } 174 180
+5 -2
ufos/src/server/mod.rs
··· 716 716 .await 717 717 } 718 718 719 + pub async fn serve( 720 + storage: impl StoreReader + 'static, 721 + bind: std::net::SocketAddr, 722 + ) -> Result<(), String> { 719 - pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 720 723 describe_metrics(); 721 724 let log = ConfigLogging::StderrTerminal { 722 725 level: ConfigLoggingLevel::Warn, ··· 758 761 759 762 ServerBuilder::new(api, context, log) 760 763 .config(ConfigDropshot { 764 + bind_address: bind, 761 - bind_address: "0.0.0.0:9999".parse().unwrap(), 762 765 ..Default::default() 763 766 }) 764 767 .start()
+1 -1
who-am-i/Cargo.toml
··· 11 11 axum = "0.8.4" 12 12 axum-extra = { version = "0.10.1", features = ["cookie-signed", "typed-header"] } 13 13 axum-template = { version = "3.0.0", features = ["handlebars"] } 14 + clap = { workspace = true } 14 - clap = { version = "4.5.40", features = ["derive", "env"] } 15 15 ctrlc = "3.4.7" 16 16 dashmap = "6.1.0" 17 17 elliptic-curve = "0.13.8"

History

8 rounds 13 comments
11 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
wip
Fix rocks-store to match mem-store composite cursor
Address feedback from fig
pull request successfully merged
10 commits
8 commits

Okay. I wrapped my head around the composite cursor you proposed and am working on refactoring both storage implementations towards that. I think I might re-submit another round tomorrow :)

6 commits

Found a bug in how we handle some of the pagination logic in cases where the number of items and the user-selected limit are identical or very close to each other (already working on a fix)

thanks for the rebase! i tried to write things in the tiny text box but ended up needing to make a diagram: https://bsky.app/profile/did:plc:hdhoaan3xa3jiuq4fg4mefid/post/3mejuq44twc2t

key thing is that where the focus of getManyToManyCounts was the other subject (aggregation was against that, so grouping happened with it),

i think the focus of disaggregated many-to-many is on the linking records themselves

to me that takes me toward a few things

  • i don't think we should need to group the links by target (does the current code build up the full aggregation on every requested page? we should be able to avoid doing that)

  • i think the order of the response should actually be based on the linking record itself (since we have a row in the output), not the other subject, unlike with the aggregated/count version. this means you get e.g. list items in the order they were added instead of the order in which the listed things were created. (i haven't fully wrapped my head around the grouping/ordering code here yet)

  • since any linking record can have a path_to_other with multiple links, i think a composite cursor could work here:

a 2-tuple of (backlink_vec_idx, forward_vec_idx).

for normal cases where the many-to-many record points to exactly one other subject, it would just be advancing backlink_vec_idx like normal backlinks

for cases where the many-to-many record actually has multiple forward links at the given path_to_other, the second part of the tuple would track progress through that list

i think that allows us to hold the necessary state between calls without needing to reconstruct too much in memory each time?

(also it's hard to write in this tiny tiny textbox and have a sense of whether what i'm saying makes sense)
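A rough sketch of the 2-tuple cursor described above. All names here (`M2mCursor`, `encode`, `decode`, `advance`) are hypothetical, not actual microcosm-rs types; it just shows one way the state could be serialized and advanced between calls:

```rust
// Hypothetical sketch of the proposed composite cursor; names are
// illustrative, not the actual microcosm-rs types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct M2mCursor {
    /// Position in the list of linking (backlink) records.
    pub backlink_vec_idx: usize,
    /// Position within the current record's forward links at path_to_other.
    pub forward_vec_idx: usize,
}

impl M2mCursor {
    /// Serialize as "backlink:forward" for use as an opaque XRPC cursor.
    pub fn encode(&self) -> String {
        format!("{}:{}", self.backlink_vec_idx, self.forward_vec_idx)
    }

    /// Parse a cursor string back; returns None on malformed input.
    pub fn decode(s: &str) -> Option<Self> {
        let (b, f) = s.split_once(':')?;
        Some(Self {
            backlink_vec_idx: b.parse().ok()?,
            forward_vec_idx: f.parse().ok()?,
        })
    }

    /// Advance within the current record's forward links, or move on to the
    /// next linking record once they are exhausted.
    pub fn advance(&mut self, forward_links_in_current: usize) {
        if self.forward_vec_idx + 1 < forward_links_in_current {
            self.forward_vec_idx += 1;
        } else {
            self.backlink_vec_idx += 1;
            self.forward_vec_idx = 0;
        }
    }
}
```

For the common case of exactly one forward link per record, `advance(1)` always moves straight to the next backlink, matching ordinary backlink pagination.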

Interesting approach! I have to think through this for a bit to be honest. Maybe I tried to follow the existing counts implementation too closely

Having said that, I added a new composite cursor to fix a couple of bugs that would arise when hitting some possible edge cases in the pagination logic. This affects both the new get-many-to-many endpoint and the existing get-many-to-many-counts endpoint. As the changes are split over two distinct commits, things should be straightforward to review.

Your assumption is still correct in the sense that we do indeed have to build up the aggregation again for every request. I have to double-check the get-backlinks endpoint to get a better sense of what you're getting at.

Finally, I agree that the interface here doesn't necessarily make the whole thing easier to understand, unfortunately

6 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many

i think something got funky with a rebase or the way tangled is showing it -- some of my changes on main seem to be getting shown (reverted) in the diff.

i don't mind sorting it locally but will mostly get to it tomorrow, in case you want to see what's up before i do.

That's one on me, sorry! Rebased again on main and now everything seems fine

5 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main

Rebased on main. As we discussed in the PR for the order query parameter, I didn't include it here as it's not a particularly sensible fit.

i need to get into the code properly but my initial thought is that this endpoint should return a flat list of results, like

{
  "items": [
    {
      "link": { did, collection, rkey }, // the m2m link record
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "b.com"
    }
  ]
}

this will require a few tricks in the cursor to track pages across half-finished groups of links

(also this isn't an immediate change request, just getting it down for discussion!)

(and separately, i've also been wondering about moving more toward returning at-uris instead of broken-out did/collection/rkey objects. which isn't specifically about this PR, but if that happens then switching before releasing it is nice)

Hmm, I wonder how this would then work with the path_to_other parameter. Currently we have this nested grouping in order to show and disambiguate different relationships between different links.

For instance, take the following query and its results:

http://localhost:6789/xrpc/blue.microcosm.links.getManyToMany?subject=at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y&source=app.bsky.feed.post:reply.root.uri&pathToOther=reply.parent.uri&limit=16

This query asks: "Show me all posts in this thread, grouped by who they're responding to."

A flat list would just give us all the posts in the thread. The nested structure answers a richer question: who's talking to whom? Some posts are direct responses to the original article. Others are replies to other commenters, forming side conversations that branch off from the main thread.

The pathToOther grouping preserves that distinction. Without it, we'd lose the information about who's talking to whom.

{
  "linking_records": [
    {
      "subject": "at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y",
      "records": [
        {
          "did": "did:plc:lznqwrsbnyf6fdxohikqj6h3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd27pja7s2y"
        },
        {
          "did": "did:plc:uffx77au6hoauuuumkbuvqdr",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2tt5efc2a"
        },
        {
          "did": "did:plc:y7qyxzo7dns5m54dlq3youu3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2wtjxgc2d"
        },
        {
          "did": "did:plc:yaakslxyqydb76ybgkhrr4jk",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd35hyads22"
        },
        {
          "did": "did:plc:fia7w2kbnrdjwp6zvxywt7qv",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd37j3ldk2m"
        },
        {
          "did": "did:plc:xtecipifublblkomwau5x2ok",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd3dbtbz22n"
        },
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd42hpw7c2e"
        },
        {
          "did": "did:plc:fgquypfh32pewivn3bcmzseb",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd46jteoc2m"
        }
      ]
    },
    {
      "subject": "at://did:plc:3rhjxwwui6wwfokh4at3q2dl/app.bsky.feed.post/3mdczc7c4gk2i",
      "records": [
        {
          "did": "did:plc:3rhjxwwui6wwfokh4at3q2dl",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdczt7cwhk2i"
        }
      ]
    },
    {
      "subject": "at://did:plc:6buibzhkqr4vkqu75ezr2uv2/app.bsky.feed.post/3mdby25hbbk2v",
      "records": [
        {
          "did": "did:plc:fgeie2bmzlmx37iglj3xbzuj",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd26ulf4k2j"
        }
      ]
    },
    {
      "subject": "at://did:plc:lwgvv5oqh5stzb6dxa5d7z3n/app.bsky.feed.post/3mdcxqbkkfk2i",
      "records": [
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd45u56sk2e"
        }
      ]
    }
  ],
  "cursor": null
}

Correct me if I'm somehow wrong here!

Regarding returning at-uris: I think this might be a nice idea, as users can split them up themselves when they feel the need to anyway, and it feels conceptually more complete. But it might be easier to do this in a separate PR covering all existing XRPC endpoints. That would let us add this new endpoint already while working on the updated return values in the meantime. I'd like to avoid doing too many distinct things in one PR. :)

at-uris: totally fair, holding off for a follow-up.

flat list: i might have messed it up in my example but i think what i meant is actually equivalent to the grouped version: flattened, with the subject ("group by") included with every item in the flattened list.

clients can collect the flat list and group on subject to get back to your structured example, if they want.
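A sketch of that client-side step (types and field names here are made up for illustration, not the actual lexicon): regrouping a flat page by subject recovers the nested shape.

```rust
use std::collections::BTreeMap;

// Illustrative flat item as proposed: the linking record plus the subject
// it points at. `FlatItem` and its fields are hypothetical names.
#[derive(Debug, Clone)]
pub struct FlatItem {
    pub link: String,    // at-uri of the m2m linking record
    pub subject: String, // the "other" subject it links to
}

/// Rebuild the grouped shape client-side: subject -> linking-record uris.
pub fn group_by_subject(items: &[FlatItem]) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for item in items {
        groups
            .entry(item.subject.clone())
            .or_default()
            .push(item.link.clone());
    }
    groups
}
```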

my motivations are probably part sql-brain, part flat-list-enjoyer, and part cursor-related. i'm trying to disregard the first two, and i'm curious about your thoughts about how to handle the cursor:

with a flat list it's easy (from the client perspective at least) -- just keep chasing the cursor for as much of the data as you need. (cursors can happen in the middle of a subject)

with nested results grouped by subject it's less obvious to me. correct me if i'm wrong (need another block of time to actually get into the code) but i think the grouped item sub-list is unbounded in size in the proposed code here? so cursors are only limiting the number of groups.

if we go with the grouped nested response, i think maybe we'd want something like:

  • a cursor at the end for fetching more groups, and
  • a cursor for each group-list that lets you fetch more items from just that group-list.

(i think this kind of nested paging is pretty neat!)
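If the grouped shape were kept, the per-group paging could look roughly like this sketch (all names hypothetical; plain offsets stand in for whatever opaque cursor encoding would actually be used):

```rust
// Hypothetical shape for one group in a nested-paging response.
#[derive(Debug, PartialEq)]
pub struct Group {
    pub subject: String,
    pub records: Vec<String>,   // linking-record at-uris in this page
    pub cursor: Option<String>, // fetch more items from just this group
}

/// Page one group's records: take up to `limit` starting at `offset`, and
/// emit a per-group cursor only when more items remain.
pub fn page_group(subject: &str, records: &[&str], offset: usize, limit: usize) -> Group {
    let page: Vec<String> = records
        .iter()
        .skip(offset)
        .take(limit)
        .map(|s| s.to_string())
        .collect();
    let cursor = if offset + page.len() < records.len() {
        Some((offset + page.len()).to_string())
    } else {
        None
    };
    Group {
        subject: subject.to_string(),
        records: page,
        cursor,
    }
}
```

The outer cursor for fetching more groups would sit alongside this in the response, as described in the first bullet.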

Interesting. Now that you mention it, I think I get where you're going with this!

I think the whole cursor thing, albeit certainly possible, creates more unnecessary complexity, so I'll probably go with your suggestion.

It seems easier for most users to create custom groupings on their own (having more freedom is always great), and from an ergonomic perspective I think the two cursors might create more friction.

4 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct

Added the missing lexicon entry for the new endpoint and changed the return type as well. I mistakenly commented this on the other PR I was working on. Sorry about that lol.

3 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test

I think the existing get_many_to_many_counts handler and the new get_many_to_many handler are similar enough that we might extract the bulk of their logic into a shared helper. Maybe a method that takes the existing identical function parameters plus an additional callback parameter (deciding what we do with found matches, i.e. calculate counts or join URIs) would be one way to go about it.

I am not entirely sure yet whether this is the right thing to do, as the new shared implementation might be a bit complicated. But given the strong similarities between the two, I think it's at least worth considering.
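One possible shape for that extraction, as a sketch only: `for_each_match` and the `(subject, uri)` tuple input are stand-ins for whatever the shared traversal would actually iterate, not the real microcosm-rs signatures.

```rust
use std::collections::HashMap;

/// Hypothetical shared traversal: both endpoints walk the same matches and
/// differ only in what the callback accumulates.
pub fn for_each_match<F: FnMut(&str, &str)>(matches: &[(String, String)], mut on_match: F) {
    for (subject, uri) in matches {
        on_match(subject.as_str(), uri.as_str());
    }
}

/// get_many_to_many_counts-style accumulation: a count per subject.
pub fn count_per_subject(matches: &[(String, String)]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for_each_match(matches, |subject, _uri| {
        *counts.entry(subject.to_string()).or_insert(0) += 1;
    });
    counts
}

/// get_many_to_many-style accumulation: record uris joined per subject.
pub fn collect_per_subject(matches: &[(String, String)]) -> HashMap<String, Vec<String>> {
    let mut out: HashMap<String, Vec<String>> = HashMap::new();
    for_each_match(matches, |subject, uri| {
        out.entry(subject.to_string()).or_default().push(uri.to_string());
    });
    out
}
```

The closure parameter keeps the traversal (and any DID-status validation it grows later) in one place, so both endpoints pick up fixes automatically.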