Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Add new get_many_to_many XRPC endpoint #7

Merged. Opened by seoul.systems, targeting main from seoul.systems/microcosm-rs:xrpc_many2many

Added a new XRPC API endpoint, get_many_to_many, to fetch joined record URIs (we talked about this briefly on Discord already). It is implemented and functions almost identically to the existing get_many_to_many_counts endpoint and handler. Some of its possible flaws, like the two-step lookup to verify that a matching DID is indeed active, are duplicated as well. On the plus side, this should make the PR pretty straightforward to review, and it should make it easier to modify both endpoints later on, once a more efficient way to validate the status of DIDs is available.

If you have comments, remarks, etc., I am happy to rework some parts.
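For reviewers skimming the diff, the core shape change can be sketched outside the codebase: the handlers previously returned records grouped by secondary subject (the removed RecordsBySubject struct) and now return flat (RecordId, subject) pairs, sorted by subject. A minimal standalone sketch, with type names simplified from the real crate:

```rust
// Simplified stand-in for the crate's RecordId (real type uses a Did newtype).
#[derive(Clone, Debug, PartialEq)]
struct RecordId {
    did: String,
    collection: String,
    rkey: String,
}

// Flatten (subject, records) groups into (record, subject) pairs,
// sorted by subject — the shape get_many_to_many now returns.
fn flatten(grouped: Vec<(String, Vec<RecordId>)>) -> Vec<(RecordId, String)> {
    let mut items: Vec<(RecordId, String)> = grouped
        .into_iter()
        .flat_map(|(subject, records)| {
            records.into_iter().map(move |r| (r, subject.clone()))
        })
        .collect();
    items.sort_by(|a, b| a.1.cmp(&b.1));
    items
}
```

This mirrors the flat_map/sort_by change in mem_store.rs; the rocks_store hunk does the equivalent with an explicit push loop.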

AT URI
at://did:plc:53wellrw53o7sw4zlpfenvuh/sh.tangled.repo.pull/3mbkyehqooh22
+669 -907
Interdiff #2 → #3
-6
constellation/src/lib.rs
··· 50 50 } 51 51 } 52 52 53 - #[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)] 54 - pub struct RecordsBySubject { 55 - pub subject: String, 56 - pub records: Vec<RecordId>, 57 - } 58 - 59 53 /// maybe the worst type in this repo, and there are some bad types
constellation/src/server/mod.rs

This patch was likely rebased, as context lines do not match.

+10 -8
constellation/src/storage/mem_store.rs
··· 1 1 use super::{ 2 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 3 }; 4 - use crate::{ActionableEvent, CountsByCount, Did, RecordId, RecordsBySubject}; 4 + use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 5 use anyhow::Result; 6 6 use links::CollectedLink; 7 7 use std::collections::{HashMap, HashSet}; ··· 244 244 after: Option<String>, 245 245 filter_dids: &HashSet<Did>, 246 246 filter_to_targets: &HashSet<String>, 247 - ) -> Result<PagedOrderedCollection<RecordsBySubject, String>> { 247 + ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 248 248 let empty_res = Ok(PagedOrderedCollection { 249 249 items: Vec::new(), 250 250 next: None, ··· 307 307 308 308 let mut items = grouped_links 309 309 .into_iter() 310 - .map(|(t, r)| RecordsBySubject { 311 - subject: t.0, 312 - records: r, 310 + .flat_map(|(target, records)| { 311 + records 312 + .iter() 313 + .map(move |r| (r.clone(), target.0.clone())) 314 + .collect::<Vec<_>>() 313 315 }) 314 316 .collect::<Vec<_>>(); 315 317 316 - items.sort_by(|a, b| a.subject.cmp(&b.subject)); 318 + items.sort_by(|a: &(RecordId, String), b| a.1.cmp(&b.1)); 317 319 318 320 items = items 319 321 .into_iter() 320 - .skip_while(|item| after.as_ref().map(|a| &item.subject <= a).unwrap_or(false)) 322 + .skip_while(|item| after.as_ref().map(|a| &item.1 <= a).unwrap_or(false)) 321 323 .take(limit as usize) 322 324 .collect(); 323 325 324 326 let next = if items.len() as u64 >= limit { 325 - items.last().map(|item| item.subject.clone()) 327 + items.last().map(|item| item.1.clone()) 326 328 } else { 327 329 None 328 330 };
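The cursor handling in the mem_store hunk above can be read in isolation. A minimal sketch (assumed simplified item type: a string record key paired with its subject, pre-sorted by subject) of the skip/take pagination and next-cursor derivation:

```rust
// items: (record key, subject) pairs, already sorted by subject.
// `after` resumes pagination past a previously returned subject cursor.
fn paginate(
    items: Vec<(String, String)>,
    after: Option<String>,
    limit: u64,
) -> (Vec<(String, String)>, Option<String>) {
    let page: Vec<_> = items
        .into_iter()
        .skip_while(|item| after.as_ref().map(|a| &item.1 <= a).unwrap_or(false))
        .take(limit as usize)
        .collect();
    // A full page implies there may be more; the last subject becomes the cursor.
    let next = if page.len() as u64 >= limit {
        page.last().map(|item| item.1.clone())
    } else {
        None
    };
    (page, next)
}
```

One thing this sketch makes visible: with flat items, several entries can share a subject, so a `<=` cursor on the subject string would skip any remaining entries for the boundary subject when a page splits them. The grouped version did not have this edge, since each subject appeared once per page; it may be worth a look in review.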
+34 -44
constellation/src/storage/mod.rs
··· 1 - use crate::{ActionableEvent, CountsByCount, Did, RecordId, RecordsBySubject}; 1 + use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 2 2 use anyhow::Result; 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::collections::{HashMap, HashSet}; ··· 145 145 after: Option<String>, 146 146 filter_dids: &HashSet<Did>, 147 147 filter_to_targets: &HashSet<String>, 148 - ) -> Result<PagedOrderedCollection<RecordsBySubject, String>>; 148 + ) -> Result<PagedOrderedCollection<(RecordId, String), String>>; 149 149 150 150 fn get_all_counts( 151 151 &self, ··· 1740 1740 &HashSet::new(), 1741 1741 )?, 1742 1742 PagedOrderedCollection { 1743 - items: vec![RecordsBySubject { 1744 - subject: "b.com".to_string(), 1745 - records: vec![RecordId { 1743 + items: vec![( 1744 + RecordId { 1746 1745 did: "did:plc:asdf".into(), 1747 1746 collection: "app.t.c".into(), 1748 1747 rkey: "asdf".into(), 1749 - }] 1750 - }], 1748 + }, 1749 + "b.com".to_string(), 1750 + )], 1751 1751 next: None, 1752 1752 } 1753 1753 ); 1754 1754 }); 1755 1755 1756 - test_each_storage!(get_m2m_filters, |storage| { 1756 + test_each_storage!(get_m2m_no_filters, |storage| { 1757 1757 storage.push( 1758 1758 &ActionableEvent::CreateLinks { 1759 1759 record_id: RecordId { ··· 1835 1835 3, 1836 1836 )?; 1837 1837 1838 - // Test without filters - should get all records grouped by secondary target 1838 + // Test without filters - should get all records as flat items 1839 1839 let result = storage.get_many_to_many( 1840 1840 "a.com", 1841 1841 "app.t.c", ··· 1846 1846 &HashSet::new(), 1847 1847 &HashSet::new(), 1848 1848 )?; 1849 - assert_eq!(result.items.len(), 2); 1849 + assert_eq!(result.items.len(), 4); 1850 1850 assert_eq!(result.next, None); 1851 - // Find b.com group 1852 - let b_group = result 1851 + // Check b.com items 1852 + let b_items: Vec<_> = result 1853 1853 .items 1854 1854 .iter() 1855 - .find(|group| group.subject == "b.com") 1856 - .unwrap(); 1857 - assert_eq!(b_group.subject, "b.com"); 
1858 - assert_eq!(b_group.records.len(), 2); 1859 - assert!(b_group 1860 - .records 1855 + .filter(|(_, subject)| subject == "b.com") 1856 + .collect(); 1857 + assert_eq!(b_items.len(), 2); 1858 + assert!(b_items 1861 1859 .iter() 1862 - .any(|r| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1863 - assert!(b_group 1864 - .records 1860 + .any(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf")); 1861 + assert!(b_items 1865 1862 .iter() 1866 - .any(|r| r.did.0 == "did:plc:asdf" && r.rkey == "asdf2")); 1867 - // Find c.com group 1868 - let c_group = result 1863 + .any(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf2")); 1864 + // Check c.com items 1865 + let c_items: Vec<_> = result 1869 1866 .items 1870 1867 .iter() 1871 - .find(|group| group.subject == "c.com") 1872 - .unwrap(); 1873 - assert_eq!(c_group.subject, "c.com"); 1874 - assert_eq!(c_group.records.len(), 2); 1875 - assert!(c_group 1876 - .records 1868 + .filter(|(_, subject)| subject == "c.com") 1869 + .collect(); 1870 + assert_eq!(c_items.len(), 2); 1871 + assert!(c_items 1877 1872 .iter() 1878 - .any(|r| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa")); 1879 - assert!(c_group 1880 - .records 1873 + .any(|(r, _)| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa")); 1874 + assert!(c_items 1881 1875 .iter() 1882 - .any(|r| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa2")); 1876 + .any(|(r, _)| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa2")); 1883 1877 1884 1878 // Test with DID filter - should only get records from did:plc:fdsa 1885 1879 let result = storage.get_many_to_many( ··· 1892 1886 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), 1893 1887 &HashSet::new(), 1894 1888 )?; 1895 - assert_eq!(result.items.len(), 1); 1896 - let group = &result.items[0]; 1897 - assert_eq!(group.subject, "c.com"); 1898 - assert_eq!(group.records.len(), 2); 1899 - assert!(group.records.iter().all(|r| r.did.0 == "did:plc:fdsa")); 1889 + assert_eq!(result.items.len(), 2); 1890 + 
assert!(result.items.iter().all(|(_, subject)| subject == "c.com")); 1891 + assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:fdsa")); 1900 1892 1901 1893 // Test with target filter - should only get records linking to b.com 1902 1894 let result = storage.get_many_to_many( ··· 1909 1901 &HashSet::new(), 1910 1902 &HashSet::from_iter(["b.com".to_string()]), 1911 1903 )?; 1912 - assert_eq!(result.items.len(), 1); 1913 - let group = &result.items[0]; 1914 - assert_eq!(group.subject, "b.com"); 1915 - assert_eq!(group.records.len(), 2); 1916 - assert!(group.records.iter().all(|r| r.did.0 == "did:plc:asdf")); 1904 + assert_eq!(result.items.len(), 2); 1905 + assert!(result.items.iter().all(|(_, subject)| subject == "b.com")); 1906 + assert!(result.items.iter().all(|(r, _)| r.did.0 == "did:plc:asdf")); 1917 1907 }); 1918 1908 }
+6 -7
constellation/src/storage/rocks_store.rs
··· 2 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 3 PagedOrderedCollection, StorageStats, 4 4 }; 5 - use crate::{CountsByCount, Did, RecordId, RecordsBySubject}; 5 + use crate::{CountsByCount, Did, RecordId}; 6 6 use anyhow::{bail, Result}; 7 7 use bincode::Options as BincodeOptions; 8 8 use links::CollectedLink; ··· 1132 1132 after: Option<String>, 1133 1133 filter_dids: &HashSet<Did>, 1134 1134 filter_to_targets: &HashSet<String>, 1135 - ) -> Result<PagedOrderedCollection<RecordsBySubject, String>> { 1135 + ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1136 1136 let collection = Collection(collection.to_string()); 1137 1137 let path = RPath(path.to_string()); 1138 1138 ··· 1241 1241 } 1242 1242 } 1243 1243 1244 - let mut items: Vec<RecordsBySubject> = Vec::with_capacity(grouped_links.len()); 1244 + let mut items: Vec<(RecordId, String)> = Vec::with_capacity(grouped_links.len()); 1245 1245 for (fwd_target_id, records) in &grouped_links { 1246 1246 let Some(target_key) = self 1247 1247 .target_id_table ··· 1253 1253 1254 1254 let target_string = target_key.0 .0; 1255 1255 1256 - items.push(RecordsBySubject { 1257 - subject: target_string, 1258 - records: records.clone(), 1259 - }); 1256 + records 1257 + .iter() 1258 + .for_each(|r| items.push((r.clone(), target_string.clone()))); 1260 1259 } 1261 1260 1262 1261 let next = if grouped_links.len() as u64 >= limit {
+6 -8
constellation/templates/get-many-to-many.html.j2
··· 23 23 24 24 <h3>Many-to-many links, most recent first:</h3> 25 25 26 - {% for group in linking_records %} 27 - <h4>Target: <code>{{ group.subject }}</code> <small>(<a href="/links/all?target={{ group.subject|urlencode }}">view all links</a>)</small></h4> 28 - {% for record in group.records %} 29 - <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} 30 - <strong>Collection</strong>: {{ record.collection }} 31 - <strong>RKey</strong>: {{ record.rkey }} 32 - -> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre> 33 - {% endfor %} 26 + {% for item in items %} 27 + <pre style="display: block; margin: 1em 2em" class="code"><strong>Subject</strong>: <a href="/links/all?target={{ item.subject|urlencode }}">{{ item.subject }}</a> 28 + <strong>DID</strong>: {{ item.link.did().0 }} 29 + <strong>Collection</strong>: {{ item.link.collection }} 30 + <strong>RKey</strong>: {{ item.link.rkey }} 31 + -> <a href="https://pdsls.dev/at://{{ item.link.did().0 }}/{{ item.link.collection }}/{{ item.link.rkey }}">browse record</a></pre> 34 32 {% endfor %} 35 33 36 34 {% if let Some(c) = cursor %}
+1 -14
constellation/templates/hello.html.j2
··· 140 140 {% call try_it::dids("at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r", "app.bsky.feed.like", ".subject.uri") %} 141 141 142 142 143 - <h3 class="route deprecated"><code>[deprecated] GET /links/count</code></h3> 143 + <h3 class="route"><code>GET /links/count</code></h3> 144 144 145 145 <p>The total number of links pointing at a given target.</p> 146 146 ··· 156 156 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 157 157 {% call try_it::links_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block", ".subject") %} 158 158 159 - <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getBacklinksCount</code></h3> 160 - 161 - <p>The total number of links pointing at a given target.</p> 162 - 163 - <h4>Query parameters:</h4> 164 - 165 - <ul> 166 - <li><code>subject</code>: required, must url-encode. The target being linked to. Example: <code>did:plc:vc7f4oafdgxsihk4cry2xpze</code> or <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></li> 167 - <li><code>source</code>: required. Collection and path specification for the primary link. Example: <code>app.bsky.feed.like:subject.uri</code></li> 168 - </ul> 169 - 170 - <p style="margin-bottom: 0"><strong>Try it:</strong></p> 171 - {% call try_it::get_backlinks_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block:subject") %} 172 159 173 160 <h3 class="route"><code>GET /links/count/distinct-dids</code></h3> 174 161
-7
constellation/templates/try-it-macros.html.j2
··· 134 134 </form> 135 135 {% endmacro %} 136 136 137 - {% macro get_backlinks_count(subject, source) %} 138 - <form method="get" action="/xrpc/blue.microcosm.links.getBacklinksCount"> 139 - <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinksCount 140 - ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="subject" /> 141 - &source= <input type="text" name="source" value="{{ source }}" placeholder="source" /> <button type="submit">get links count</button></pre> 142 - </form> 143 - {% endmacro %} 144 137 145 138 {% macro links_count(target, collection, path) %} 146 139 <form method="get" action="/links/count">
+12 -17
lexicons/blue.microcosm/links/getManyToMany.json
··· 4 4 "defs": { 5 5 "main": { 6 6 "type": "query", 7 - "description": "Get records that link to a primary subject, grouped by the secondary subjects they also reference", 7 + "description": "Get records that link to a primary subject along with the secondary subjects they also reference", 8 8 "parameters": { 9 9 "type": "params", 10 10 "required": ["subject", "source", "pathToOther"], ··· 50 50 "encoding": "application/json", 51 51 "schema": { 52 52 "type": "object", 53 - "required": ["linking_records"], 53 + "required": ["items"], 54 54 "properties": { 55 - "linking_records": { 55 + "items": { 56 56 "type": "array", 57 57 "items": { 58 58 "type": "ref", 59 - "ref": "#recordsBySubject" 59 + "ref": "#item" 60 60 } 61 61 }, 62 62 "cursor": { 63 - "type": "string", 64 - "description": "pagination cursor" 63 + "type": "string" 65 64 } 66 65 } 67 66 } 68 67 } 69 68 }, 70 - "recordsBySubject": { 69 + "item": { 71 70 "type": "object", 72 - "required": ["subject", "records"], 71 + "required": ["link", "subject"], 73 72 "properties": { 74 - "subject": { 75 - "type": "string", 76 - "description": "the secondary subject that these records link to" 73 + "link": { 74 + "type": "ref", 75 + "ref": "#linkRecord" 77 76 }, 78 - "records": { 79 - "type": "array", 80 - "items": { 81 - "type": "ref", 82 - "ref": "#linkRecord" 83 - } 77 + "subject": { 78 + "type": "string" 84 79 } 85 80 } 86 81 },
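Reading the updated lexicon, a response body now carries a flat `items` array of `{link, subject}` objects rather than records grouped per subject. An illustrative example (values made up, not taken from the repo; `link` shown with the fields the template renders):

```json
{
  "items": [
    {
      "link": {
        "did": "did:plc:asdf",
        "collection": "app.t.c",
        "rkey": "asdf"
      },
      "subject": "b.com"
    }
  ],
  "cursor": "b.com"
}
```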
-1
.gitignore
··· 1 1 /target 2 2 local/ 3 - rocks.test
-4
.prettierrc
··· 1 - { 2 - "tabWidth": 2, 3 - "useTabs": false 4 - }
+352 -175
Cargo.lock
··· 24 24 checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" 25 25 dependencies = [ 26 26 "cfg-if", 27 + "getrandom 0.2.15", 27 28 "once_cell", 28 29 "version_check", 29 30 "zerocopy 0.7.35", ··· 122 123 checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223" 123 124 124 125 [[package]] 126 + name = "arc-swap" 127 + version = "1.7.1" 128 + source = "registry+https://github.com/rust-lang/crates.io-index" 129 + checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" 130 + 131 + [[package]] 125 132 name = "arrayvec" 126 133 version = "0.7.6" 127 134 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 155 162 "proc-macro2", 156 163 "quote", 157 164 "serde", 158 - "syn", 165 + "syn 2.0.106", 159 166 ] 160 167 161 168 [[package]] ··· 197 204 dependencies = [ 198 205 "proc-macro2", 199 206 "quote", 200 - "syn", 207 + "syn 2.0.106", 201 208 "synstructure", 202 209 ] 203 210 ··· 209 216 dependencies = [ 210 217 "proc-macro2", 211 218 "quote", 212 - "syn", 219 + "syn 2.0.106", 220 + ] 221 + 222 + [[package]] 223 + name = "async-channel" 224 + version = "2.5.0" 225 + source = "registry+https://github.com/rust-lang/crates.io-index" 226 + checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" 227 + dependencies = [ 228 + "concurrent-queue", 229 + "event-listener-strategy", 230 + "futures-core", 231 + "pin-project-lite", 213 232 ] 214 233 215 234 [[package]] ··· 255 274 dependencies = [ 256 275 "proc-macro2", 257 276 "quote", 258 - "syn", 277 + "syn 2.0.106", 259 278 ] 279 + 280 + [[package]] 281 + name = "async-task" 282 + version = "4.7.1" 283 + source = "registry+https://github.com/rust-lang/crates.io-index" 284 + checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" 260 285 261 286 [[package]] 262 287 name = "async-trait" ··· 266 291 dependencies = [ 267 292 "proc-macro2", 268 293 "quote", 269 - "syn", 294 + "syn 2.0.106", 270 295 ] 271 296 
272 297 [[package]] ··· 471 496 "serde_json", 472 497 "thiserror 1.0.69", 473 498 "trait-variant", 499 + ] 500 + 501 + [[package]] 502 + name = "auto_enums" 503 + version = "0.8.7" 504 + source = "registry+https://github.com/rust-lang/crates.io-index" 505 + checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781" 506 + dependencies = [ 507 + "derive_utils", 508 + "proc-macro2", 509 + "quote", 510 + "syn 2.0.106", 474 511 ] 475 512 476 513 [[package]] ··· 712 749 "regex", 713 750 "rustc-hash 1.1.0", 714 751 "shlex", 715 - "syn", 752 + "syn 2.0.106", 716 753 "which", 717 754 ] 718 755 ··· 725 762 "bitflags", 726 763 "cexpr", 727 764 "clang-sys", 728 - "itertools 0.12.1", 765 + "itertools 0.13.0", 729 766 "proc-macro2", 730 767 "quote", 731 768 "regex", 732 769 "rustc-hash 1.1.0", 733 770 "shlex", 734 - "syn", 771 + "syn 2.0.106", 735 772 ] 736 773 737 774 [[package]] ··· 743 780 "bitflags", 744 781 "cexpr", 745 782 "clang-sys", 746 - "itertools 0.12.1", 783 + "itertools 0.13.0", 747 784 "proc-macro2", 748 785 "quote", 749 786 "regex", 750 787 "rustc-hash 2.1.1", 751 788 "shlex", 752 - "syn", 789 + "syn 2.0.106", 753 790 ] 754 791 755 792 [[package]] ··· 803 840 804 841 [[package]] 805 842 name = "bytes" 806 - version = "1.11.1" 843 + version = "1.10.1" 807 844 source = "registry+https://github.com/rust-lang/crates.io-index" 808 - checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" 845 + checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 809 846 810 847 [[package]] 811 848 name = "byteview" ··· 955 992 956 993 [[package]] 957 994 name = "clap" 958 - version = "4.5.56" 995 + version = "4.5.48" 959 996 source = "registry+https://github.com/rust-lang/crates.io-index" 960 - checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e" 997 + checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" 961 998 dependencies = [ 962 999 "clap_builder", 963 1000 
"clap_derive", ··· 965 1002 966 1003 [[package]] 967 1004 name = "clap_builder" 968 - version = "4.5.56" 1005 + version = "4.5.48" 969 1006 source = "registry+https://github.com/rust-lang/crates.io-index" 970 - checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" 1007 + checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" 971 1008 dependencies = [ 972 1009 "anstream", 973 1010 "anstyle", 974 1011 "clap_lex", 975 - "strsim", 1012 + "strsim 0.11.1", 976 1013 ] 977 1014 978 1015 [[package]] 979 1016 name = "clap_derive" 980 - version = "4.5.55" 1017 + version = "4.5.47" 981 1018 source = "registry+https://github.com/rust-lang/crates.io-index" 982 - checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" 1019 + checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" 983 1020 dependencies = [ 984 1021 "heck", 985 1022 "proc-macro2", 986 1023 "quote", 987 - "syn", 1024 + "syn 2.0.106", 988 1025 ] 989 1026 990 1027 [[package]] ··· 1136 1173 ] 1137 1174 1138 1175 [[package]] 1139 - name = "core_affinity" 1140 - version = "0.8.3" 1141 - source = "registry+https://github.com/rust-lang/crates.io-index" 1142 - checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" 1143 - dependencies = [ 1144 - "libc", 1145 - "num_cpus", 1146 - "winapi", 1147 - ] 1148 - 1149 - [[package]] 1150 1176 name = "cpufeatures" 1151 1177 version = "0.2.17" 1152 1178 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1244 1270 1245 1271 [[package]] 1246 1272 name = "darling" 1273 + version = "0.14.4" 1274 + source = "registry+https://github.com/rust-lang/crates.io-index" 1275 + checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" 1276 + dependencies = [ 1277 + "darling_core 0.14.4", 1278 + "darling_macro 0.14.4", 1279 + ] 1280 + 1281 + [[package]] 1282 + name = "darling" 1247 1283 version = "0.20.11" 1248 1284 source = 
"registry+https://github.com/rust-lang/crates.io-index" 1249 1285 checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" 1250 1286 dependencies = [ 1251 - "darling_core", 1252 - "darling_macro", 1287 + "darling_core 0.20.11", 1288 + "darling_macro 0.20.11", 1289 + ] 1290 + 1291 + [[package]] 1292 + name = "darling_core" 1293 + version = "0.14.4" 1294 + source = "registry+https://github.com/rust-lang/crates.io-index" 1295 + checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" 1296 + dependencies = [ 1297 + "fnv", 1298 + "ident_case", 1299 + "proc-macro2", 1300 + "quote", 1301 + "strsim 0.10.0", 1302 + "syn 1.0.109", 1253 1303 ] 1254 1304 1255 1305 [[package]] ··· 1262 1312 "ident_case", 1263 1313 "proc-macro2", 1264 1314 "quote", 1265 - "strsim", 1266 - "syn", 1315 + "strsim 0.11.1", 1316 + "syn 2.0.106", 1317 + ] 1318 + 1319 + [[package]] 1320 + name = "darling_macro" 1321 + version = "0.14.4" 1322 + source = "registry+https://github.com/rust-lang/crates.io-index" 1323 + checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" 1324 + dependencies = [ 1325 + "darling_core 0.14.4", 1326 + "quote", 1327 + "syn 1.0.109", 1267 1328 ] 1268 1329 1269 1330 [[package]] ··· 1272 1333 source = "registry+https://github.com/rust-lang/crates.io-index" 1273 1334 checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" 1274 1335 dependencies = [ 1275 - "darling_core", 1336 + "darling_core 0.20.11", 1276 1337 "quote", 1277 - "syn", 1338 + "syn 2.0.106", 1278 1339 ] 1279 1340 1280 1341 [[package]] ··· 1314 1375 checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" 1315 1376 dependencies = [ 1316 1377 "data-encoding", 1317 - "syn", 1378 + "syn 2.0.106", 1318 1379 ] 1319 1380 1320 1381 [[package]] ··· 1373 1434 source = "registry+https://github.com/rust-lang/crates.io-index" 1374 1435 checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" 1375 1436 
dependencies = [ 1376 - "darling", 1437 + "darling 0.20.11", 1377 1438 "proc-macro2", 1378 1439 "quote", 1379 - "syn", 1440 + "syn 2.0.106", 1380 1441 ] 1381 1442 1382 1443 [[package]] ··· 1386 1447 checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" 1387 1448 dependencies = [ 1388 1449 "derive_builder_core", 1389 - "syn", 1450 + "syn 2.0.106", 1390 1451 ] 1391 1452 1392 1453 [[package]] ··· 1406 1467 dependencies = [ 1407 1468 "proc-macro2", 1408 1469 "quote", 1409 - "syn", 1470 + "syn 2.0.106", 1410 1471 "unicode-xid", 1411 1472 ] 1412 1473 1413 1474 [[package]] 1475 + name = "derive_utils" 1476 + version = "0.15.0" 1477 + source = "registry+https://github.com/rust-lang/crates.io-index" 1478 + checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" 1479 + dependencies = [ 1480 + "proc-macro2", 1481 + "quote", 1482 + "syn 2.0.106", 1483 + ] 1484 + 1485 + [[package]] 1414 1486 name = "digest" 1415 1487 version = "0.10.7" 1416 1488 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1451 1523 dependencies = [ 1452 1524 "proc-macro2", 1453 1525 "quote", 1454 - "syn", 1526 + "syn 2.0.106", 1455 1527 ] 1456 1528 1457 1529 [[package]] ··· 1459 1531 version = "0.1.0" 1460 1532 source = "registry+https://github.com/rust-lang/crates.io-index" 1461 1533 checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" 1534 + 1535 + [[package]] 1536 + name = "downcast-rs" 1537 + version = "1.2.1" 1538 + source = "registry+https://github.com/rust-lang/crates.io-index" 1539 + checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" 1462 1540 1463 1541 [[package]] 1464 1542 name = "dropshot" ··· 1504 1582 "thiserror 2.0.16", 1505 1583 "tokio", 1506 1584 "tokio-rustls 0.25.0", 1507 - "toml", 1585 + "toml 0.9.7", 1508 1586 "uuid", 1509 1587 "version_check", 1510 1588 "waitgroup", ··· 1522 1600 "semver", 1523 1601 "serde", 1524 1602 "serde_tokenstream", 1525 - "syn", 1603 + "syn 
2.0.106", 1526 1604 ] 1527 1605 1528 1606 [[package]] ··· 1595 1673 "heck", 1596 1674 "proc-macro2", 1597 1675 "quote", 1598 - "syn", 1676 + "syn 2.0.106", 1599 1677 ] 1600 1678 1601 1679 [[package]] ··· 1607 1685 "once_cell", 1608 1686 "proc-macro2", 1609 1687 "quote", 1610 - "syn", 1688 + "syn 2.0.106", 1611 1689 ] 1612 1690 1613 1691 [[package]] ··· 1681 1759 version = "0.1.9" 1682 1760 source = "registry+https://github.com/rust-lang/crates.io-index" 1683 1761 checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 1684 - 1685 - [[package]] 1686 - name = "fastant" 1687 - version = "0.1.11" 1688 - source = "registry+https://github.com/rust-lang/crates.io-index" 1689 - checksum = "2e825441bfb2d831c47c97d05821552db8832479f44c571b97fededbf0099c07" 1690 - dependencies = [ 1691 - "small_ctor", 1692 - "web-time", 1693 - ] 1694 1762 1695 1763 [[package]] 1696 1764 name = "fastrand" ··· 1767 1835 source = "registry+https://github.com/rust-lang/crates.io-index" 1768 1836 checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" 1769 1837 dependencies = [ 1838 + "futures-core", 1839 + "futures-sink", 1840 + "nanorand", 1770 1841 "spin", 1771 1842 ] 1772 1843 ··· 1783 1854 checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 1784 1855 1785 1856 [[package]] 1786 - name = "foldhash" 1787 - version = "0.2.0" 1788 - source = "registry+https://github.com/rust-lang/crates.io-index" 1789 - checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" 1790 - 1791 - [[package]] 1792 1857 name = "foreign-types" 1793 1858 version = "0.3.2" 1794 1859 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1814 1879 1815 1880 [[package]] 1816 1881 name = "foyer" 1817 - version = "0.22.3" 1882 + version = "0.18.0" 1818 1883 source = "registry+https://github.com/rust-lang/crates.io-index" 1819 - checksum = "3b0abc0b87814989efa711f9becd9f26969820e2d3905db27d10969c4bd45890" 1884 + checksum = 
"0b4d8e96374206ff1b4265f2e2e6e1f80bc3048957b2a1e7fdeef929d68f318f" 1820 1885 dependencies = [ 1821 - "anyhow", 1822 1886 "equivalent", 1823 1887 "foyer-common", 1824 1888 "foyer-memory", 1825 1889 "foyer-storage", 1826 - "foyer-tokio", 1827 - "futures-util", 1828 - "mea", 1890 + "madsim-tokio", 1829 1891 "mixtrics", 1830 1892 "pin-project", 1831 1893 "serde", 1894 + "thiserror 2.0.16", 1895 + "tokio", 1832 1896 "tracing", 1833 1897 ] 1834 1898 1835 1899 [[package]] 1836 1900 name = "foyer-common" 1837 - version = "0.22.3" 1901 + version = "0.18.0" 1838 1902 source = "registry+https://github.com/rust-lang/crates.io-index" 1839 - checksum = "a3db80d5dece93adb7ad709c84578794724a9cba342a7e566c3551c7ec626789" 1903 + checksum = "911b8e3f23d5fe55b0b240f75af1d2fa5cb7261d3f9b38ef1c57bbc9f0449317" 1840 1904 dependencies = [ 1841 - "anyhow", 1842 1905 "bincode 1.3.3", 1843 1906 "bytes", 1844 1907 "cfg-if", 1845 - "foyer-tokio", 1908 + "itertools 0.14.0", 1909 + "madsim-tokio", 1846 1910 "mixtrics", 1847 1911 "parking_lot", 1848 1912 "pin-project", 1849 1913 "serde", 1914 + "thiserror 2.0.16", 1915 + "tokio", 1850 1916 "twox-hash", 1851 1917 ] 1852 1918 ··· 1861 1927 1862 1928 [[package]] 1863 1929 name = "foyer-memory" 1864 - version = "0.22.3" 1930 + version = "0.18.0" 1865 1931 source = "registry+https://github.com/rust-lang/crates.io-index" 1866 - checksum = "db907f40a527ca2aa2f40a5f68b32ea58aa70f050cd233518e9ffd402cfba6ce" 1932 + checksum = "506883d5a8500dea1b1662f7180f3534bdcbfa718d3253db7179552ef83612fa" 1867 1933 dependencies = [ 1868 - "anyhow", 1934 + "arc-swap", 1869 1935 "bitflags", 1870 1936 "cmsketch", 1871 1937 "equivalent", 1872 1938 "foyer-common", 1873 1939 "foyer-intrusive-collections", 1874 - "foyer-tokio", 1875 - "futures-util", 1876 - "hashbrown 0.16.1", 1940 + "hashbrown 0.15.2", 1877 1941 "itertools 0.14.0", 1878 - "mea", 1942 + "madsim-tokio", 1879 1943 "mixtrics", 1880 1944 "parking_lot", 1881 - "paste", 1882 1945 "pin-project", 1883 1946 "serde", 
1947 + "thiserror 2.0.16", 1948 + "tokio", 1884 1949 "tracing", 1885 1950 ] 1886 1951 1887 1952 [[package]] 1888 1953 name = "foyer-storage" 1889 - version = "0.22.3" 1954 + version = "0.18.0" 1890 1955 source = "registry+https://github.com/rust-lang/crates.io-index" 1891 - checksum = "1983f1db3d0710e9c9d5fc116d9202dccd41a2d1e032572224f1aff5520aa958" 1956 + checksum = "1ba8403a54a2f2032fb647e49c442e5feeb33f3989f7024f1b178341a016f06d" 1892 1957 dependencies = [ 1893 1958 "allocator-api2", 1894 1959 "anyhow", 1960 + "auto_enums", 1895 1961 "bytes", 1896 - "core_affinity", 1897 1962 "equivalent", 1898 - "fastant", 1963 + "flume", 1899 1964 "foyer-common", 1900 1965 "foyer-memory", 1901 - "foyer-tokio", 1902 1966 "fs4 0.13.1", 1903 1967 "futures-core", 1904 1968 "futures-util", 1905 - "hashbrown 0.16.1", 1906 - "io-uring", 1907 1969 "itertools 0.14.0", 1908 1970 "libc", 1909 1971 "lz4", 1910 - "mea", 1972 + "madsim-tokio", 1973 + "ordered_hash_map", 1911 1974 "parking_lot", 1975 + "paste", 1912 1976 "pin-project", 1913 1977 "rand 0.9.1", 1914 1978 "serde", 1979 + "thiserror 2.0.16", 1980 + "tokio", 1915 1981 "tracing", 1916 1982 "twox-hash", 1917 1983 "zstd", 1918 - ] 1919 - 1920 - [[package]] 1921 - name = "foyer-tokio" 1922 - version = "0.22.3" 1923 - source = "registry+https://github.com/rust-lang/crates.io-index" 1924 - checksum = "f6577b05a7ffad0db555aedf00bfe52af818220fc4c1c3a7a12520896fc38627" 1925 - dependencies = [ 1926 - "tokio", 1927 1984 ] 1928 1985 1929 1986 [[package]] ··· 2008 2065 dependencies = [ 2009 2066 "proc-macro2", 2010 2067 "quote", 2011 - "syn", 2068 + "syn 2.0.106", 2012 2069 ] 2013 2070 2014 2071 [[package]] ··· 2172 2229 version = "0.12.3" 2173 2230 source = "registry+https://github.com/rust-lang/crates.io-index" 2174 2231 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" 2232 + 2233 + [[package]] 2234 + name = "hashbrown" 2235 + version = "0.13.2" 2236 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 2237 + checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" 2238 + dependencies = [ 2239 + "ahash", 2240 + ] 2175 2241 2176 2242 [[package]] 2177 2243 name = "hashbrown" ··· 2187 2253 dependencies = [ 2188 2254 "allocator-api2", 2189 2255 "equivalent", 2190 - "foldhash 0.1.5", 2191 - ] 2192 - 2193 - [[package]] 2194 - name = "hashbrown" 2195 - version = "0.16.1" 2196 - source = "registry+https://github.com/rust-lang/crates.io-index" 2197 - checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 2198 - dependencies = [ 2199 - "allocator-api2", 2200 - "equivalent", 2201 - "foldhash 0.2.0", 2256 + "foldhash", 2202 2257 ] 2203 2258 2204 2259 [[package]] ··· 2632 2687 dependencies = [ 2633 2688 "proc-macro2", 2634 2689 "quote", 2635 - "syn", 2690 + "syn 2.0.106", 2636 2691 ] 2637 2692 2638 2693 [[package]] ··· 2772 2827 2773 2828 [[package]] 2774 2829 name = "itertools" 2830 + version = "0.13.0" 2831 + source = "registry+https://github.com/rust-lang/crates.io-index" 2832 + checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" 2833 + dependencies = [ 2834 + "either", 2835 + ] 2836 + 2837 + [[package]] 2838 + name = "itertools" 2775 2839 version = "0.14.0" 2776 2840 source = "registry+https://github.com/rust-lang/crates.io-index" 2777 2841 checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" ··· 2827 2891 dependencies = [ 2828 2892 "proc-macro2", 2829 2893 "quote", 2830 - "syn", 2894 + "syn 2.0.106", 2831 2895 ] 2832 2896 2833 2897 [[package]] ··· 3179 3243 ] 3180 3244 3181 3245 [[package]] 3246 + name = "madsim" 3247 + version = "0.2.32" 3248 + source = "registry+https://github.com/rust-lang/crates.io-index" 3249 + checksum = "db6694555643da293dfb89e33c2880a13b62711d64b6588bc7df6ce4110b27f1" 3250 + dependencies = [ 3251 + "ahash", 3252 + "async-channel", 3253 + "async-stream", 3254 + "async-task", 3255 + "bincode 1.3.3", 
3256 + "bytes", 3257 + "downcast-rs", 3258 + "futures-util", 3259 + "lazy_static", 3260 + "libc", 3261 + "madsim-macros", 3262 + "naive-timer", 3263 + "panic-message", 3264 + "rand 0.8.5", 3265 + "rand_xoshiro 0.6.0", 3266 + "rustversion", 3267 + "serde", 3268 + "spin", 3269 + "tokio", 3270 + "tokio-util", 3271 + "toml 0.8.23", 3272 + "tracing", 3273 + "tracing-subscriber", 3274 + ] 3275 + 3276 + [[package]] 3277 + name = "madsim-macros" 3278 + version = "0.2.12" 3279 + source = "registry+https://github.com/rust-lang/crates.io-index" 3280 + checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" 3281 + dependencies = [ 3282 + "darling 0.14.4", 3283 + "proc-macro2", 3284 + "quote", 3285 + "syn 1.0.109", 3286 + ] 3287 + 3288 + [[package]] 3289 + name = "madsim-tokio" 3290 + version = "0.2.30" 3291 + source = "registry+https://github.com/rust-lang/crates.io-index" 3292 + checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5" 3293 + dependencies = [ 3294 + "madsim", 3295 + "spin", 3296 + "tokio", 3297 + ] 3298 + 3299 + [[package]] 3182 3300 name = "match_cfg" 3183 3301 version = "0.1.0" 3184 3302 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3198 3316 version = "0.8.4" 3199 3317 source = "registry+https://github.com/rust-lang/crates.io-index" 3200 3318 checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" 3201 - 3202 - [[package]] 3203 - name = "mea" 3204 - version = "0.6.3" 3205 - source = "registry+https://github.com/rust-lang/crates.io-index" 3206 - checksum = "6747f54621d156e1b47eb6b25f39a941b9fc347f98f67d25d8881ff99e8ed832" 3207 - dependencies = [ 3208 - "slab", 3209 - ] 3210 3319 3211 3320 [[package]] 3212 3321 name = "mediatype" ··· 3443 3552 ] 3444 3553 3445 3554 [[package]] 3555 + name = "naive-timer" 3556 + version = "0.2.0" 3557 + source = "registry+https://github.com/rust-lang/crates.io-index" 3558 + checksum = 
"034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" 3559 + 3560 + [[package]] 3561 + name = "nanorand" 3562 + version = "0.7.0" 3563 + source = "registry+https://github.com/rust-lang/crates.io-index" 3564 + checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" 3565 + dependencies = [ 3566 + "getrandom 0.2.15", 3567 + ] 3568 + 3569 + [[package]] 3446 3570 name = "native-tls" 3447 3571 version = "0.2.14" 3448 3572 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3502 3626 3503 3627 [[package]] 3504 3628 name = "num-bigint-dig" 3505 - version = "0.8.6" 3629 + version = "0.8.4" 3506 3630 source = "registry+https://github.com/rust-lang/crates.io-index" 3507 - checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7" 3631 + checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" 3508 3632 dependencies = [ 3633 + "byteorder", 3509 3634 "lazy_static", 3510 3635 "libm", 3511 3636 "num-integer", ··· 3578 3703 ] 3579 3704 3580 3705 [[package]] 3581 - name = "num_cpus" 3582 - version = "1.17.0" 3583 - source = "registry+https://github.com/rust-lang/crates.io-index" 3584 - checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" 3585 - dependencies = [ 3586 - "hermit-abi", 3587 - "libc", 3588 - ] 3589 - 3590 - [[package]] 3591 3706 name = "num_threads" 3592 3707 version = "0.1.7" 3593 3708 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3658 3773 dependencies = [ 3659 3774 "proc-macro2", 3660 3775 "quote", 3661 - "syn", 3776 + "syn 2.0.106", 3662 3777 ] 3663 3778 3664 3779 [[package]] ··· 3690 3805 ] 3691 3806 3692 3807 [[package]] 3808 + name = "ordered_hash_map" 3809 + version = "0.4.0" 3810 + source = "registry+https://github.com/rust-lang/crates.io-index" 3811 + checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176" 3812 + dependencies = [ 3813 + "hashbrown 0.13.2", 3814 + ] 3815 + 3816 + [[package]] 3693 3817 
name = "p256" 3694 3818 version = "0.13.2" 3695 3819 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3712 3836 ] 3713 3837 3714 3838 [[package]] 3839 + name = "panic-message" 3840 + version = "0.3.0" 3841 + source = "registry+https://github.com/rust-lang/crates.io-index" 3842 + checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" 3843 + 3844 + [[package]] 3715 3845 name = "parking" 3716 3846 version = "2.2.1" 3717 3847 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3820 3950 "pest_meta", 3821 3951 "proc-macro2", 3822 3952 "quote", 3823 - "syn", 3953 + "syn 2.0.106", 3824 3954 ] 3825 3955 3826 3956 [[package]] ··· 3850 3980 dependencies = [ 3851 3981 "proc-macro2", 3852 3982 "quote", 3853 - "syn", 3983 + "syn 2.0.106", 3854 3984 ] 3855 3985 3856 3986 [[package]] ··· 3968 4098 "proc-macro-crate", 3969 4099 "proc-macro2", 3970 4100 "quote", 3971 - "syn", 4101 + "syn 2.0.106", 3972 4102 ] 3973 4103 3974 4104 [[package]] ··· 4003 4133 source = "registry+https://github.com/rust-lang/crates.io-index" 4004 4134 checksum = "41273b691a3d467a8c44d05506afba9f7b6bd56c9cdf80123de13fe52d7ec587" 4005 4135 dependencies = [ 4006 - "darling", 4136 + "darling 0.20.11", 4007 4137 "http", 4008 4138 "indexmap 2.11.4", 4009 4139 "mime", ··· 4011 4141 "proc-macro2", 4012 4142 "quote", 4013 4143 "regex", 4014 - "syn", 4144 + "syn 2.0.106", 4015 4145 "thiserror 2.0.16", 4016 4146 ] 4017 4147 ··· 4052 4182 checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55" 4053 4183 dependencies = [ 4054 4184 "proc-macro2", 4055 - "syn", 4185 + "syn 2.0.106", 4056 4186 ] 4057 4187 4058 4188 [[package]] ··· 4363 4493 dependencies = [ 4364 4494 "proc-macro2", 4365 4495 "quote", 4366 - "syn", 4496 + "syn 2.0.106", 4367 4497 ] 4368 4498 4369 4499 [[package]] ··· 4514 4644 4515 4645 [[package]] 4516 4646 name = "rsa" 4517 - version = "0.9.10" 4647 + version = "0.9.8" 4518 4648 source = 
"registry+https://github.com/rust-lang/crates.io-index" 4519 - checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" 4649 + checksum = "78928ac1ed176a5ca1d17e578a1825f3d81ca54cf41053a592584b020cfd691b" 4520 4650 dependencies = [ 4521 4651 "const-oid", 4522 4652 "digest", ··· 4745 4875 "proc-macro2", 4746 4876 "quote", 4747 4877 "serde_derive_internals", 4748 - "syn", 4878 + "syn 2.0.106", 4749 4879 ] 4750 4880 4751 4881 [[package]] ··· 4878 5008 dependencies = [ 4879 5009 "proc-macro2", 4880 5010 "quote", 4881 - "syn", 5011 + "syn 2.0.106", 4882 5012 ] 4883 5013 4884 5014 [[package]] ··· 4889 5019 dependencies = [ 4890 5020 "proc-macro2", 4891 5021 "quote", 4892 - "syn", 5022 + "syn 2.0.106", 4893 5023 ] 4894 5024 4895 5025 [[package]] ··· 4943 5073 4944 5074 [[package]] 4945 5075 name = "serde_spanned" 5076 + version = "0.6.9" 5077 + source = "registry+https://github.com/rust-lang/crates.io-index" 5078 + checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" 5079 + dependencies = [ 5080 + "serde", 5081 + ] 5082 + 5083 + [[package]] 5084 + name = "serde_spanned" 4946 5085 version = "1.0.2" 4947 5086 source = "registry+https://github.com/rust-lang/crates.io-index" 4948 5087 checksum = "5417783452c2be558477e104686f7de5dae53dba813c28435e0e70f82d9b04ee" ··· 4959 5098 "proc-macro2", 4960 5099 "quote", 4961 5100 "serde", 4962 - "syn", 5101 + "syn 2.0.106", 4963 5102 ] 4964 5103 4965 5104 [[package]] ··· 4998 5137 source = "registry+https://github.com/rust-lang/crates.io-index" 4999 5138 checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" 5000 5139 dependencies = [ 5001 - "darling", 5140 + "darling 0.20.11", 5002 5141 "proc-macro2", 5003 5142 "quote", 5004 - "syn", 5143 + "syn 2.0.106", 5005 5144 ] 5006 5145 5007 5146 [[package]] ··· 5093 5232 5094 5233 [[package]] 5095 5234 name = "slab" 5096 - version = "0.4.12" 5235 + version = "0.4.9" 5097 5236 source = 
"registry+https://github.com/rust-lang/crates.io-index" 5098 - checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" 5237 + checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" 5238 + dependencies = [ 5239 + "autocfg", 5240 + ] 5099 5241 5100 5242 [[package]] 5101 5243 name = "slingshot" ··· 5184 5326 ] 5185 5327 5186 5328 [[package]] 5187 - name = "small_ctor" 5188 - version = "0.1.2" 5189 - source = "registry+https://github.com/rust-lang/crates.io-index" 5190 - checksum = "88414a5ca1f85d82cc34471e975f0f74f6aa54c40f062efa42c0080e7f763f81" 5191 - 5192 - [[package]] 5193 5329 name = "smallvec" 5194 5330 version = "1.15.0" 5195 5331 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5277 5413 5278 5414 [[package]] 5279 5415 name = "strsim" 5416 + version = "0.10.0" 5417 + source = "registry+https://github.com/rust-lang/crates.io-index" 5418 + checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" 5419 + 5420 + [[package]] 5421 + name = "strsim" 5280 5422 version = "0.11.1" 5281 5423 source = "registry+https://github.com/rust-lang/crates.io-index" 5282 5424 checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" ··· 5289 5431 5290 5432 [[package]] 5291 5433 name = "syn" 5434 + version = "1.0.109" 5435 + source = "registry+https://github.com/rust-lang/crates.io-index" 5436 + checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" 5437 + dependencies = [ 5438 + "proc-macro2", 5439 + "quote", 5440 + "unicode-ident", 5441 + ] 5442 + 5443 + [[package]] 5444 + name = "syn" 5292 5445 version = "2.0.106" 5293 5446 source = "registry+https://github.com/rust-lang/crates.io-index" 5294 5447 checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" ··· 5315 5468 dependencies = [ 5316 5469 "proc-macro2", 5317 5470 "quote", 5318 - "syn", 5471 + "syn 2.0.106", 5319 5472 ] 5320 5473 5321 5474 [[package]] ··· 5401 5554 dependencies = 
[ 5402 5555 "proc-macro2", 5403 5556 "quote", 5404 - "syn", 5557 + "syn 2.0.106", 5405 5558 ] 5406 5559 5407 5560 [[package]] ··· 5412 5565 dependencies = [ 5413 5566 "proc-macro2", 5414 5567 "quote", 5415 - "syn", 5568 + "syn 2.0.106", 5416 5569 ] 5417 5570 5418 5571 [[package]] ··· 5537 5690 dependencies = [ 5538 5691 "proc-macro2", 5539 5692 "quote", 5540 - "syn", 5693 + "syn 2.0.106", 5541 5694 ] 5542 5695 5543 5696 [[package]] ··· 5623 5776 5624 5777 [[package]] 5625 5778 name = "toml" 5779 + version = "0.8.23" 5780 + source = "registry+https://github.com/rust-lang/crates.io-index" 5781 + checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" 5782 + dependencies = [ 5783 + "serde", 5784 + "serde_spanned 0.6.9", 5785 + "toml_datetime 0.6.11", 5786 + "toml_edit", 5787 + ] 5788 + 5789 + [[package]] 5790 + name = "toml" 5626 5791 version = "0.9.7" 5627 5792 source = "registry+https://github.com/rust-lang/crates.io-index" 5628 5793 checksum = "00e5e5d9bf2475ac9d4f0d9edab68cc573dc2fd644b0dba36b0c30a92dd9eaa0" 5629 5794 dependencies = [ 5630 5795 "indexmap 2.11.4", 5631 5796 "serde_core", 5632 - "serde_spanned", 5797 + "serde_spanned 1.0.2", 5633 5798 "toml_datetime 0.7.2", 5634 5799 "toml_parser", 5635 5800 "toml_writer", ··· 5641 5806 version = "0.6.11" 5642 5807 source = "registry+https://github.com/rust-lang/crates.io-index" 5643 5808 checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" 5809 + dependencies = [ 5810 + "serde", 5811 + ] 5644 5812 5645 5813 [[package]] 5646 5814 name = "toml_datetime" ··· 5658 5826 checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" 5659 5827 dependencies = [ 5660 5828 "indexmap 2.11.4", 5829 + "serde", 5830 + "serde_spanned 0.6.9", 5661 5831 "toml_datetime 0.6.11", 5832 + "toml_write", 5662 5833 "winnow", 5663 5834 ] 5664 5835 ··· 5670 5841 dependencies = [ 5671 5842 "winnow", 5672 5843 ] 5844 + 5845 + [[package]] 5846 + name = "toml_write" 5847 + version = 
"0.1.2" 5848 + source = "registry+https://github.com/rust-lang/crates.io-index" 5849 + checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" 5673 5850 5674 5851 [[package]] 5675 5852 name = "toml_writer" ··· 5743 5920 dependencies = [ 5744 5921 "proc-macro2", 5745 5922 "quote", 5746 - "syn", 5923 + "syn 2.0.106", 5747 5924 ] 5748 5925 5749 5926 [[package]] ··· 5793 5970 dependencies = [ 5794 5971 "proc-macro2", 5795 5972 "quote", 5796 - "syn", 5973 + "syn 2.0.106", 5797 5974 ] 5798 5975 5799 5976 [[package]] ··· 6115 6292 "log", 6116 6293 "proc-macro2", 6117 6294 "quote", 6118 - "syn", 6295 + "syn 2.0.106", 6119 6296 "wasm-bindgen-shared", 6120 6297 ] 6121 6298 ··· 6150 6327 dependencies = [ 6151 6328 "proc-macro2", 6152 6329 "quote", 6153 - "syn", 6330 + "syn 2.0.106", 6154 6331 "wasm-bindgen-backend", 6155 6332 "wasm-bindgen-shared", 6156 6333 ] ··· 6303 6480 dependencies = [ 6304 6481 "proc-macro2", 6305 6482 "quote", 6306 - "syn", 6483 + "syn 2.0.106", 6307 6484 ] 6308 6485 6309 6486 [[package]] ··· 6314 6491 dependencies = [ 6315 6492 "proc-macro2", 6316 6493 "quote", 6317 - "syn", 6494 + "syn 2.0.106", 6318 6495 ] 6319 6496 6320 6497 [[package]] ··· 6620 6797 dependencies = [ 6621 6798 "proc-macro2", 6622 6799 "quote", 6623 - "syn", 6800 + "syn 2.0.106", 6624 6801 "synstructure", 6625 6802 ] 6626 6803 ··· 6650 6827 dependencies = [ 6651 6828 "proc-macro2", 6652 6829 "quote", 6653 - "syn", 6830 + "syn 2.0.106", 6654 6831 ] 6655 6832 6656 6833 [[package]] ··· 6661 6838 dependencies = [ 6662 6839 "proc-macro2", 6663 6840 "quote", 6664 - "syn", 6841 + "syn 2.0.106", 6665 6842 ] 6666 6843 6667 6844 [[package]] ··· 6681 6858 dependencies = [ 6682 6859 "proc-macro2", 6683 6860 "quote", 6684 - "syn", 6861 + "syn 2.0.106", 6685 6862 "synstructure", 6686 6863 ] 6687 6864 ··· 6703 6880 dependencies = [ 6704 6881 "proc-macro2", 6705 6882 "quote", 6706 - "syn", 6883 + "syn 2.0.106", 6707 6884 ] 6708 6885 6709 6886 [[package]] ··· 6725 6902 
dependencies = [ 6726 6903 "proc-macro2", 6727 6904 "quote", 6728 - "syn", 6905 + "syn 2.0.106", 6729 6906 ] 6730 6907 6731 6908 [[package]]
-3
Cargo.toml
··· 13 13 "pocket", 14 14 "reflector", 15 15 ] 16 - 17 - [workspace.dependencies] 18 - clap = { version = "4.5.56", features = ["derive", "env"] }
+1 -1
constellation/Cargo.toml
··· 11 11 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 12 12 axum-metrics = "0.2" 13 13 bincode = "1.3.3" 14 - clap = { workspace = true } 14 + clap = { version = "4.5.26", features = ["derive"] } 15 15 ctrlc = "3.4.5" 16 16 flume = { version = "0.11.1", default-features = false } 17 17 fs4 = { version = "0.12.0", features = ["sync"] }
+49 -54
constellation/src/bin/main.rs
··· 26 26 #[arg(long)] 27 27 #[clap(default_value = "0.0.0.0:6789")] 28 28 bind: SocketAddr, 29 - /// enable metrics collection and serving 30 - #[arg(long, action)] 29 + /// optionally disable the metrics server 30 + #[arg(long)] 31 + #[clap(default_value_t = false)] 31 32 collect_metrics: bool, 32 33 /// metrics server's listen address 33 - #[arg(long, requires("collect_metrics"))] 34 + #[arg(long)] 34 35 #[clap(default_value = "0.0.0.0:8765")] 35 36 bind_metrics: SocketAddr, 36 37 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: ··· 45 46 #[arg(short, long)] 46 47 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 47 48 backend: StorageBackend, 48 - /// Serve a did:web document for this domain 49 - #[arg(long)] 50 - did_web_domain: Option<String>, 51 49 /// Initiate a database backup into this dir, if supported by the storage 52 50 #[arg(long)] 53 51 backup: Option<PathBuf>, ··· 106 104 MemStorage::new(), 107 105 fixture, 108 106 None, 109 - args.did_web_domain, 110 107 stream, 111 108 bind, 112 109 metrics_bind, ··· 142 139 rocks, 143 140 fixture, 144 141 args.data, 145 - args.did_web_domain, 146 142 stream, 147 143 bind, 148 144 metrics_bind, ··· 164 160 mut storage: impl LinkStorage, 165 161 fixture: Option<PathBuf>, 166 162 data_dir: Option<PathBuf>, 167 - did_web_domain: Option<String>, 168 163 stream: String, 169 164 bind: SocketAddr, 170 165 metrics_bind: SocketAddr, ··· 217 212 if collect_metrics { 218 213 install_metrics_server(metrics_bind)?; 219 214 } 220 - serve(readable, bind, did_web_domain, staying_alive).await 215 + serve(readable, bind, staying_alive).await 221 216 }) 222 217 .unwrap(); 223 218 stay_alive.drop_guard(); ··· 227 222 // only spawn monitoring thread if the metrics server is running 228 223 if collect_metrics { 229 224 s.spawn(move || { // monitor thread 230 - let stay_alive = stay_alive.clone(); 231 - let check_alive = stay_alive.clone(); 225 + let stay_alive 
= stay_alive.clone(); 226 + let check_alive = stay_alive.clone(); 232 227 233 - let process_collector = metrics_process::Collector::default(); 234 - process_collector.describe(); 235 - metrics::describe_gauge!( 236 - "storage_available", 237 - metrics::Unit::Bytes, 238 - "available to be allocated" 239 - ); 240 - metrics::describe_gauge!( 241 - "storage_free", 242 - metrics::Unit::Bytes, 243 - "unused bytes in filesystem" 244 - ); 245 - if let Some(ref p) = data_dir { 246 - if let Err(e) = fs4::available_space(p) { 247 - eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 248 - } else { 249 - println!("disk space monitoring should work, watching at {p:?}"); 228 + let process_collector = metrics_process::Collector::default(); 229 + process_collector.describe(); 230 + metrics::describe_gauge!( 231 + "storage_available", 232 + metrics::Unit::Bytes, 233 + "available to be allocated" 234 + ); 235 + metrics::describe_gauge!( 236 + "storage_free", 237 + metrics::Unit::Bytes, 238 + "unused bytes in filesystem" 239 + ); 240 + if let Some(ref p) = data_dir { 241 + if let Err(e) = fs4::available_space(p) { 242 + eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 243 + } else { 244 + println!("disk space monitoring should work, watching at {p:?}"); 245 + } 246 + } 247 + 248 + 'monitor: loop { 249 + match readable.get_stats() { 250 + Ok(StorageStats { dids, targetables, linking_records, .. }) => { 251 + metrics::gauge!("storage.stats.dids").set(dids as f64); 252 + metrics::gauge!("storage.stats.targetables").set(targetables as f64); 253 + metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 250 254 } 255 + Err(e) => eprintln!("failed to get stats: {e:?}"), 251 256 } 252 257 253 - 'monitor: loop { 254 - match readable.get_stats() { 255 - Ok(StorageStats { dids, targetables, linking_records, .. 
}) => { 256 - metrics::gauge!("storage.stats.dids").set(dids as f64); 257 - metrics::gauge!("storage.stats.targetables").set(targetables as f64); 258 - metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 259 - } 260 - Err(e) => eprintln!("failed to get stats: {e:?}"), 258 + process_collector.collect(); 259 + if let Some(ref p) = data_dir { 260 + if let Ok(avail) = fs4::available_space(p) { 261 + metrics::gauge!("storage.available").set(avail as f64); 261 262 } 262 - 263 - process_collector.collect(); 264 - if let Some(ref p) = data_dir { 265 - if let Ok(avail) = fs4::available_space(p) { 266 - metrics::gauge!("storage.available").set(avail as f64); 267 - } 268 - if let Ok(free) = fs4::free_space(p) { 269 - metrics::gauge!("storage.free").set(free as f64); 270 - } 263 + if let Ok(free) = fs4::free_space(p) { 264 + metrics::gauge!("storage.free").set(free as f64); 271 265 } 272 - let wait = time::Instant::now(); 273 - while wait.elapsed() < MONITOR_INTERVAL { 274 - thread::sleep(time::Duration::from_millis(100)); 275 - if check_alive.is_cancelled() { 276 - break 'monitor 277 - } 266 + } 267 + let wait = time::Instant::now(); 268 + while wait.elapsed() < MONITOR_INTERVAL { 269 + thread::sleep(time::Duration::from_millis(100)); 270 + if check_alive.is_cancelled() { 271 + break 'monitor 278 272 } 279 273 } 280 - stay_alive.drop_guard(); 281 - }); 274 + } 275 + stay_alive.drop_guard(); 276 + }); 282 277 } 283 278 }); 284 279
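The monitor thread in the hunk above sleeps in 100ms slices inside its wait loop so that shutdown is noticed quickly instead of blocking for the full `MONITOR_INTERVAL`. As a standalone sketch of that pattern (using a plain `AtomicBool` in place of the `CancellationToken` from the PR, and invented names throughout):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};

/// Sleep for roughly `total`, but wake early if `cancelled` is set.
/// Polling in small slices bounds shutdown latency to ~one slice.
/// Returns true if the sleep was cut short by cancellation.
fn cancellable_sleep(total: time::Duration, cancelled: &AtomicBool) -> bool {
    let start = time::Instant::now();
    while start.elapsed() < total {
        thread::sleep(time::Duration::from_millis(10));
        if cancelled.load(Ordering::Relaxed) {
            return true; // woke early due to cancellation
        }
    }
    false // slept the full interval
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let setter = Arc::clone(&flag);
    thread::spawn(move || {
        thread::sleep(time::Duration::from_millis(30));
        setter.store(true, Ordering::Relaxed);
    });
    // Would take 5s uncancelled; returns shortly after the flag is set.
    assert!(cancellable_sleep(time::Duration::from_secs(5), &flag));
}
```

The trade-off is the usual one for polling: a shorter slice means faster shutdown but more wakeups; an `std::sync::Condvar` with `wait_timeout` would avoid polling entirely at the cost of a little more machinery.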
-38
lexicons/blue.microcosm/links/getBacklinksCount.json
··· 1 - { 2 - "lexicon": 1, 3 - "id": "blue.microcosm.links.getBacklinksCount", 4 - "defs": { 5 - "main": { 6 - "type": "query", 7 - "description": "count records that link to another record", 8 - "parameters": { 9 - "type": "params", 10 - "required": ["subject", "source"], 11 - "properties": { 12 - "subject": { 13 - "type": "string", 14 - "format": "uri", 15 - "description": "the primary target being linked to (at-uri, did, or uri)" 16 - }, 17 - "source": { 18 - "type": "string", 19 - "description": "collection and path specification for the primary link" 20 - } 21 - } 22 - }, 23 - "output": { 24 - "encoding": "application/json", 25 - "schema": { 26 - "type": "object", 27 - "required": ["total"], 28 - "properties": { 29 - "total": { 30 - "type": "integer", 31 - "description": "total number of matching links" 32 - } 33 - } 34 - } 35 - } 36 - } 37 - } 38 - }
+1 -1
pocket/Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 atrium-crypto = "0.1.2" 8 - clap = { workspace = true } 8 + clap = { version = "4.5.41", features = ["derive"] } 9 9 jwt-compact = { git = "https://github.com/fatfingers23/jwt-compact.git", features = ["es256k"] } 10 10 log = "0.4.27" 11 11 poem = { version = "3.1.12", features = ["acme", "static-files"] }
+1 -1
quasar/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 - clap = { workspace = true } 7 + clap = { version = "4.5.46", features = ["derive"] } 8 8 fjall = "2.11.2"
+1 -1
readme.md
··· 10 10 Tutorials, how-to guides, and client SDK libraries are all in the works for gentler on-ramps, but are not quite ready yet. But don't let that stop you! Hop in the [microcosm discord](https://discord.gg/tcDfe4PGVB), or post questions and tag [@bad-example.com](https://bsky.app/profile/bad-example.com) on Bluesky if you get stuck anywhere. 11 11 12 12 > [!tip] 13 - > This repository's primary home is moving to tangled: [@microcosm.blue/microcosm-rs](https://tangled.org/microcosm.blue/microcosm-rs). It will continue to be mirrored on [github](https://github.com/at-microcosm/microcosm-rs) for the forseeable future, and it's fine to open issues or pulls in either place! 13 + > This repository's primary home is moving to tangled: [@microcosm.blue/microcosm-rs](https://tangled.sh/@microcosm.blue/microcosm-rs). It will continue to be mirrored on [github](https://github.com/at-microcosm/microcosm-rs) for the forseeable future, and it's fine to open issues or pulls in either place! 14 14 15 15 16 16 ๐ŸŒŒ [Constellation](./constellation/)
+1 -1
reflector/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 - clap = { workspace = true } 7 + clap = { version = "4.5.47", features = ["derive"] } 8 8 log = "0.4.28" 9 9 poem = "3.1.12" 10 10 serde = { version = "1.0.219", features = ["derive"] }
+2 -2
slingshot/Cargo.toml
··· 8 8 atrium-common = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 9 9 atrium-identity = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 10 10 atrium-oauth = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 11 - clap = { workspace = true } 11 + clap = { version = "4.5.41", features = ["derive"] } 12 12 ctrlc = "3.4.7" 13 - foyer = { version = "0.22.3", features = ["serde"] } 13 + foyer = { version = "0.18.0", features = ["serde"] } 14 14 hickory-resolver = "0.25.2" 15 15 jetstream = { path = "../jetstream", features = ["metrics"] } 16 16 links = { path = "../links" }
+2 -2
slingshot/api-description.md
··· 1 1 _A [gravitational slingshot](https://en.wikipedia.org/wiki/Gravity_assist) makes use of the gravity and relative movements of celestial bodies to accelerate a spacecraft and change its trajectory._ 2 2 3 3 4 - # Slingshot: edge record and identity cache 4 + # Slingshot: edge record cache 5 5 6 6 Applications in [ATProtocol](https://atproto.com/) store data in users' own [PDS](https://atproto.com/guides/self-hosting) (Personal Data Server), which are distributed across thousands of independently-run servers all over the world. Trying to access this data poses challenges for client applications: 7 7 ··· 90 90 - [๐ŸŽ‡ Spacedust](https://spacedust.microcosm.blue/), a firehose of all social interactions 91 91 92 92 > [!success] 93 - > All microcosm projects are [open source](https://tangled.org/bad-example.com/microcosm-links). **You can help sustain Slingshot** and all of microcosm by becoming a [Github sponsor](https://github.com/sponsors/uniphil/) or a [Ko-fi supporter](https://ko-fi.com/bad_example)! 93 + > All microcosm projects are [open source](https://tangled.sh/@bad-example.com/microcosm-links). **You can help sustain Slingshot** and all of microcosm by becoming a [Github sponsor](https://github.com/sponsors/uniphil/) or a [Ko-fi supporter](https://ko-fi.com/bad_example)!
-34
slingshot/readme.md
··· 5 5 ```bash 6 6 RUST_LOG=info,slingshot=trace ulimit -n 4096 && RUST_LOG=info cargo run -- --jetstream us-east-1 --cache-dir ./foyer 7 7 ``` 8 - 9 - the identity cache uses a lot of files so you probably need to bump ulimit 10 - 11 - on macos: 12 - 13 - ```bash 14 - ulimit -n 4096 15 - ``` 16 - 17 - ## prod deploy 18 - 19 - you **must** setcap the binary to run it on apollo!!!! 20 - 21 - ```bash 22 - sudo setcap CAP_NET_BIND_SERVICE=+eip ../target/release/slingshot 23 - ``` 24 - 25 - then run with 26 - 27 - ```bash 28 - RUST_BACKTRACE=1 RUST_LOG=info,slingshot=trace /home/ubuntu/links/target/release/slingshot \ 29 - --jetstream wss://jetstream1.us-east.fire.hose.cam/subscribe \ 30 - --healthcheck https://hc-ping.com/[REDACTED] \ 31 - --cache-dir ./foyer \ 32 - --record-cache-memory-mb 2048 \ 33 - --record-cache-disk-gb 32 \ 34 - --identity-cache-memory-mb 1024 \ 35 - --identity-cache-disk-gb 8 \ 36 - --collect-metrics \ 37 - --acme-ipv6 \ 38 - --acme-domain slingshot.microcosm.blue \ 39 - --acme-contact phil@bad-example.com \ 40 - --acme-cache-path /home/ubuntu/certs 41 - ```
+25 -41
slingshot/src/consumer.rs
··· 1 + use crate::CachedRecord; 1 2 use crate::error::ConsumerError; 2 - use crate::{CachedRecord, Identity, IdentityKey}; 3 3 use foyer::HybridCache; 4 4 use jetstream::{ 5 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 11 11 jetstream_endpoint: String, 12 12 cursor: Option<Cursor>, 13 13 no_zstd: bool, 14 - identity: Identity, 15 14 shutdown: CancellationToken, 16 15 cache: HybridCache<String, CachedRecord>, 17 16 ) -> Result<(), ConsumerError> { ··· 47 46 break; 48 47 }; 49 48 50 - match event.kind { 51 - EventKind::Commit => { 52 - let Some(ref mut commit) = event.commit else { 53 - log::warn!("consumer: commit event missing commit data, ignoring"); 54 - continue; 55 - }; 49 + if event.kind != EventKind::Commit { 50 + continue; 51 + } 52 + let Some(ref mut commit) = event.commit else { 53 + log::warn!("consumer: commit event missing commit data, ignoring"); 54 + continue; 55 + }; 56 56 57 - // TODO: something a bit more robust 58 - let at_uri = format!( 59 - "at://{}/{}/{}", 60 - &*event.did, &*commit.collection, &*commit.rkey 61 - ); 57 + // TODO: something a bit more robust 58 + let at_uri = format!( 59 + "at://{}/{}/{}", 60 + &*event.did, &*commit.collection, &*commit.rkey 61 + ); 62 62 63 - if commit.operation == CommitOp::Delete { 64 - cache.insert(at_uri, CachedRecord::Deleted); 65 - } else { 66 - let Some(record) = commit.record.take() else { 67 - log::warn!("consumer: commit insert or update missing record, ignoring"); 68 - continue; 69 - }; 70 - let Some(cid) = commit.cid.take() else { 71 - log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 - continue; 73 - }; 63 + if commit.operation == CommitOp::Delete { 64 + cache.insert(at_uri, CachedRecord::Deleted); 65 + } else { 66 + let Some(record) = commit.record.take() else { 67 + log::warn!("consumer: commit insert or update missing record, ignoring"); 68 + continue; 69 + }; 70 + let Some(cid) = commit.cid.take() else { 71 + 
log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 + continue; 73 + }; 74 74 75 - cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 - } 77 - } 78 - EventKind::Identity => { 79 - let Some(ident) = event.identity else { 80 - log::warn!("consumer: identity event missing identity data, ignoring"); 81 - continue; 82 - }; 83 - if let Some(handle) = ident.handle { 84 - metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1); 85 - identity.queue_refresh(IdentityKey::Handle(handle)).await; 86 - } 87 - metrics::counter!("identity_did_refresh_queued", "reason" => "identity event") 88 - .increment(1); 89 - identity.queue_refresh(IdentityKey::Did(ident.did)).await; 90 - } 91 - EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete) 75 + cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 92 76 } 93 77 } 94 78
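The consumer refactor above flattens a nested `match` into early `continue` guards plus `let … else` bindings, which keeps the happy path at a single indentation level. A minimal standalone illustration of that style (the `Event` type here is an invented stand-in, not the jetstream types from the PR):

```rust
/// Invented stand-in for a firehose event, for illustration only.
#[derive(Debug)]
struct Event {
    is_commit: bool,
    record: Option<String>,
}

/// Collect the records of commit events, skipping anything malformed,
/// using guard clauses instead of nested match arms.
fn collect_records(events: Vec<Event>) -> Vec<String> {
    let mut out = Vec::new();
    for event in events {
        // Flat guard: bail out of this iteration early.
        if !event.is_commit {
            continue;
        }
        // let-else: bind the happy-path value or skip, with no extra nesting.
        let Some(record) = event.record else {
            continue;
        };
        out.push(record);
    }
    out
}

fn main() {
    let events = vec![
        Event { is_commit: true, record: Some("a".into()) },
        Event { is_commit: false, record: Some("b".into()) },
        Event { is_commit: true, record: None },
        Event { is_commit: true, record: Some("c".into()) },
    ];
    assert_eq!(collect_records(events), vec!["a", "c"]);
}
```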
+9 -21
slingshot/src/firehose_cache.rs
··· 1 1 use crate::CachedRecord; 2 - use foyer::{ 3 - BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder, 4 - PsyncIoEngineConfig, 5 - }; 2 + use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; 6 3 use std::path::Path; 7 4 8 5 pub async fn firehose_cache( 9 6 cache_dir: impl AsRef<Path>, 10 - memory_mb: usize, 11 - disk_gb: usize, 12 7 ) -> Result<HybridCache<String, CachedRecord>, String> { 13 - let device = FsDeviceBuilder::new(cache_dir) 14 - .with_capacity(disk_gb * 2_usize.pow(30)) 15 - .build() 16 - .map_err(|e| format!("foyer device setup error: {e}"))?; 17 - 18 - let engine = BlockEngineConfig::new(device).with_block_size(16 * 2_usize.pow(20)); // note: this does limit the max cached item size 19 - 20 8 let cache = HybridCacheBuilder::new() 21 9 .with_name("firehose") 22 - .memory(memory_mb * 2_usize.pow(20)) 23 - .with_weighter(|k: &String, v: &CachedRecord| { 24 - std::mem::size_of_val(k.as_str()) + v.weight() 25 - }) 26 - .storage() 27 - .with_io_engine_config(PsyncIoEngineConfig::default()) 28 - .with_engine_config(engine) 10 + .memory(64 * 2_usize.pow(20)) 11 + .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 12 + .storage(Engine::large()) 13 + .with_device_options( 14 + DirectFsDeviceOptions::new(cache_dir) 15 + .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 16 + .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 17 + ) 29 18 .build() 30 19 .await 31 20 .map_err(|e| format!("foyer setup error: {e:?}"))?; 32 - 33 21 Ok(cache) 34 22 }
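One caveat worth flagging for review (an observation about the simplified weighter in the hunk above, not part of the patch): `std::mem::size_of_val` on a reference measures only the shallow, stack-side size of the value, so any heap contents of a cached record do not count toward the cache weight. A standalone demonstration:

```rust
use std::mem::size_of_val;

fn main() {
    let short = String::from("x");
    let long = "y".repeat(10_000); // ~10 KB on the heap

    // Both report only the String header (ptr + len + capacity),
    // regardless of how much heap memory the string actually owns.
    assert_eq!(size_of_val(&short), size_of_val(&long));

    // String::len, by contrast, reflects the heap payload.
    assert!(long.len() > short.len());
    println!("shallow size: {} bytes", size_of_val(&short));
}
```

This is presumably why the key side of the weighter uses `k.len()` rather than `size_of_val`; if cached record values can own large heap allocations, a deep weight (like the earlier per-variant accounting) keeps the memory budget honest.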
+49 -111
slingshot/src/identity.rs
···
11 11 /// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param
12 12 /// 2. DID -> PDS resolution: so we know where to getRecord
13 13 /// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this
14 - use std::time::{Duration, Instant};
14 + use std::time::Duration;
15 15 use tokio::sync::Mutex;
16 16 use tokio_util::sync::CancellationToken;
···
26 26 handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver},
27 27 };
28 28 use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but
29 - use foyer::{
30 - BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder,
31 - PsyncIoEngineConfig,
32 - };
29 + use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
33 30 use serde::{Deserialize, Serialize};
34 31 use time::UtcDateTime;
···
38 35 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);
39 36
40 37 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
41 - pub enum IdentityKey {
38 + enum IdentityKey {
42 39 Handle(Handle),
43 40 Did(Did),
44 41 }
45 42
46 - impl IdentityKey {
47 - fn weight(&self) -> usize {
48 - let s = match self {
49 - IdentityKey::Handle(h) => h.as_str(),
50 - IdentityKey::Did(d) => d.as_str(),
51 - };
52 - std::mem::size_of::<Self>() + std::mem::size_of_val(s)
53 - }
54 - }
55 -
56 43 #[derive(Debug, Serialize, Deserialize)]
57 44 struct IdentityVal(UtcDateTime, IdentityData);
···
63 50 Doc(PartialMiniDoc),
64 51 }
65 52
66 - impl IdentityVal {
67 - fn weight(&self) -> usize {
68 - let wrapping = std::mem::size_of::<Self>();
69 - let inner = match &self.1 {
70 - IdentityData::NotFound => 0,
71 - IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
72 - IdentityData::Doc(d) => {
73 - std::mem::size_of_val(d.unverified_handle.as_str())
74 - + std::mem::size_of_val(d.pds.as_str())
75 - + std::mem::size_of_val(d.signing_key.as_str())
76 - }
77 - };
78 - wrapping + inner
79 - }
80 - }
81 -
82 53 /// partial representation of a com.bad-example.identity mini atproto doc
83 54 ///
84 55 /// partial because the handle is not verified
···
115 86 let Some(maybe_handle) = aka.strip_prefix("at://") else {
116 87 continue;
117 88 };
118 - let Ok(valid_handle) = Handle::new(maybe_handle.to_lowercase()) else {
89 + let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
119 90 continue;
120 91 };
121 92 unverified_handle = Some(valid_handle);
···
186 157 /// multi-producer *single consumer* queue
187 158 refresh_queue: Arc<Mutex<RefreshQueue>>,
188 159 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
189 - refresher_task: Arc<Mutex<()>>,
160 + refresher: Arc<Mutex<()>>,
190 161 }
191 162
192 163 impl Identity {
193 - pub async fn new(
194 - cache_dir: impl AsRef<Path>,
195 - memory_mb: usize,
196 - disk_gb: usize,
197 - ) -> Result<Self, IdentityError> {
164 + pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> {
198 165 let http_client = Arc::new(DefaultHttpClient::default());
199 166 let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
200 167 dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(),
···
204 171 plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
205 172 http_client: http_client.clone(),
206 173 });
207 -
208 - let device = FsDeviceBuilder::new(cache_dir)
209 - .with_capacity(disk_gb * 2_usize.pow(30))
210 - .build()?;
211 - let engine = BlockEngineConfig::new(device).with_block_size(2_usize.pow(20)); // note: this does limit the max cached item size
212 174
213 175 let cache = HybridCacheBuilder::new()
214 176 .with_name("identity")
215 - .memory(memory_mb * 2_usize.pow(20))
216 - .with_weighter(|k: &IdentityKey, v: &IdentityVal| k.weight() + v.weight())
217 - .storage()
218 - .with_io_engine_config(PsyncIoEngineConfig::default())
219 - .with_engine_config(engine)
177 + .memory(16 * 2_usize.pow(20))
178 + .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v))
179 + .storage(Engine::small())
180 + .with_device_options(
181 + DirectFsDeviceOptions::new(cache_dir)
182 + .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
183 + .with_file_size(2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
184 + )
220 185 .build()
221 186 .await?;
···
225 190 did_resolver: Arc::new(did_resolver),
226 191 cache,
227 192 refresh_queue: Default::default(),
228 - refresher_task: Default::default(),
193 + refresher: Default::default(),
229 194 })
230 195 }
···
264 229 handle: &Handle,
265 230 ) -> Result<Option<Did>, IdentityError> {
266 231 let key = IdentityKey::Handle(handle.clone());
267 - metrics::counter!("slingshot_get_handle").increment(1);
268 232 let entry = self
269 233 .cache
270 - .get_or_fetch(&key, {
234 + .fetch(key.clone(), {
271 235 let handle = handle.clone();
272 236 let resolver = self.handle_resolver.clone();
273 237 || async move {
274 - let t0 = Instant::now();
275 - let (res, success) = match resolver.resolve(&handle).await {
276 - Ok(did) => (
277 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
278 - "true",
279 - ),
280 - Err(atrium_identity::Error::NotFound) => (
281 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
282 - "false",
283 - ),
284 - Err(other) => {
285 - log::debug!("other error resolving handle: {other:?}");
286 - (Err(IdentityError::ResolutionFailed(other)), "false")
238 + match resolver.resolve(&handle).await {
239 + Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
240 + Err(atrium_identity::Error::NotFound) => {
241 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
287 242 }
288 - };
289 - metrics::histogram!("slingshot_fetch_handle", "success" => success)
290 - .record(t0.elapsed());
291 - res
243 + Err(other) => Err(foyer::Error::Other(Box::new({
244 + log::debug!("other error resolving handle: {other:?}");
245 + IdentityError::ResolutionFailed(other)
246 + }))),
247 + }
292 248 }
293 249 })
294 250 .await?;
···
302 258 }
303 259 IdentityData::NotFound => {
304 260 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305 - metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
306 261 self.queue_refresh(key).await;
307 262 }
308 263 Ok(None)
309 264 }
310 265 IdentityData::Did(did) => {
311 266 if (now - *last_fetch) >= MIN_TTL {
312 - metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
313 267 self.queue_refresh(key).await;
314 268 }
315 269 Ok(Some(did.clone()))
···
323 277 did: &Did,
324 278 ) -> Result<Option<PartialMiniDoc>, IdentityError> {
325 279 let key = IdentityKey::Did(did.clone());
326 - metrics::counter!("slingshot_get_did_doc").increment(1);
327 280 let entry = self
328 281 .cache
329 - .get_or_fetch(&key, {
282 + .fetch(key.clone(), {
330 283 let did = did.clone();
331 284 let resolver = self.did_resolver.clone();
332 285 || async move {
333 - let t0 = Instant::now();
334 - let (res, success) = match resolver.resolve(&did).await {
335 - Ok(did_doc) if did_doc.id != did.to_string() => (
286 + match resolver.resolve(&did).await {
287 + Ok(did_doc) => {
336 288 // TODO: fix in atrium: should verify id is did
337 - Err(IdentityError::BadDidDoc(
338 - "did doc's id did not match did".to_string(),
339 - )),
340 - "false",
341 - ),
342 - Ok(did_doc) => match did_doc.try_into() {
343 - Ok(mini_doc) => (
344 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc))),
345 - "true",
346 - ),
347 - Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
348 - },
349 - Err(atrium_identity::Error::NotFound) => (
350 - Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
351 - "false",
352 - ),
353 - Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
354 - };
355 - metrics::histogram!("slingshot_fetch_did_doc", "success" => success)
356 - .record(t0.elapsed());
357 - res
289 + if did_doc.id != did.to_string() {
290 + return Err(foyer::Error::other(Box::new(
291 + IdentityError::BadDidDoc(
292 + "did doc's id did not match did".to_string(),
293 + ),
294 + )));
295 + }
296 + let mini_doc = did_doc.try_into().map_err(|e| {
297 + foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e)))
298 + })?;
299 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)))
300 + }
301 + Err(atrium_identity::Error::NotFound) => {
302 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
303 + }
304 + Err(other) => Err(foyer::Error::Other(Box::new(
305 + IdentityError::ResolutionFailed(other),
306 + ))),
307 + }
358 308 }
359 309 })
360 310 .await?;
···
368 318 }
369 319 IdentityData::NotFound => {
370 320 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371 - metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
372 321 self.queue_refresh(key).await;
373 322 }
374 323 Ok(None)
375 324 }
376 325 IdentityData::Doc(mini_did) => {
377 326 if (now - *last_fetch) >= MIN_TTL {
378 - metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379 327 self.queue_refresh(key).await;
380 328 }
381 329 Ok(Some(mini_did.clone()))
···
386 334 /// put a refresh task on the queue
387 335 ///
388 336 /// this can be safely called from multiple concurrent tasks
389 - pub async fn queue_refresh(&self, key: IdentityKey) {
337 + async fn queue_refresh(&self, key: IdentityKey) {
390 338 // todo: max queue size
391 339 let mut q = self.refresh_queue.lock().await;
392 340 if !q.items.contains(&key) {
···
463 411 /// run the refresh queue consumer
464 412 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
465 413 let _guard = self
466 - .refresher_task
414 + .refresher
467 415 .try_lock()
468 416 .expect("there to only be one refresher running");
469 417 loop {
···
485 433 log::trace!("refreshing handle {handle:?}");
486 434 match self.handle_resolver.resolve(handle).await {
487 435 Ok(did) => {
488 - metrics::counter!("identity_handle_refresh", "success" => "true")
489 - .increment(1);
490 436 self.cache.insert(
491 437 task_key.clone(),
492 438 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
493 439 );
494 440 }
495 441 Err(atrium_identity::Error::NotFound) => {
496 - metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1);
497 442 self.cache.insert(
498 443 task_key.clone(),
499 444 IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
500 445 );
501 446 }
502 447 Err(err) => {
503 - metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1);
504 448 log::warn!(
505 449 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
506 450 );
···
515 459 Ok(did_doc) => {
516 460 // TODO: fix in atrium: should verify id is did
517 461 if did_doc.id != did.to_string() {
518 - metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
519 462 log::warn!(
520 463 "refreshed did doc failed: wrong did doc id. dropping refresh."
521 464 );
···
524 467 let mini_doc = match did_doc.try_into() {
525 468 Ok(md) => md,
526 469 Err(e) => {
527 - metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528 470 log::warn!(
529 471 "converting mini doc failed: {e:?}. dropping refresh."
530 472 );
531 473 continue;
532 474 }
533 475 };
534 - metrics::counter!("identity_did_refresh", "success" => "true")
535 - .increment(1);
536 476 self.cache.insert(
537 477 task_key.clone(),
538 478 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
539 479 );
540 480 }
541 481 Err(atrium_identity::Error::NotFound) => {
542 - metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1);
543 482 self.cache.insert(
544 483 task_key.clone(),
545 484 IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
546 485 );
547 486 }
548 487 Err(err) => {
549 - metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1);
550 488 log::warn!(
551 489 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
552 490 );
+1 -1
slingshot/src/lib.rs
···
9 9 pub use consumer::consume;
10 10 pub use firehose_cache::firehose_cache;
11 11 pub use healthcheck::healthcheck;
12 - pub use identity::{Identity, IdentityKey};
12 + pub use identity::Identity;
13 13 pub use record::{CachedRecord, ErrorResponseObject, Repo};
14 14 pub use server::serve;
+31 -86
slingshot/src/main.rs
···
4 4 use slingshot::{
5 5 Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6 6 };
7 - use std::net::SocketAddr;
8 7 use std::path::PathBuf;
9 8
10 9 use clap::Parser;
···
16 15 struct Args {
17 16 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
18 17 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
19 - #[arg(long, env = "SLINGSHOT_JETSTREAM")]
18 + #[arg(long)]
20 19 jetstream: String,
21 20 /// don't request zstd-compressed jetstream events
22 21 ///
23 22 /// reduces CPU at the expense of more ingress bandwidth
24 - #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")]
23 + #[arg(long, action)]
25 24 jetstream_no_zstd: bool,
26 25 /// where to keep disk caches
27 - #[arg(long, env = "SLINGSHOT_CACHE_DIR")]
26 + #[arg(long)]
28 27 cache_dir: PathBuf,
29 - /// where to listen for incomming requests
30 - ///
31 - /// cannot be used with acme -- if you need ipv6 see --acme-ipv6
32 - #[arg(long, env = "SLINGSHOT_BIND")]
33 - #[clap(default_value = "0.0.0.0:8080")]
34 - bind: SocketAddr,
35 - /// memory cache size in megabytes for records
36 - #[arg(long, env = "SLINGSHOT_RECORD_CACHE_MEMORY_MB")]
37 - #[clap(default_value_t = 64)]
38 - record_cache_memory_mb: usize,
39 - /// disk cache size in gigabytes for records
40 - #[arg(long, env = "SLINGSHOT_RECORD_CACHE_DISK_DB")]
41 - #[clap(default_value_t = 1)]
42 - record_cache_disk_gb: usize,
43 - /// memory cache size in megabytes for identities
44 - #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_MEMORY_MB")]
45 - #[clap(default_value_t = 64)]
46 - identity_cache_memory_mb: usize,
47 - /// disk cache size in gigabytes for identities
48 - #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49 - #[clap(default_value_t = 1)]
50 - identity_cache_disk_gb: usize,
51 28 /// the domain pointing to this server
52 29 ///
53 30 /// if present:
54 31 /// - a did:web document will be served at /.well-known/did.json
55 32 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
56 33 /// - TODO: a rate-limiter will be installed
57 - #[arg(
58 - long,
59 - conflicts_with("bind"),
60 - requires("acme_cache_path"),
61 - env = "SLINGSHOT_ACME_DOMAIN"
62 - )]
63 - acme_domain: Option<String>,
34 + #[arg(long)]
35 + domain: Option<String>,
64 36 /// email address for letsencrypt contact
65 37 ///
66 38 /// recommended in production, i guess?
67 - #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")]
39 + #[arg(long)]
68 40 acme_contact: Option<String>,
69 41 /// a location to cache acme https certs
70 42 ///
71 - /// required when (and only used when) --acme-domain is specified.
43 + /// only used if --host is specified. omitting requires re-requesting certs
44 + /// on every restart, and letsencrypt has rate limits that are easy to hit.
72 45 ///
73 46 /// recommended in production, but mind the file permissions.
74 - #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")]
75 - acme_cache_path: Option<PathBuf>,
76 - /// listen for ipv6 when using acme
77 - ///
78 - /// you must also configure the relevant DNS records for this to work
79 - #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")]
80 - acme_ipv6: bool,
47 + #[arg(long)]
48 + certs: Option<PathBuf>,
81 49 /// an web address to send healtcheck pings to every ~51s or so
82 - #[arg(long, env = "SLINGSHOT_HEALTHCHECK")]
50 + #[arg(long)]
83 51 healthcheck: Option<String>,
84 - /// enable metrics collection and serving
85 - #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")]
86 - collect_metrics: bool,
87 - /// metrics server's listen address
88 - #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")]
89 - #[clap(default_value = "[::]:8765")]
90 - bind_metrics: std::net::SocketAddr,
91 52 }
92 53
93 54 #[tokio::main]
···
101 62
102 63 let args = Args::parse();
103 64
104 - if args.collect_metrics {
105 - log::trace!("installing metrics server...");
106 - if let Err(e) = install_metrics_server(args.bind_metrics) {
107 - log::error!("failed to install metrics server: {e:?}");
108 - } else {
109 - log::info!("metrics listening at http://{}", args.bind_metrics);
110 - }
65 + if let Err(e) = install_metrics_server() {
66 + log::error!("failed to install metrics server: {e:?}");
67 + } else {
68 + log::info!("metrics listening at http://0.0.0.0:8765");
111 69 }
112 70
113 71 std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
···
125 83 log::info!("cache dir ready at at {cache_dir:?}.");
126 84
127 85 log::info!("setting up firehose cache...");
128 - let cache = firehose_cache(
129 - cache_dir.join("./firehose"),
130 - args.record_cache_memory_mb,
131 - args.record_cache_disk_gb,
132 - )
133 - .await?;
86 + let cache = firehose_cache(cache_dir.join("./firehose")).await?;
134 87 log::info!("firehose cache ready.");
135 88
136 89 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
137 90
138 91 log::info!("starting identity service...");
139 - let identity = Identity::new(
140 - cache_dir.join("./identity"),
141 - args.identity_cache_memory_mb,
142 - args.identity_cache_disk_gb,
143 - )
144 - .await
145 - .map_err(|e| format!("identity setup failed: {e:?}"))?;
146 -
92 + let identity = Identity::new(cache_dir.join("./identity"))
93 + .await
94 + .map_err(|e| format!("identity setup failed: {e:?}"))?;
147 95 log::info!("identity service ready.");
148 96 let identity_refresher = identity.clone();
149 97 let identity_shutdown = shutdown.clone();
···
154 102
155 103 let repo = Repo::new(identity.clone());
156 104
157 - let identity_for_server = identity.clone();
158 105 let server_shutdown = shutdown.clone();
159 106 let server_cache_handle = cache.clone();
160 - let bind = args.bind;
161 107 tasks.spawn(async move {
162 108 serve(
163 109 server_cache_handle,
164 - identity_for_server,
110 + identity,
165 111 repo,
166 - args.acme_domain,
112 + args.domain,
167 113 args.acme_contact,
168 - args.acme_cache_path,
169 - args.acme_ipv6,
114 + args.certs,
170 115 server_shutdown,
171 - bind,
172 116 )
173 117 .await?;
174 118 Ok(())
175 119 });
176 120
177 - let identity_refreshable = identity.clone();
178 121 let consumer_shutdown = shutdown.clone();
179 122 let consumer_cache = cache.clone();
180 123 tasks.spawn(async move {
···
182 125 args.jetstream,
183 126 None,
184 127 args.jetstream_no_zstd,
185 - identity_refreshable,
186 128 consumer_shutdown,
187 129 consumer_cache,
188 130 )
···
230 172 Ok(())
231 173 }
232 174
233 - fn install_metrics_server(
234 - bind_metrics: std::net::SocketAddr,
235 - ) -> Result<(), metrics_exporter_prometheus::BuildError> {
175 + fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
236 176 log::info!("installing metrics server...");
177 + let host = [0, 0, 0, 0];
178 + let port = 8765;
237 179 PrometheusBuilder::new()
238 180 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
239 181 .set_bucket_duration(std::time::Duration::from_secs(300))?
240 182 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
241 183 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
242 - .with_http_listener(bind_metrics)
184 + .with_http_listener((host, port))
243 185 .install()?;
244 186 log::info!(
245 - "metrics server installed! listening on http://{}",
246 - bind_metrics
187 + "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
188 + host[0],
189 + host[1],
190 + host[2],
191 + host[3]
247 192 );
248 193 Ok(())
249 194 }
+1 -12
slingshot/src/record.rs
···
42 42 Deleted,
43 43 }
44 44
45 - impl CachedRecord {
46 - pub(crate) fn weight(&self) -> usize {
47 - let wrapping = std::mem::size_of::<Self>();
48 - let inner = match self {
49 - CachedRecord::Found(RawRecord { record, .. }) => std::mem::size_of_val(record.as_str()),
50 - _ => 0,
51 - };
52 - wrapping + inner
53 - }
54 - }
55 -
56 45 //////// upstream record fetching
57 46
58 47 #[derive(Deserialize)]
···
83 72 pub fn new(identity: Identity) -> Self {
84 73 let client = Client::builder()
85 74 .user_agent(format!(
86 - "microcosm slingshot v{} (contact: @bad-example.com)",
75 + "microcosm slingshot v{} (dev: @bad-example.com)",
87 76 env!("CARGO_PKG_VERSION")
88 77 ))
89 78 .no_proxy()
+28 -103
slingshot/src/server.rs
···
9 9 use std::path::PathBuf;
10 10 use std::str::FromStr;
11 11 use std::sync::Arc;
12 - use std::time::Instant;
13 12 use tokio_util::sync::CancellationToken;
14 13
15 14 use poem::{
16 - Endpoint, EndpointExt, IntoResponse, Route, Server,
15 + Endpoint, EndpointExt, Route, Server,
17 16 endpoint::{StaticFileEndpoint, make_sync},
18 17 http::Method,
19 18 listener::{
···
289 288 self.get_record_impl(repo, collection, rkey, cid).await
290 289 }
291 290
292 - /// blue.microcosm.repo.getRecordByUri
293 - ///
294 - /// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name
295 - #[oai(
296 - path = "/blue.microcosm.repo.getRecordByUri",
297 - method = "get",
298 - tag = "ApiTags::Custom"
299 - )]
300 - async fn get_record_by_uri(
301 - &self,
302 - /// The at-uri of the record
303 - ///
304 - /// The identifier can be a DID or an atproto handle, and the collection
305 - /// and rkey segments must be present.
306 - #[oai(example = "example_uri")]
307 - Query(at_uri): Query<String>,
308 - /// Optional: the CID of the version of the record.
309 - ///
310 - /// If not specified, then return the most recent version.
311 - ///
312 - /// > [!tip]
313 - /// > If specified and a newer version of the record exists, returns 404 not
314 - /// > found. That is: slingshot only retains the most recent version of a
315 - /// > record.
316 - Query(cid): Query<Option<String>>,
317 - ) -> GetRecordResponse {
318 - self.get_uri_record(Query(at_uri), Query(cid)).await
319 - }
320 -
321 291 /// com.bad-example.repo.getUriRecord
322 292 ///
323 293 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
···
405 375 #[oai(example = "example_handle")]
406 376 Query(handle): Query<String>,
407 377 ) -> JustDidResponse {
408 - let Ok(handle) = Handle::new(handle.to_lowercase()) else {
378 + let Ok(handle) = Handle::new(handle) else {
409 379 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle"));
410 380 };
···
443 413 }))
444 414 }
445 415
446 - /// blue.microcosm.identity.resolveMiniDoc
447 - ///
448 - /// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name
449 - #[oai(
450 - path = "/blue.microcosm.identity.resolveMiniDoc",
451 - method = "get",
452 - tag = "ApiTags::Custom"
453 - )]
454 - async fn resolve_mini_doc(
455 - &self,
456 - /// Handle or DID to resolve
457 - #[oai(example = "example_handle")]
458 - Query(identifier): Query<String>,
459 - ) -> ResolveMiniIDResponse {
460 - self.resolve_mini_id(Query(identifier)).await
461 - }
462 -
463 416 /// com.bad-example.identity.resolveMiniDoc
464 417 ///
465 418 /// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity)
···
483 436 let did = match Did::new(identifier.clone()) {
484 437 Ok(did) => did,
485 438 Err(_) => {
486 - let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
439 + let Ok(alleged_handle) = Handle::new(identifier) else {
487 440 return invalid("Identifier was not a valid DID or handle");
488 441 };
···
560 513 let did = match Did::new(repo.clone()) {
561 514 Ok(did) => did,
562 515 Err(_) => {
563 - let Ok(handle) = Handle::new(repo.to_lowercase()) else {
516 + let Ok(handle) = Handle::new(repo) else {
564 517 return GetRecordResponse::BadRequest(xrpc_error(
565 518 "InvalidRequest",
566 519 "Repo was not a valid DID or handle",
···
610 563
611 564 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey);
612 565
613 - metrics::counter!("slingshot_get_record").increment(1);
614 566 let fr = self
615 567 .cache
616 - .get_or_fetch(&at_uri, {
568 + .fetch(at_uri.clone(), {
617 569 let cid = cid.clone();
618 570 let repo_api = self.repo.clone();
619 571 || async move {
620 - let t0 = Instant::now();
621 - let res = repo_api.get_record(&did, &collection, &rkey, &cid).await;
622 - let success = if res.is_ok() { "true" } else { "false" };
623 - metrics::histogram!("slingshot_fetch_record", "success" => success)
624 - .record(t0.elapsed());
625 - res
572 + repo_api
573 + .get_record(&did, &collection, &rkey, &cid)
574 + .await
575 + .map_err(|e| foyer::Error::Other(Box::new(e)))
626 576 }
627 577 })
628 578 .await;
629 579
630 580 let entry = match fr {
631 581 Ok(e) => e,
632 - Err(e) if e.kind() == foyer::ErrorKind::External => {
633 - let record_error = match e.source().map(|s| s.downcast_ref::<RecordError>()) {
634 - Some(Some(e)) => e,
635 - other => {
636 - if other.is_none() {
637 - log::error!("external error without a source. wat? {e}");
638 - } else {
639 - log::error!("downcast to RecordError failed...? {e}");
640 - }
582 + Err(foyer::Error::Other(e)) => {
583 + let record_error = match e.downcast::<RecordError>() {
584 + Ok(e) => e,
585 + Err(e) => {
586 + log::error!("error (foyer other) getting cache entry, {e:?}");
641 587 return GetRecordResponse::ServerError(xrpc_error(
642 588 "ServerError",
643 589 "sorry, something went wrong",
644 590 ));
645 591 }
646 592 };
647 - let RecordError::UpstreamBadRequest(ErrorResponseObject {
648 - ref error,
649 - ref message,
650 - }) = *record_error
593 + let RecordError::UpstreamBadRequest(ErrorResponseObject { error, message }) =
594 + *record_error
651 595 else {
652 596 log::error!("RecordError getting cache entry, {record_error:?}");
653 597 return GetRecordResponse::ServerError(xrpc_error(
···
699 643 }
700 644
701 645 // TODO
646 + // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")]
702 647 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")]
703 648 // but these are both not specified to do bidirectional validation, which is what we want to offer
704 649 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc
···
707 652 // handle -> verified did + pds url
708 653 //
709 654 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc
710 - // but this will *definitely* cause problems probably
711 - //
712 - // resolveMiniDoc gets most of this well enough.
655 + // but this will *definitely* cause problems because eg. we're not currently storing pubkeys and
656 + // those are a little bit important
713 657 }
714 658
715 659 #[derive(Debug, Clone, Serialize)]
···
743 687 make_sync(move |_| doc.clone())
744 688 }
745 689
746 - #[allow(clippy::too_many_arguments)]
747 690 pub async fn serve(
748 691 cache: HybridCache<String, CachedRecord>,
749 692 identity: Identity,
750 693 repo: Repo,
751 - acme_domain: Option<String>,
694 + domain: Option<String>,
752 695 acme_contact: Option<String>,
753 - acme_cache_path: Option<PathBuf>,
754 - acme_ipv6: bool,
696 + certs: Option<PathBuf>,
755 697 shutdown: CancellationToken,
756 - bind: std::net::SocketAddr,
757 698 ) -> Result<(), ServerError> {
758 699 let repo = Arc::new(repo);
759 700 let api_service = OpenApiService::new(
···
765 706 "Slingshot",
766 707 env!("CARGO_PKG_VERSION"),
767 708 )
768 - .server(if let Some(ref h) = acme_domain {
709 + .server(if let Some(ref h) = domain {
769 710 format!("https://{h}")
770 711 } else {
771 - format!("http://{bind}") // yeah should probably fix this for reverse-proxy scenarios but it's ok for dev for now
712 + "http://localhost:3000".to_string()
772 713 })
773 714 .url_prefix("/xrpc")
774 715 .contact(
···
786 727 .nest("/openapi", api_service.spec_endpoint())
787 728 .nest("/xrpc/", api_service);
788 729
789 - if let Some(domain) = acme_domain {
730 + if let Some(domain) = domain {
790 731 rustls::crypto::aws_lc_rs::default_provider()
791 732 .install_default()
792 733 .expect("alskfjalksdjf");
···
799 740 if let Some(contact) = acme_contact {
800 741 auto_cert = auto_cert.contact(contact);
801 742 }
802 - if let Some(cache_path) = acme_cache_path {
803 - auto_cert = auto_cert.cache_path(cache_path);
743 + if let Some(certs) = certs {
744 + auto_cert = auto_cert.cache_path(certs);
804 745 }
805 746 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?;
806 747
807 748 run(
808 - TcpListener::bind(if acme_ipv6 { "[::]:443" } else { "0.0.0.0:443" }).acme(auto_cert),
749 + TcpListener::bind("0.0.0.0:443").acme(auto_cert),
809 750 app,
810 751 shutdown,
811 752 )
812 753 .await
813 754 } else {
814 - run(TcpListener::bind(bind), app, shutdown).await
755 + run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await
815 756 }
816 757 }
···
827 768 .allow_credentials(false),
828 769 )
829 770 .with(CatchPanic::new())
830 - .around(request_counter)
831 771 .with(Tracing);
832 -
833 772 Server::new(listener)
834 773 .name("slingshot")
835 774 .run_with_graceful_shutdown(app, shutdown.cancelled(), None)
···
837 776 .map_err(ServerError::ServerExited)
838 777 .inspect(|()| log::info!("server ended. goodbye."))
839 778 }
840 -
841 - async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> {
842 - let t0 = std::time::Instant::now();
843 - let method = req.method().to_string();
844 - let path = req.uri().path().to_string();
845 - let res = next.call(req).await?.into_response();
846 - metrics::histogram!(
847 - "server_request",
848 - "endpoint" => format!("{method} {path}"),
849 - "status" => res.status().to_string(),
850 - )
851 - .record(t0.elapsed());
852 - Ok(res)
853 - }
+2 -2
slingshot/static/index.html
···
43 43 <body>
44 44 <header class="custom-header scalar-app">
45 45 <p>
46 - get atproto records and identities faster
46 + TODO: thing
47 47 </p>
48 48 <nav>
49 49 <b>a <a href="https://microcosm.blue">microcosm</a> project</b>
50 50 <a href="https://bsky.app/profile/microcosm.blue">@microcosm.blue</a>
51 - <a href="https://tangled.org/microcosm.blue/microcosm-rs">tangled</a>
51 + <a href="https://github.com/at-microcosm">github</a>
52 52 </nav>
53 53 </header>
54 54
+1 -1
spacedust/Cargo.toml
···
5 5
6 6 [dependencies]
7 7 async-trait = "0.1.88"
8 - clap = { workspace = true }
8 + clap = { version = "4.5.40", features = ["derive"] }
9 9 ctrlc = "3.4.7"
10 10 dropshot = "0.16.2"
11 11 env_logger = "0.11.8"
+17 -26
spacedust/src/main.rs
···
16 16 struct Args {
17 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
18 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
19 - #[arg(long, env = "SPACEDUST_JETSTREAM")]
19 + #[arg(long)]
20 20 jetstream: String,
21 21 /// don't request zstd-compressed jetstream events
22 22 ///
23 23 /// reduces CPU at the expense of more ingress bandwidth
24 - #[arg(long, action, env = "SPACEDUST_JETSTREAM_NO_ZSTD")]
24 + #[arg(long, action)]
25 25 jetstream_no_zstd: bool,
26 - /// spacedust server's listen address
27 - #[arg(long, env = "SPACEDUST_BIND")]
28 - #[clap(default_value = "[::]:8080")]
29 - bind: std::net::SocketAddr,
30 - /// enable metrics collection and serving
31 - #[arg(long, action, env = "SPACEDUST_COLLECT_METRICS")]
32 - collect_metrics: bool,
33 - /// metrics server's listen address
34 - #[arg(long, requires("collect_metrics"), env = "SPACEDUST_BIND_METRICS")]
35 - #[clap(default_value = "[::]:8765")]
36 - bind_metrics: std::net::SocketAddr,
37 26 }
38 27
39 28 #[tokio::main]
···
71 60
72 61 let args = Args::parse();
73 62
74 - if args.collect_metrics {
75 - log::trace!("installing metrics server...");
76 - if let Err(e) = install_metrics_server(args.bind_metrics) {
77 - log::error!("failed to install metrics server: {e:?}");
78 - };
79 - }
63 + if let Err(e) = install_metrics_server() {
64 + log::error!("failed to install metrics server: {e:?}");
65 + };
80 66
81 67 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
82 68
83 69 let server_shutdown = shutdown.clone();
84 - let bind = args.bind;
85 70 tasks.spawn(async move {
86 - server::serve(b, d, server_shutdown, bind).await?;
71 + server::serve(b, d, server_shutdown).await?;
87 72 Ok(())
88 73 });
···
137 122 Ok(())
138 123 }
139 124
140 - fn install_metrics_server(
141 - bind: std::net::SocketAddr,
142 - ) -> Result<(), metrics_exporter_prometheus::BuildError> {
125 + fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
143 126 log::info!("installing metrics server...");
127 + let host = [0, 0, 0, 0];
128 + let port = 8765;
144 129 PrometheusBuilder::new()
145 130 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
146 131 .set_bucket_duration(std::time::Duration::from_secs(300))?
147 132 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
148 133 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
149 - .with_http_listener(bind)
134 + .with_http_listener((host, port))
150 135 .install()?;
151 - log::info!("metrics server installed! listening on {bind}");
136 + log::info!(
137 + "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
138 + host[0],
139 + host[1],
140 + host[2],
141 + host[3]
142 + );
152 143 Ok(())
153 144 }
+1 -2
spacedust/src/server.rs
··· 29 29 b: broadcast::Sender<Arc<ClientMessage>>, 30 30 d: broadcast::Sender<Arc<ClientMessage>>, 31 31 shutdown: CancellationToken, 32 - bind: std::net::SocketAddr, 33 32 ) -> Result<(), ServerError> { 34 33 let config_logging = ConfigLogging::StderrTerminal { 35 34 level: ConfigLoggingLevel::Info, ··· 73 72 74 73 let server = ServerBuilder::new(api, ctx, log) 75 74 .config(ConfigDropshot { 76 - bind_address: bind, 75 + bind_address: "0.0.0.0:9998".parse().unwrap(), 77 76 ..Default::default() 78 77 }) 79 78 .start()?;
+1 -1
ufos/Cargo.toml
··· 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 11 cardinality-estimator-safe = { version = "4.0.2", features = ["with_serde", "with_digest"] } 12 12 chrono = { version = "0.4.41", features = ["serde"] } 13 - clap = { workspace = true } 13 + clap = { version = "4.5.31", features = ["derive"] } 14 14 dropshot = "0.16.0" 15 15 env_logger = "0.11.7" 16 16 fjall = { git = "https://github.com/fjall-rs/fjall.git", rev = "fb229572bb7d1d6966a596994dc1708e47ec57d8", features = ["lz4"] }
+21 -27
ufos/src/main.rs
··· 26 26 struct Args { 27 27 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 28 28 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 29 - #[arg(long, env = "UFOS_JETSTREAM")] 29 + #[arg(long)] 30 30 jetstream: String, 31 31 /// allow changing jetstream endpoints 32 - #[arg(long, action, env = "UFOS_JETSTREAM_FORCE")] 32 + #[arg(long, action)] 33 33 jetstream_force: bool, 34 34 /// don't request zstd-compressed jetstream events 35 35 /// 36 36 /// reduces CPU at the expense of more ingress bandwidth 37 - #[arg(long, action, env = "UFOS_JETSTREAM_NO_ZSTD")] 37 + #[arg(long, action)] 38 38 jetstream_no_zstd: bool, 39 - /// ufos server's listen address 40 - #[arg(long, env = "UFOS_BIND")] 41 - #[clap(default_value = "0.0.0.0:9990")] 42 - bind: std::net::SocketAddr, 43 39 /// Location to store persist data to disk 44 - #[arg(long, env = "UFOS_DATA")] 40 + #[arg(long)] 45 41 data: PathBuf, 46 42 /// DEBUG: don't start the jetstream consumer or its write loop 47 - #[arg(long, action, env = "UFOS_PAUSE_WRITER")] 43 + #[arg(long, action)] 48 44 pause_writer: bool, 49 45 /// Adjust runtime settings like background task intervals for efficient backfill 50 - #[arg(long, action, env = "UFOS_BACKFILL_MODE")] 46 + #[arg(long, action)] 51 47 backfill: bool, 52 48 /// DEBUG: force the rw loop to fall behind by pausing it 53 49 /// todo: restore this 54 50 #[arg(long, action)] 55 51 pause_rw: bool, 56 52 /// reset the rollup cursor, scrape through missed things in the past (backfill) 57 - #[arg(long, action, env = "UFOS_REROLL")] 53 + #[arg(long, action)] 58 54 reroll: bool, 59 55 /// DEBUG: interpret jetstream as a file fixture 60 - #[arg(long, action, env = "UFOS_JETSTREAM_FIXTURE")] 56 + #[arg(long, action)] 61 57 jetstream_fixture: bool, 62 - /// enable metrics collection and serving 63 - #[arg(long, action, env = "UFOS_COLLECT_METRICS")] 64 - collect_metrics: bool, 65 - /// metrics server's listen address 
66 - #[arg(long, env = "UFOS_BIND_METRICS")] 67 - #[clap(default_value = "0.0.0.0:8765")] 68 - bind_metrics: std::net::SocketAddr, 69 58 } 70 59 71 60 #[tokio::main] ··· 95 84 let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new(); 96 85 97 86 println!("starting server with storage..."); 98 - let serving = server::serve(read_store.clone(), args.bind); 87 + let serving = server::serve(read_store.clone()); 99 88 whatever_tasks.spawn(async move { 100 89 serving.await.map_err(|e| { 101 90 log::warn!("server ended: {e}"); ··· 148 137 Ok(()) 149 138 }); 150 139 151 - if args.collect_metrics { 152 - log::trace!("installing metrics server..."); 153 - install_metrics_server(args.bind_metrics)?; 154 - } 140 + install_metrics_server()?; 155 141 156 142 for (i, t) in consumer_tasks.join_all().await.iter().enumerate() { 157 143 log::warn!("task {i} done: {t:?}"); ··· 165 151 Ok(()) 166 152 } 167 153 168 - fn install_metrics_server(bind: std::net::SocketAddr) -> anyhow::Result<()> { 154 + fn install_metrics_server() -> anyhow::Result<()> { 169 155 log::info!("installing metrics server..."); 156 + let host = [0, 0, 0, 0]; 157 + let port = 8765; 170 158 PrometheusBuilder::new() 171 159 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 172 160 .set_bucket_duration(Duration::from_secs(60))? 173 161 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here. 174 162 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 175 - .with_http_listener(bind) 163 + .with_http_listener((host, port)) 176 164 .install()?; 177 - log::info!("metrics server installed! listening on {bind}"); 165 + log::info!( 166 + "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 167 + host[0], 168 + host[1], 169 + host[2], 170 + host[3] 171 + ); 178 172 Ok(()) 179 173 } 180 174
+2 -5
ufos/src/server/mod.rs
··· 716 716 .await 717 717 } 718 718 719 - pub async fn serve( 720 - storage: impl StoreReader + 'static, 721 - bind: std::net::SocketAddr, 722 - ) -> Result<(), String> { 719 + pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 723 720 describe_metrics(); 724 721 let log = ConfigLogging::StderrTerminal { 725 722 level: ConfigLoggingLevel::Warn, ··· 761 758 762 759 ServerBuilder::new(api, context, log) 763 760 .config(ConfigDropshot { 764 - bind_address: bind, 761 + bind_address: "0.0.0.0:9999".parse().unwrap(), 765 762 ..Default::default() 766 763 }) 767 764 .start()
+1 -1
who-am-i/Cargo.toml
··· 11 11 axum = "0.8.4" 12 12 axum-extra = { version = "0.10.1", features = ["cookie-signed", "typed-header"] } 13 13 axum-template = { version = "3.0.0", features = ["handlebars"] } 14 - clap = { workspace = true } 14 + clap = { version = "4.5.40", features = ["derive", "env"] } 15 15 ctrlc = "3.4.7" 16 16 dashmap = "6.1.0" 17 17 elliptic-curve = "0.13.8"

History

8 rounds 13 comments
11 commits
wip: m2m
Add tests for new get_many_to_many query handler
Fix get_m2m_empty test
Replace tuple with RecordsBySubject struct
Fix conflicts after rebasing on main
Use record_id/subject tuple as return type for get_many_to_many
Fix get_many_to_many pagination with composite cursor
Fix get_many_to_many_counts pagination with fetch N+1
wip
Fix rocks-store to match mem-store composite cursor
Address feedback from fig
pull request successfully merged
10 commits
8 commits

Okay. I wrapped my head around the composite cursor you proposed and am working on refactoring both storage implementations towards that. I think I might re-submit another round tomorrow :)

6 commits

Found a bug in how we handle some of the pagination logic in cases where the number of items and the user-selected limit are identical or very close to each other (already working on a fix).
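The "fetch N+1" commits mentioned later in this PR point at a common pattern for exactly this edge case: request `limit + 1` rows and use the extra row purely as a has-more probe, instead of guessing from `items.len() == limit`. A minimal sketch of the idea over a plain sorted slice (not the actual handler code):

```rust
/// Paginate a sorted slice by fetching limit+1: the extra element tells us
/// whether a next page exists, rather than inferring it from a full page.
fn page(sorted: &[String], after: Option<&str>, limit: usize) -> (Vec<String>, Option<String>) {
    let mut items: Vec<String> = sorted
        .iter()
        .filter(|s| after.map(|a| s.as_str() > a).unwrap_or(true))
        .take(limit + 1) // fetch one extra as a "has more" probe
        .cloned()
        .collect();
    let next = if items.len() > limit {
        items.truncate(limit); // drop the probe item
        items.last().cloned() // cursor = last returned item
    } else {
        None // fewer than limit+1 found: this is the final page
    };
    (items, next)
}
```

With the old `items.len() >= limit` check, a result set of exactly `limit` items would hand out a cursor to an empty page; the probe row removes that ambiguity.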

thanks for the rebase! i tried to write things in the tiny text box but ended up needing to make a diagram: https://bsky.app/profile/did:plc:hdhoaan3xa3jiuq4fg4mefid/post/3mejuq44twc2t

key thing is that where the focus of getManyToManyCounts was the other subject (aggregation was against that, so grouping happened with it),

i think the focus of disaggregated many-to-many is on the linking records themselves

to me that takes me toward a few things

  • i don't think we should need to group the links by target (does the current code build up the full aggregation on every requested page? we should be able to avoid doing that)

  • i think the order of the response should actually be based on the linking record itself (since it's what each row in the output represents), not the other subject, unlike with the aggregated/count version. this means you get e.g. list items in the order they were added instead of the order in which the listed things were created. (i haven't fully wrapped my head around the grouping/ordering code here yet)

  • since any linking record can have a path_to_other with multiple links, i think a composite cursor could work here:

a 2-tuple of (backlink_vec_idx, forward_vec_idx).

for normal cases where the many-to-many record points to exactly one other subject, it would just be advancing backlink_vec_idx like normal backlinks

for cases where the many-to-many record actually has multiple forward links at the given path_to_other, the second part of the tuple would track progress through that list

i think that allows us to hold the necessary state between calls without needing to reconstruct too much in memory each time?
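a tiny sketch of how advancing that 2-tuple could look (all names here are made up for illustration, not the real storage types — each entry in `links_per_record` stands for one backlink record and how many forward links it has at path_to_other):

```rust
/// composite cursor: (index into the backlink list, index into that
/// record's forward links at path_to_other).
type Cursor = (usize, usize);

/// advance the cursor by one emitted item; records may carry
/// multiple forward links, so the second index walks within a record.
fn advance(links_per_record: &[usize], (bi, fi): Cursor) -> Option<Cursor> {
    let n = *links_per_record.get(bi)?;
    if fi + 1 < n {
        Some((bi, fi + 1)) // more forward links in the same record
    } else if bi + 1 < links_per_record.len() {
        Some((bi + 1, 0)) // move on to the next backlink record
    } else {
        None // exhausted
    }
}
```

for the common case (exactly one forward link per record) this degenerates to advancing `bi` like normal backlinks, which is the point.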

(also it's hard to write in this tiny tiny textbox and have a sense of whether what i'm saying makes sense)

Interesting approach! I have to think through this for a bit to be honest. Maybe I tried to follow the existing counts implementation too closely

Having said that, I added a new composite cursor to fix a couple of bugs that would arise when hitting possible edge cases in the pagination logic. This affects both the new get-many-to-many endpoint as well as the existing get-many-to-many-counts endpoint. As the changes are split over two distinct commits, things should be straightforward to review.

Your assumption is still correct in the sense that we do indeed have to build up the aggregation again for every request. I have to double-check the get-backlinks endpoint to get a better sense of where you're going with this.

Finally, I agree that the interface here doesn't necessarily make the whole thing easier to understand, unfortunately

6 commits

i think something got funky with a rebase or the way tangled is showing it -- some of my changes on main seem to be getting shown (reverted) in the diff.

i don't mind sorting it locally but will mostly get to it tomorrow, in case you want to see what's up before i do.

That's one on me, sorry! Rebased again on main and now everything seems fine

5 commits

Rebased on main. As we discussed in the PR for the order query parameter, I didn't include it here as it's not a particularly sensible fit.

i need to get into the code properly but my initial thought is that this endpoint should return a flat list of results, like

{
  "items": [
    {
      "link": { did, collection, rkey }, // the m2m link record
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "a.com"
    },
    {
      "link": { did, collection, rkey },
      "subject": "b.com"
    },
  ]
}

this will require a bit of tricks in the cursor to track pages across half-finished groups of links

(also this isn't an immediate change request, just getting it down for discussion!)

(and separately, i've also been wondering about moving more toward returning at-uris instead of broken-out did/collection/rkey objects. which isn't specifically about this PR, but if that happens then switching before releasing it is nice)

Hmm, I wonder how this would then work with the path_to_other parameter. Currently we have this nested grouping in order to show and disambiguate different relationships between different links.

For instance, take the following query and its results:

http://localhost:6789/xrpc/blue.microcosm.links.getManyToMany?subject=at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y&source=app.bsky.feed.post:reply.root.uri&pathToOther=reply.parent.uri&limit=16

This query asks: "Show me all posts in this thread, grouped by who they're responding to."

A flat list would just give us all the posts in the thread. The nested structure answers a richer question: who's talking to whom? Some posts are direct responses to the original article. Others are replies to other commenters, forming side conversations that branch off from the main thread.

The pathToOther grouping preserves that distinction. Without it, we'd lose the information about who's talking to whom.

{
  "linking_records": [
    {
      "subject": "at://did:plc:2w45zyhuklwihpdc7oj3mi63/app.bsky.feed.post/3mdbbkuq6t32y",
      "records": [
        {
          "did": "did:plc:lznqwrsbnyf6fdxohikqj6h3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd27pja7s2y"
        },
        {
          "did": "did:plc:uffx77au6hoauuuumkbuvqdr",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2tt5efc2a"
        },
        {
          "did": "did:plc:y7qyxzo7dns5m54dlq3youu3",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd2wtjxgc2d"
        },
        {
          "did": "did:plc:yaakslxyqydb76ybgkhrr4jk",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd35hyads22"
        },
        {
          "did": "did:plc:fia7w2kbnrdjwp6zvxywt7qv",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd37j3ldk2m"
        },
        {
          "did": "did:plc:xtecipifublblkomwau5x2ok",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd3dbtbz22n"
        },
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd42hpw7c2e"
        },
        {
          "did": "did:plc:fgquypfh32pewivn3bcmzseb",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd46jteoc2m"
        }
      ]
    },
    {
      "subject": "at://did:plc:3rhjxwwui6wwfokh4at3q2dl/app.bsky.feed.post/3mdczc7c4gk2i",
      "records": [
        {
          "did": "did:plc:3rhjxwwui6wwfokh4at3q2dl",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdczt7cwhk2i"
        }
      ]
    },
    {
      "subject": "at://did:plc:6buibzhkqr4vkqu75ezr2uv2/app.bsky.feed.post/3mdby25hbbk2v",
      "records": [
        {
          "did": "did:plc:fgeie2bmzlmx37iglj3xbzuj",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd26ulf4k2j"
        }
      ]
    },
    {
      "subject": "at://did:plc:lwgvv5oqh5stzb6dxa5d7z3n/app.bsky.feed.post/3mdcxqbkkfk2i",
      "records": [
        {
          "did": "did:plc:hl5lhiy2qr4nf5e4eefldvme",
          "collection": "app.bsky.feed.post",
          "rkey": "3mdd45u56sk2e"
        }
      ]
    }
  ],
  "cursor": null
}

Correct me if I'm somehow wrong here!

Regarding returning at-uris: I think this might be a nice idea, as users can split these up themselves when they feel the need to anyway, and it feels conceptually more complete. But it might be easier to do this in a separate PR across all existing XRPC endpoints. That would allow us to land this new endpoint already while working on the updated return values in the meantime. I'd like to avoid doing too much distinct stuff in one PR. :)

at-uris: totally fair, holding off for a follow-up.

flat list: i might have messed it up in my example but i think what i meant is actually equivalent to the grouped version: flattened, with the subject ("group by") included with every item in the flattened list.

clients can collect the flat list and group on subject to get back to your structured example, if they want.
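that regrouping is a small fold on the client side — a sketch, with the link simplified to an at-uri-ish string rather than the real record type:

```rust
use std::collections::BTreeMap;

/// regroup a flat (link, subject) list into the nested shape,
/// preserving the flat list's order within each group.
fn group_by_subject(flat: Vec<(String, String)>) -> BTreeMap<String, Vec<String>> {
    let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (link, subject) in flat {
        grouped.entry(subject).or_default().push(link);
    }
    grouped
}
```

so the flat response loses nothing relative to the nested one; it just moves the grouping choice to the consumer.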

my motivations are probably part sql-brain, part flat-list-enjoyer, and part cursor-related. i'm trying to disregard the first two, and i'm curious about your thoughts about how to handle the cursor:

with a flat list it's easy (from the client perspective at least) -- just keep chasing the cursor for as much of the data as you need. (cursors can happen in the middle of a subject)

with nested results grouped by subject it's less obvious to me. correct me if i'm wrong (need another block of time to actually get into the code) but i think the grouped item sub-list is unbounded in size in the proposed code here? so cursors are only limiting the number of groups.

if we go with the grouped nested response, i think maybe we'd want something like:

  • a cursor at the end for fetching more groups, and
  • a cursor for each group-list that lets you fetch more items from just that group-list.

(i think this kind of nested paging is pretty neat!)

Interesting. Now that you mention it I think I get where you're going with this!

I think the whole cursor thing, albeit certainly possible, creates unnecessary complexity, so I'll probably go with your suggestion.

It seems easier for most users to create custom groupings on their own (having more freedom is always great), and I think from an ergonomics perspective the two cursors might create more friction.

4 commits

Added the missing lexicon entry for the new endpoint and changed the return type as well. I mistakenly commented this on the other PR I was working on. Sorry about that lol.

3 commits

I think the existing get_many_to_many_counts handler and the new get_many_to_many handler are similar enough that we might extract the bulk of their logic into a shared helper. A method that takes the existing identical function parameters plus an additional callback (handling what we do with found matches, i.e. calculating counts or joining URIs) might be one way to go.

I'm not yet sure this is the right thing to do, as the shared implementation might get a bit complicated. But given the strong similarities between the two, I think it's worth at least considering.
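A very rough sketch of what that callback shape could look like — `Match`, the field names, and the traversal are stand-ins for illustration, not the actual types in the repo:

```rust
/// Stand-in for a matched link after DID/target filtering.
struct Match {
    record: String,
    target: String,
}

/// Shared core: do the lookup and filtering (including the two-step DID
/// check) once, then hand the matches to an endpoint-specific closure.
fn with_matches<T>(matches: Vec<Match>, f: impl Fn(Vec<Match>) -> T) -> T {
    // real code would perform the shared filtering here, so both
    // endpoints stop duplicating it
    f(matches)
}

/// Example closure for the counts endpoint: number of matched records.
fn count_matches(matches: Vec<Match>) -> usize {
    matches.len()
}

/// Example closure for the join endpoint: (record, target) pairs.
fn join_matches(matches: Vec<Match>) -> Vec<(String, String)> {
    matches.into_iter().map(|m| (m.record, m.target)).collect()
}
```

The two handlers would then differ only in the closure they pass, which is roughly the refactor being floated here.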