Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Merge branch 'main' of github.com:at-microcosm/links into jv-bind

+839 -188
+1
.gitignore
··· 1 /target 2 local/
··· 1 /target 2 local/ 3 + rocks.test
+4
.prettierrc
···
··· 1 + { 2 + "tabWidth": 2, 3 + "useTabs": false 4 + }
+7 -8
Cargo.lock
··· 1375 checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" 1376 dependencies = [ 1377 "data-encoding", 1378 - "syn 2.0.106", 1379 ] 1380 1381 [[package]] ··· 1796 [[package]] 1797 name = "fjall" 1798 version = "2.11.2" 1799 - source = "git+https://github.com/fjall-rs/fjall.git#42d811f7c8cc9004407d520d37d2a1d8d246c03d" 1800 dependencies = [ 1801 "byteorder", 1802 "byteview", ··· 3626 3627 [[package]] 3628 name = "num-bigint-dig" 3629 - version = "0.8.4" 3630 source = "registry+https://github.com/rust-lang/crates.io-index" 3631 - checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" 3632 dependencies = [ 3633 - "byteorder", 3634 "lazy_static", 3635 "libm", 3636 "num-integer", ··· 4644 4645 [[package]] 4646 name = "rsa" 4647 - version = "0.9.8" 4648 source = "registry+https://github.com/rust-lang/crates.io-index" 4649 - checksum = "78928ac1ed176a5ca1d17e578a1825f3d81ca54cf41053a592584b020cfd691b" 4650 dependencies = [ 4651 "const-oid", 4652 "digest", ··· 6049 "clap", 6050 "dropshot", 6051 "env_logger", 6052 - "fjall 2.11.2 (git+https://github.com/fjall-rs/fjall.git)", 6053 "getrandom 0.3.3", 6054 "http", 6055 "jetstream",
··· 1375 checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" 1376 dependencies = [ 1377 "data-encoding", 1378 + "syn 1.0.109", 1379 ] 1380 1381 [[package]] ··· 1796 [[package]] 1797 name = "fjall" 1798 version = "2.11.2" 1799 + source = "git+https://github.com/fjall-rs/fjall.git?rev=fb229572bb7d1d6966a596994dc1708e47ec57d8#fb229572bb7d1d6966a596994dc1708e47ec57d8" 1800 dependencies = [ 1801 "byteorder", 1802 "byteview", ··· 3626 3627 [[package]] 3628 name = "num-bigint-dig" 3629 + version = "0.8.6" 3630 source = "registry+https://github.com/rust-lang/crates.io-index" 3631 + checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7" 3632 dependencies = [ 3633 "lazy_static", 3634 "libm", 3635 "num-integer", ··· 4643 4644 [[package]] 4645 name = "rsa" 4646 + version = "0.9.10" 4647 source = "registry+https://github.com/rust-lang/crates.io-index" 4648 + checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" 4649 dependencies = [ 4650 "const-oid", 4651 "digest", ··· 6048 "clap", 6049 "dropshot", 6050 "env_logger", 6051 + "fjall 2.11.2 (git+https://github.com/fjall-rs/fjall.git?rev=fb229572bb7d1d6966a596994dc1708e47ec57d8)", 6052 "getrandom 0.3.3", 6053 "http", 6054 "jetstream",
+19 -3
constellation/src/bin/main.rs
··· 26 #[arg(long)] 27 #[clap(default_value = "0.0.0.0:6789")] 28 bind: SocketAddr, 29 /// metrics server's listen address 30 #[arg(long)] 31 #[clap(default_value = "0.0.0.0:8765")] ··· 92 let bind = args.bind; 93 let metrics_bind = args.bind_metrics; 94 95 let stay_alive = CancellationToken::new(); 96 97 match args.backend { ··· 102 stream, 103 bind, 104 metrics_bind, 105 stay_alive, 106 ), 107 #[cfg(feature = "rocks")] ··· 136 stream, 137 bind, 138 metrics_bind, 139 stay_alive, 140 ); 141 eprintln!("run finished: {r:?}"); ··· 147 } 148 } 149 150 fn run( 151 mut storage: impl LinkStorage, 152 fixture: Option<PathBuf>, ··· 154 stream: String, 155 bind: SocketAddr, 156 metrics_bind: SocketAddr, 157 stay_alive: CancellationToken, 158 ) -> Result<()> { 159 ctrlc::set_handler({ ··· 198 .build() 199 .expect("axum startup") 200 .block_on(async { 201 - install_metrics_server(metrics_bind)?; 202 serve(readable, bind, staying_alive).await 203 }) 204 .unwrap(); ··· 206 } 207 }); 208 209 - s.spawn(move || { // monitor thread 210 let stay_alive = stay_alive.clone(); 211 let check_alive = stay_alive.clone(); 212 ··· 258 } 259 } 260 stay_alive.drop_guard(); 261 - }); 262 }); 263 264 println!("byeeee");
··· 26 #[arg(long)] 27 #[clap(default_value = "0.0.0.0:6789")] 28 bind: SocketAddr, 29 + /// optionally disable the metrics server 30 + #[arg(long)] 31 + #[clap(default_value_t = false)] 32 + collect_metrics: bool, 33 /// metrics server's listen address 34 #[arg(long)] 35 #[clap(default_value = "0.0.0.0:8765")] ··· 96 let bind = args.bind; 97 let metrics_bind = args.bind_metrics; 98 99 + let collect_metrics = args.collect_metrics; 100 let stay_alive = CancellationToken::new(); 101 102 match args.backend { ··· 107 stream, 108 bind, 109 metrics_bind, 110 + collect_metrics, 111 stay_alive, 112 ), 113 #[cfg(feature = "rocks")] ··· 142 stream, 143 bind, 144 metrics_bind, 145 + collect_metrics, 146 stay_alive, 147 ); 148 eprintln!("run finished: {r:?}"); ··· 154 } 155 } 156 157 + #[allow(clippy::too_many_lines)] 158 + #[allow(clippy::too_many_arguments)] 159 fn run( 160 mut storage: impl LinkStorage, 161 fixture: Option<PathBuf>, ··· 163 stream: String, 164 bind: SocketAddr, 165 metrics_bind: SocketAddr, 166 + collect_metrics: bool, 167 stay_alive: CancellationToken, 168 ) -> Result<()> { 169 ctrlc::set_handler({ ··· 208 .build() 209 .expect("axum startup") 210 .block_on(async { 211 + // Install metrics server only if requested 212 + if collect_metrics { 213 + install_metrics_server(metrics_bind)?; 214 + } 215 serve(readable, bind, staying_alive).await 216 }) 217 .unwrap(); ··· 219 } 220 }); 221 222 + // only spawn monitoring thread if the metrics server is running 223 + if collect_metrics { 224 + s.spawn(move || { // monitor thread 225 let stay_alive = stay_alive.clone(); 226 let check_alive = stay_alive.clone(); 227 ··· 273 } 274 } 275 stay_alive.drop_guard(); 276 + }); 277 + } 278 }); 279 280 println!("byeeee");
+13 -6
constellation/src/consumer/jetstream.rs
··· 226 println!("jetstream closed the websocket cleanly."); 227 break; 228 } 229 - r => eprintln!("jetstream: close result after error: {r:?}"), 230 } 231 - counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error") 232 - .increment(1); 233 - // if we didn't immediately get ConnectionClosed, we should keep polling read 234 - // until we get it. 235 - continue; 236 } 237 }; 238
··· 226 println!("jetstream closed the websocket cleanly."); 227 break; 228 } 229 + Err(_) => { 230 + counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1); 231 + println!("jetstream failed to close the websocket cleanly."); 232 + break; 233 + } 234 + Ok(r) => { 235 + eprintln!("jetstream: close result after error: {r:?}"); 236 + counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error") 237 + .increment(1); 238 + // if we didn't immediately get ConnectionClosed, we should keep polling read 239 + // until we get it. 240 + continue; 241 + } 242 } 243 } 244 }; 245
+62 -5
constellation/src/server/mod.rs
··· 17 use tokio::task::spawn_blocking; 18 use tokio_util::sync::CancellationToken; 19 20 - use crate::storage::{LinkReader, StorageStats}; 21 use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; ··· 25 26 use acceptable::{acceptable, ExtractAccept}; 27 28 - const DEFAULT_CURSOR_LIMIT: u64 = 16; 29 - const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100; 30 31 fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT ··· 66 } 67 }), 68 ) 69 .route( 70 "/links/count", 71 get({ 72 let store = store.clone(); 73 move |accept, query| async { 74 spawn_blocking(|| count_links(accept, query, store)) 75 .await 76 .map_err(to500)? 77 } ··· 343 #[serde(skip_serializing)] 344 query: GetLinksCountQuery, 345 } 346 fn count_links( 347 accept: ExtractAccept, 348 query: Query<GetLinksCountQuery>, ··· 361 } 362 363 #[derive(Clone, Deserialize)] 364 struct GetDidsCountQuery { 365 target: String, 366 collection: String, ··· 409 /// Set the max number of links to return per page of results 410 #[serde(default = "get_default_cursor_limit")] 411 limit: u64, 412 - // TODO: allow reverse (er, forward) order as well 413 } 414 #[derive(Template, Serialize)] 415 #[template(path = "get-backlinks.html.j2")] ··· 455 }; 456 let path = format!(".{path}"); 457 458 let paged = store 459 .get_links( 460 &query.subject, 461 collection, 462 &path, 463 limit, 464 until, 465 &filter_dids, ··· 508 from_dids: Option<String>, // comma separated: gross 509 #[serde(default = "get_default_cursor_limit")] 510 limit: u64, 511 - // TODO: allow reverse (er, forward) order as well 512 } 513 #[derive(Template, Serialize)] 514 #[template(path = "links.html.j2")] ··· 562 &query.target, 563 &query.collection, 564 &query.path, 565 limit, 566 until, 567 &filter_dids, ··· 659 #[serde(skip_serializing)] 660 query: GetAllLinksQuery, 661 } 662 fn count_all_links( 663 accept: ExtractAccept, 664 query: Query<GetAllLinksQuery>,
··· 17 use tokio::task::spawn_blocking; 18 use tokio_util::sync::CancellationToken; 19 20 + use crate::storage::{LinkReader, Order, StorageStats}; 21 use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; ··· 25 26 use acceptable::{acceptable, ExtractAccept}; 27 28 + const DEFAULT_CURSOR_LIMIT: u64 = 100; 29 + const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000; 30 31 fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT ··· 66 } 67 }), 68 ) 69 + // deprecated 70 .route( 71 "/links/count", 72 get({ 73 let store = store.clone(); 74 move |accept, query| async { 75 spawn_blocking(|| count_links(accept, query, store)) 76 + .await 77 + .map_err(to500)? 78 + } 79 + }), 80 + ) 81 + .route( 82 + "/xrpc/blue.microcosm.links.getBacklinksCount", 83 + get({ 84 + let store = store.clone(); 85 + move |accept, query| async { 86 + spawn_blocking(|| get_backlink_counts(accept, query, store)) 87 .await 88 .map_err(to500)? 89 } ··· 355 #[serde(skip_serializing)] 356 query: GetLinksCountQuery, 357 } 358 + #[deprecated] 359 fn count_links( 360 accept: ExtractAccept, 361 query: Query<GetLinksCountQuery>, ··· 374 } 375 376 #[derive(Clone, Deserialize)] 377 + struct GetItemsCountQuery { 378 + subject: String, 379 + source: String, 380 + } 381 + #[derive(Template, Serialize)] 382 + #[template(path = "get-backlinks-count.html.j2")] 383 + struct GetItemsCountResponse { 384 + total: u64, 385 + #[serde(skip_serializing)] 386 + query: GetItemsCountQuery, 387 + } 388 + fn get_backlink_counts( 389 + accept: ExtractAccept, 390 + query: axum_extra::extract::Query<GetItemsCountQuery>, 391 + store: impl LinkReader, 392 + ) -> Result<impl IntoResponse, http::StatusCode> { 393 + let Some((collection, path)) = query.source.split_once(':') else { 394 + return Err(http::StatusCode::BAD_REQUEST); 395 + }; 396 + let path = format!(".{path}"); 397 + let total = store 398 + .get_count(&query.subject, collection, &path) 399 + .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 400 + 401 + 
Ok(acceptable( 402 + accept, 403 + GetItemsCountResponse { 404 + total, 405 + query: (*query).clone(), 406 + }, 407 + )) 408 + } 409 + 410 + #[derive(Clone, Deserialize)] 411 struct GetDidsCountQuery { 412 target: String, 413 collection: String, ··· 456 /// Set the max number of links to return per page of results 457 #[serde(default = "get_default_cursor_limit")] 458 limit: u64, 459 + /// Allow returning links in reverse order (default: false) 460 + #[serde(default)] 461 + reverse: bool, 462 } 463 #[derive(Template, Serialize)] 464 #[template(path = "get-backlinks.html.j2")] ··· 504 }; 505 let path = format!(".{path}"); 506 507 + let order = if query.reverse { 508 + Order::OldestToNewest 509 + } else { 510 + Order::NewestToOldest 511 + }; 512 + 513 let paged = store 514 .get_links( 515 &query.subject, 516 collection, 517 &path, 518 + order, 519 limit, 520 until, 521 &filter_dids, ··· 564 from_dids: Option<String>, // comma separated: gross 565 #[serde(default = "get_default_cursor_limit")] 566 limit: u64, 567 } 568 #[derive(Template, Serialize)] 569 #[template(path = "links.html.j2")] ··· 617 &query.target, 618 &query.collection, 619 &query.path, 620 + Order::NewestToOldest, 621 limit, 622 until, 623 &filter_dids, ··· 715 #[serde(skip_serializing)] 716 query: GetAllLinksQuery, 717 } 718 + #[deprecated] 719 fn count_all_links( 720 accept: ExtractAccept, 721 query: Query<GetAllLinksQuery>,
+55 -53
constellation/src/storage/mem_store.rs
··· 1 use super::{ 2 - LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 use anyhow::Result; ··· 147 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 148 let data = self.0.lock().unwrap(); 149 let Some(paths) = data.targets.get(&Target::new(target)) else { 150 - return Ok(PagedOrderedCollection::default()); 151 }; 152 let Some(linkers) = paths.get(&Source::new(collection, path)) else { 153 - return Ok(PagedOrderedCollection::default()); 154 }; 155 156 let path_to_other = RecordPath::new(path_to_other); ··· 239 target: &str, 240 collection: &str, 241 path: &str, 242 limit: u64, 243 - until: Option<u64>, 244 filter_dids: &HashSet<Did>, 245 ) -> Result<PagedAppendingCollection<RecordId>> { 246 let data = self.0.lock().unwrap(); 247 let Some(paths) = data.targets.get(&Target::new(target)) else { 248 - return Ok(PagedAppendingCollection { 249 - version: (0, 0), 250 - items: Vec::new(), 251 - next: None, 252 - total: 0, 253 - }); 254 }; 255 let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 256 - return Ok(PagedAppendingCollection { 257 - version: (0, 0), 258 - items: Vec::new(), 259 - next: None, 260 - total: 0, 261 - }); 262 }; 263 264 let did_rkeys: Vec<_> = if !filter_dids.is_empty() { ··· 275 did_rkeys.to_vec() 276 }; 277 278 - let total = did_rkeys.len(); 279 - let end = until 280 - .map(|u| std::cmp::min(u as usize, total)) 281 - .unwrap_or(total); 282 - let begin = end.saturating_sub(limit as usize); 283 - let next = if begin == 0 { None } else { Some(begin as u64) }; 284 285 - let alive = did_rkeys.iter().flatten().count(); 286 let gone = total - alive; 287 288 - let items: Vec<_> = did_rkeys[begin..end] 289 .iter() 290 - .rev() 291 .flatten() 292 .filter(|(did, _)| *data.dids.get(did).expect("did must be in dids")) 293 .map(|(did, rkey)| RecordId { 294 did: did.clone(), 295 rkey: rkey.0.clone(), 296 collection: 
collection.to_string(), 297 - }) 298 - .collect(); 299 300 Ok(PagedAppendingCollection { 301 - version: (total as u64, gone as u64), 302 items, 303 - next, 304 - total: alive as u64, 305 }) 306 } 307 ··· 315 ) -> Result<PagedAppendingCollection<Did>> { 316 let data = self.0.lock().unwrap(); 317 let Some(paths) = data.targets.get(&Target::new(target)) else { 318 - return Ok(PagedAppendingCollection { 319 - version: (0, 0), 320 - items: Vec::new(), 321 - next: None, 322 - total: 0, 323 - }); 324 }; 325 let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 326 - return Ok(PagedAppendingCollection { 327 - version: (0, 0), 328 - items: Vec::new(), 329 - next: None, 330 - total: 0, 331 - }); 332 }; 333 334 let dids: Vec<Option<Did>> = { ··· 348 .collect() 349 }; 350 351 - let total = dids.len(); 352 - let end = until 353 - .map(|u| std::cmp::min(u as usize, total)) 354 - .unwrap_or(total); 355 - let begin = end.saturating_sub(limit as usize); 356 - let next = if begin == 0 { None } else { Some(begin as u64) }; 357 358 - let alive = dids.iter().flatten().count(); 359 let gone = total - alive; 360 361 - let items: Vec<Did> = dids[begin..end] 362 .iter() 363 .rev() 364 .flatten() 365 .filter(|did| *data.dids.get(did).expect("did must be in dids")) ··· 367 .collect(); 368 369 Ok(PagedAppendingCollection { 370 - version: (total as u64, gone as u64), 371 items, 372 - next, 373 - total: alive as u64, 374 }) 375 } 376
··· 1 use super::{ 2 + LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 use anyhow::Result; ··· 147 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 148 let data = self.0.lock().unwrap(); 149 let Some(paths) = data.targets.get(&Target::new(target)) else { 150 + return Ok(PagedOrderedCollection::empty()); 151 }; 152 let Some(linkers) = paths.get(&Source::new(collection, path)) else { 153 + return Ok(PagedOrderedCollection::empty()); 154 }; 155 156 let path_to_other = RecordPath::new(path_to_other); ··· 239 target: &str, 240 collection: &str, 241 path: &str, 242 + order: Order, 243 limit: u64, 244 + until: Option<u64>, // paged iteration endpoint 245 filter_dids: &HashSet<Did>, 246 ) -> Result<PagedAppendingCollection<RecordId>> { 247 let data = self.0.lock().unwrap(); 248 let Some(paths) = data.targets.get(&Target::new(target)) else { 249 + return Ok(PagedAppendingCollection::empty()); 250 }; 251 let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 252 + return Ok(PagedAppendingCollection::empty()); 253 }; 254 255 let did_rkeys: Vec<_> = if !filter_dids.is_empty() { ··· 266 did_rkeys.to_vec() 267 }; 268 269 + let total = did_rkeys.len() as u64; 270 271 + // backlinks are stored oldest-to-newest (ascending index with increasing age) 272 + let (start, take, next_until) = match order { 273 + Order::OldestToNewest => { 274 + let start = until.unwrap_or(0); 275 + let next = start + limit + 1; 276 + let next_until = if next < total { Some(next) } else { None }; 277 + (start, limit, next_until) 278 + } 279 + Order::NewestToOldest => { 280 + let until = until.unwrap_or(total); 281 + match until.checked_sub(limit) { 282 + Some(s) if s > 0 => (s, limit, Some(s)), 283 + Some(s) => (s, limit, None), 284 + None => (0, until, None), 285 + } 286 + } 287 + }; 288 + 289 + let alive = did_rkeys.iter().flatten().count() as u64; 290 let 
gone = total - alive; 291 292 + let items = did_rkeys 293 .iter() 294 + .skip(start as usize) 295 + .take(take as usize) 296 .flatten() 297 .filter(|(did, _)| *data.dids.get(did).expect("did must be in dids")) 298 .map(|(did, rkey)| RecordId { 299 did: did.clone(), 300 rkey: rkey.0.clone(), 301 collection: collection.to_string(), 302 + }); 303 + 304 + let items: Vec<_> = match order { 305 + Order::OldestToNewest => items.collect(), // links are stored oldest first 306 + Order::NewestToOldest => items.rev().collect(), 307 + }; 308 309 Ok(PagedAppendingCollection { 310 + version: (total, gone), 311 items, 312 + next: next_until, 313 + total: alive, 314 }) 315 } 316 ··· 324 ) -> Result<PagedAppendingCollection<Did>> { 325 let data = self.0.lock().unwrap(); 326 let Some(paths) = data.targets.get(&Target::new(target)) else { 327 + return Ok(PagedAppendingCollection::empty()); 328 }; 329 let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 330 + return Ok(PagedAppendingCollection::empty()); 331 }; 332 333 let dids: Vec<Option<Did>> = { ··· 347 .collect() 348 }; 349 350 + let total = dids.len() as u64; 351 + let until = until.unwrap_or(total); 352 + let (start, take, next_until) = match until.checked_sub(limit) { 353 + Some(s) if s > 0 => (s, limit, Some(s)), 354 + Some(s) => (s, limit, None), 355 + None => (0, until, None), 356 + }; 357 358 + let alive = dids.iter().flatten().count() as u64; 359 let gone = total - alive; 360 361 + let items: Vec<Did> = dids 362 .iter() 363 + .skip(start as usize) 364 + .take(take as usize) 365 .rev() 366 .flatten() 367 .filter(|did| *data.dids.get(did).expect("did must be in dids")) ··· 369 .collect(); 370 371 Ok(PagedAppendingCollection { 372 + version: (total, gone), 373 items, 374 + next: next_until, 375 + total: alive, 376 }) 377 } 378
+195 -76
constellation/src/storage/mod.rs
··· 11 #[cfg(feature = "rocks")] 12 pub use rocks_store::RocksStorage; 13 14 - #[derive(Debug, PartialEq)] 15 pub struct PagedAppendingCollection<T> { 16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 17 pub items: Vec<T>, ··· 19 pub total: u64, 20 } 21 22 /// A paged collection whose keys are sorted instead of indexed 23 /// 24 /// this has weaker guarantees than PagedAppendingCollection: it might 25 /// return a totally consistent snapshot. but it should avoid duplicates 26 /// and each page should at least be internally consistent. 27 - #[derive(Debug, PartialEq, Default)] 28 pub struct PagedOrderedCollection<T, K: Ord> { 29 pub items: Vec<T>, 30 pub next: Option<K>, 31 } 32 33 #[derive(Debug, Deserialize, Serialize, PartialEq)] 34 pub struct StorageStats { 35 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. ··· 82 83 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 84 85 fn get_links( 86 &self, 87 target: &str, 88 collection: &str, 89 path: &str, 90 limit: u64, 91 until: Option<u64>, 92 filter_dids: &HashSet<Did>, ··· 180 "a.com", 181 "app.t.c", 182 ".abc.uri", 183 100, 184 None, 185 &HashSet::default() 186 )?, 187 - PagedAppendingCollection { 188 - version: (0, 0), 189 - items: vec![], 190 - next: None, 191 - total: 0, 192 - } 193 ); 194 assert_eq!( 195 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 196 - PagedAppendingCollection { 197 - version: (0, 0), 198 - items: vec![], 199 - next: None, 200 - total: 0, 201 - } 202 ); 203 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 204 assert_eq!( ··· 683 "a.com", 684 "app.t.c", 685 ".abc.uri", 686 100, 687 None, 688 &HashSet::default() ··· 727 0, 728 )?; 729 } 730 - let links = 731 - storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 732 - let dids 
= storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?; 733 assert_eq!( 734 links, 735 PagedAppendingCollection { ··· 737 items: vec![ 738 RecordId { 739 did: "did:plc:asdf-5".into(), 740 - collection: "app.t.c".into(), 741 rkey: "asdf".into(), 742 }, 743 RecordId { 744 did: "did:plc:asdf-4".into(), 745 - collection: "app.t.c".into(), 746 rkey: "asdf".into(), 747 }, 748 ], ··· 750 total: 5, 751 } 752 ); 753 assert_eq!( 754 dids, 755 PagedAppendingCollection { ··· 759 total: 5, 760 } 761 ); 762 - let links = storage.get_links( 763 - "a.com", 764 - "app.t.c", 765 - ".abc.uri", 766 - 2, 767 - links.next, 768 - &HashSet::default(), 769 - )?; 770 - let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 771 assert_eq!( 772 links, 773 PagedAppendingCollection { ··· 775 items: vec![ 776 RecordId { 777 did: "did:plc:asdf-3".into(), 778 - collection: "app.t.c".into(), 779 rkey: "asdf".into(), 780 }, 781 RecordId { 782 did: "did:plc:asdf-2".into(), 783 - collection: "app.t.c".into(), 784 rkey: "asdf".into(), 785 }, 786 ], ··· 788 total: 5, 789 } 790 ); 791 assert_eq!( 792 dids, 793 PagedAppendingCollection { ··· 797 total: 5, 798 } 799 ); 800 - let links = storage.get_links( 801 - "a.com", 802 - "app.t.c", 803 - ".abc.uri", 804 - 2, 805 - links.next, 806 - &HashSet::default(), 807 - )?; 808 - let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 809 assert_eq!( 810 links, 811 PagedAppendingCollection { 812 version: (5, 0), 813 items: vec![RecordId { 814 did: "did:plc:asdf-1".into(), 815 - collection: "app.t.c".into(), 816 rkey: "asdf".into(), 817 },], 818 next: None, 819 total: 5, 820 } 821 ); 822 assert_eq!( 823 dids, 824 PagedAppendingCollection { ··· 828 total: 5, 829 } 830 ); 831 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 832 }); 833 834 - test_each_storage!(get_filtered_links, |storage| { 835 let links = storage.get_links( 836 "a.com", 837 "app.t.c", 838 ".abc.uri", 839 2, 840 None, 
841 - &HashSet::from([Did("did:plc:linker".to_string())]), 842 )?; 843 assert_eq!( 844 links, 845 PagedAppendingCollection { 846 - version: (0, 0), 847 - items: vec![], 848 - next: None, 849 - total: 0, 850 } 851 ); 852 853 storage.push( 854 &ActionableEvent::CreateLinks { ··· 869 "a.com", 870 "app.t.c", 871 ".abc.uri", 872 2, 873 None, 874 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 891 "a.com", 892 "app.t.c", 893 ".abc.uri", 894 2, 895 None, 896 &HashSet::from([Did("did:plc:someone-else".to_string())]), 897 )?; 898 - assert_eq!( 899 - links, 900 - PagedAppendingCollection { 901 - version: (0, 0), 902 - items: vec![], 903 - next: None, 904 - total: 0, 905 - } 906 - ); 907 908 storage.push( 909 &ActionableEvent::CreateLinks { ··· 938 "a.com", 939 "app.t.c", 940 ".abc.uri", 941 2, 942 None, 943 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 967 "a.com", 968 "app.t.c", 969 ".abc.uri", 970 2, 971 None, 972 &HashSet::from([ ··· 999 "a.com", 1000 "app.t.c", 1001 ".abc.uri", 1002 2, 1003 None, 1004 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 1005 )?; 1006 - assert_eq!( 1007 - links, 1008 - PagedAppendingCollection { 1009 - version: (0, 0), 1010 - items: vec![], 1011 - next: None, 1012 - total: 0, 1013 - } 1014 - ); 1015 }); 1016 1017 test_each_storage!(get_links_exact_multiple, |storage| { ··· 1031 0, 1032 )?; 1033 } 1034 - let links = 1035 - storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1036 assert_eq!( 1037 links, 1038 PagedAppendingCollection { ··· 1057 "a.com", 1058 "app.t.c", 1059 ".abc.uri", 1060 2, 1061 links.next, 1062 &HashSet::default(), ··· 1101 0, 1102 )?; 1103 } 1104 - let links = 1105 - storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1106 assert_eq!( 1107 links, 1108 PagedAppendingCollection { ··· 1141 "a.com", 1142 "app.t.c", 1143 ".abc.uri", 1144 2, 1145 links.next, 1146 &HashSet::default(), ··· 1185 0, 1186 )?; 1187 } 1188 - let links = 1189 - 
storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1190 assert_eq!( 1191 links, 1192 PagedAppendingCollection { ··· 1219 "a.com", 1220 "app.t.c", 1221 ".abc.uri", 1222 2, 1223 links.next, 1224 &HashSet::default(), ··· 1256 0, 1257 )?; 1258 } 1259 - let links = 1260 - storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1261 assert_eq!( 1262 links, 1263 PagedAppendingCollection { ··· 1286 "a.com", 1287 "app.t.c", 1288 ".abc.uri", 1289 2, 1290 links.next, 1291 &HashSet::default(), ··· 1372 &HashSet::new(), 1373 &HashSet::new(), 1374 )?, 1375 - PagedOrderedCollection { 1376 - items: vec![], 1377 - next: None, 1378 - } 1379 ); 1380 }); 1381
··· 11 #[cfg(feature = "rocks")] 12 pub use rocks_store::RocksStorage; 13 14 + /// Ordering for paginated link queries 15 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 16 + pub enum Order { 17 + /// Newest links first (default) 18 + NewestToOldest, 19 + /// Oldest links first 20 + OldestToNewest, 21 + } 22 + 23 + #[derive(Debug, Default, PartialEq)] 24 pub struct PagedAppendingCollection<T> { 25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 26 pub items: Vec<T>, ··· 28 pub total: u64, 29 } 30 31 + impl<T> PagedAppendingCollection<T> { 32 + pub(crate) fn empty() -> Self { 33 + Self { 34 + version: (0, 0), 35 + items: Vec::new(), 36 + next: None, 37 + total: 0, 38 + } 39 + } 40 + } 41 + 42 /// A paged collection whose keys are sorted instead of indexed 43 /// 44 /// this has weaker guarantees than PagedAppendingCollection: it might 45 /// return a totally consistent snapshot. but it should avoid duplicates 46 /// and each page should at least be internally consistent. 47 + #[derive(Debug, PartialEq)] 48 pub struct PagedOrderedCollection<T, K: Ord> { 49 pub items: Vec<T>, 50 pub next: Option<K>, 51 } 52 53 + impl<T, K: Ord> PagedOrderedCollection<T, K> { 54 + pub(crate) fn empty() -> Self { 55 + Self { 56 + items: Vec::new(), 57 + next: None, 58 + } 59 + } 60 + } 61 + 62 #[derive(Debug, Deserialize, Serialize, PartialEq)] 63 pub struct StorageStats { 64 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. 
··· 111 112 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 113 114 + #[allow(clippy::too_many_arguments)] 115 fn get_links( 116 &self, 117 target: &str, 118 collection: &str, 119 path: &str, 120 + order: Order, 121 limit: u64, 122 until: Option<u64>, 123 filter_dids: &HashSet<Did>, ··· 211 "a.com", 212 "app.t.c", 213 ".abc.uri", 214 + Order::NewestToOldest, 215 100, 216 None, 217 &HashSet::default() 218 )?, 219 + PagedAppendingCollection::empty() 220 ); 221 assert_eq!( 222 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 223 + PagedAppendingCollection::empty() 224 ); 225 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 226 assert_eq!( ··· 705 "a.com", 706 "app.t.c", 707 ".abc.uri", 708 + Order::NewestToOldest, 709 100, 710 None, 711 &HashSet::default() ··· 750 0, 751 )?; 752 } 753 + 754 + let sub = "a.com"; 755 + let col = "app.t.c"; 756 + let path = ".abc.uri"; 757 + let order = Order::NewestToOldest; 758 + let dids_filter = HashSet::new(); 759 + 760 + // --- --- round one! --- --- // 761 + // all backlinks 762 + let links = storage.get_links(sub, col, path, order, 2, None, &dids_filter)?; 763 assert_eq!( 764 links, 765 PagedAppendingCollection { ··· 767 items: vec![ 768 RecordId { 769 did: "did:plc:asdf-5".into(), 770 + collection: col.into(), 771 rkey: "asdf".into(), 772 }, 773 RecordId { 774 did: "did:plc:asdf-4".into(), 775 + collection: col.into(), 776 rkey: "asdf".into(), 777 }, 778 ], ··· 780 total: 5, 781 } 782 ); 783 + // distinct dids 784 + let dids = storage.get_distinct_dids(sub, col, path, 2, None)?; 785 assert_eq!( 786 dids, 787 PagedAppendingCollection { ··· 791 total: 5, 792 } 793 ); 794 + 795 + // --- --- round two! 
--- --- // 796 + // all backlinks 797 + let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 798 assert_eq!( 799 links, 800 PagedAppendingCollection { ··· 802 items: vec![ 803 RecordId { 804 did: "did:plc:asdf-3".into(), 805 + collection: col.into(), 806 rkey: "asdf".into(), 807 }, 808 RecordId { 809 did: "did:plc:asdf-2".into(), 810 + collection: col.into(), 811 rkey: "asdf".into(), 812 }, 813 ], ··· 815 total: 5, 816 } 817 ); 818 + // distinct dids 819 + let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 820 assert_eq!( 821 dids, 822 PagedAppendingCollection { ··· 826 total: 5, 827 } 828 ); 829 + 830 + // --- --- round three! --- --- // 831 + // all backlinks 832 + let links = storage.get_links(sub, col, path, order, 2, links.next, &dids_filter)?; 833 assert_eq!( 834 links, 835 PagedAppendingCollection { 836 version: (5, 0), 837 items: vec![RecordId { 838 did: "did:plc:asdf-1".into(), 839 + collection: col.into(), 840 rkey: "asdf".into(), 841 },], 842 next: None, 843 total: 5, 844 } 845 ); 846 + // distinct dids 847 + let dids = storage.get_distinct_dids(sub, col, path, 2, dids.next)?; 848 assert_eq!( 849 dids, 850 PagedAppendingCollection { ··· 854 total: 5, 855 } 856 ); 857 + 858 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 859 }); 860 861 + test_each_storage!(get_links_reverse_order, |storage| { 862 + for i in 1..=5 { 863 + storage.push( 864 + &ActionableEvent::CreateLinks { 865 + record_id: RecordId { 866 + did: format!("did:plc:asdf-{i}").into(), 867 + collection: "app.t.c".into(), 868 + rkey: "asdf".into(), 869 + }, 870 + links: vec![CollectedLink { 871 + target: Link::Uri("a.com".into()), 872 + path: ".abc.uri".into(), 873 + }], 874 + }, 875 + 0, 876 + )?; 877 + } 878 + 879 + // Test OldestToNewest order (oldest first) 880 + let links = storage.get_links( 881 + "a.com", 882 + "app.t.c", 883 + ".abc.uri", 884 + Order::OldestToNewest, 885 + 2, 886 + None, 887 + &HashSet::default(), 888 + )?; 889 + 
assert_eq!( 890 + links, 891 + PagedAppendingCollection { 892 + version: (5, 0), 893 + items: vec![ 894 + RecordId { 895 + did: "did:plc:asdf-1".into(), 896 + collection: "app.t.c".into(), 897 + rkey: "asdf".into(), 898 + }, 899 + RecordId { 900 + did: "did:plc:asdf-2".into(), 901 + collection: "app.t.c".into(), 902 + rkey: "asdf".into(), 903 + }, 904 + ], 905 + next: Some(3), 906 + total: 5, 907 + } 908 + ); 909 + // Test NewestToOldest order (newest first) 910 let links = storage.get_links( 911 "a.com", 912 "app.t.c", 913 ".abc.uri", 914 + Order::NewestToOldest, 915 2, 916 None, 917 + &HashSet::default(), 918 )?; 919 assert_eq!( 920 links, 921 PagedAppendingCollection { 922 + version: (5, 0), 923 + items: vec![ 924 + RecordId { 925 + did: "did:plc:asdf-5".into(), 926 + collection: "app.t.c".into(), 927 + rkey: "asdf".into(), 928 + }, 929 + RecordId { 930 + did: "did:plc:asdf-4".into(), 931 + collection: "app.t.c".into(), 932 + rkey: "asdf".into(), 933 + }, 934 + ], 935 + next: Some(3), 936 + total: 5, 937 } 938 ); 939 + assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 940 + }); 941 + 942 + test_each_storage!(get_filtered_links, |storage| { 943 + let links = storage.get_links( 944 + "a.com", 945 + "app.t.c", 946 + ".abc.uri", 947 + Order::NewestToOldest, 948 + 2, 949 + None, 950 + &HashSet::from([Did("did:plc:linker".to_string())]), 951 + )?; 952 + assert_eq!(links, PagedAppendingCollection::empty()); 953 954 storage.push( 955 &ActionableEvent::CreateLinks { ··· 970 "a.com", 971 "app.t.c", 972 ".abc.uri", 973 + Order::NewestToOldest, 974 2, 975 None, 976 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 993 "a.com", 994 "app.t.c", 995 ".abc.uri", 996 + Order::NewestToOldest, 997 2, 998 None, 999 &HashSet::from([Did("did:plc:someone-else".to_string())]), 1000 )?; 1001 + assert_eq!(links, PagedAppendingCollection::empty()); 1002 1003 storage.push( 1004 &ActionableEvent::CreateLinks { ··· 1033 "a.com", 1034 "app.t.c", 1035 ".abc.uri", 1036 + 
Order::NewestToOldest, 1037 2, 1038 None, 1039 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 1063 "a.com", 1064 "app.t.c", 1065 ".abc.uri", 1066 + Order::NewestToOldest, 1067 2, 1068 None, 1069 &HashSet::from([ ··· 1096 "a.com", 1097 "app.t.c", 1098 ".abc.uri", 1099 + Order::NewestToOldest, 1100 2, 1101 None, 1102 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 1103 )?; 1104 + assert_eq!(links, PagedAppendingCollection::empty()); 1105 }); 1106 1107 test_each_storage!(get_links_exact_multiple, |storage| { ··· 1121 0, 1122 )?; 1123 } 1124 + let links = storage.get_links( 1125 + "a.com", 1126 + "app.t.c", 1127 + ".abc.uri", 1128 + Order::NewestToOldest, 1129 + 2, 1130 + None, 1131 + &HashSet::default(), 1132 + )?; 1133 assert_eq!( 1134 links, 1135 PagedAppendingCollection { ··· 1154 "a.com", 1155 "app.t.c", 1156 ".abc.uri", 1157 + Order::NewestToOldest, 1158 2, 1159 links.next, 1160 &HashSet::default(), ··· 1199 0, 1200 )?; 1201 } 1202 + let links = storage.get_links( 1203 + "a.com", 1204 + "app.t.c", 1205 + ".abc.uri", 1206 + Order::NewestToOldest, 1207 + 2, 1208 + None, 1209 + &HashSet::default(), 1210 + )?; 1211 assert_eq!( 1212 links, 1213 PagedAppendingCollection { ··· 1246 "a.com", 1247 "app.t.c", 1248 ".abc.uri", 1249 + Order::NewestToOldest, 1250 2, 1251 links.next, 1252 &HashSet::default(), ··· 1291 0, 1292 )?; 1293 } 1294 + let links = storage.get_links( 1295 + "a.com", 1296 + "app.t.c", 1297 + ".abc.uri", 1298 + Order::NewestToOldest, 1299 + 2, 1300 + None, 1301 + &HashSet::default(), 1302 + )?; 1303 assert_eq!( 1304 links, 1305 PagedAppendingCollection { ··· 1332 "a.com", 1333 "app.t.c", 1334 ".abc.uri", 1335 + Order::NewestToOldest, 1336 2, 1337 links.next, 1338 &HashSet::default(), ··· 1370 0, 1371 )?; 1372 } 1373 + let links = storage.get_links( 1374 + "a.com", 1375 + "app.t.c", 1376 + ".abc.uri", 1377 + Order::NewestToOldest, 1378 + 2, 1379 + None, 1380 + &HashSet::default(), 1381 + )?; 1382 assert_eq!( 1383 links, 1384 
PagedAppendingCollection { ··· 1407 "a.com", 1408 "app.t.c", 1409 ".abc.uri", 1410 + Order::NewestToOldest, 1411 2, 1412 links.next, 1413 &HashSet::default(), ··· 1494 &HashSet::new(), 1495 &HashSet::new(), 1496 )?, 1497 + PagedOrderedCollection::empty() 1498 ); 1499 }); 1500
+41 -25
constellation/src/storage/rocks_store.rs
··· 1 use super::{ 2 - ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, 3 - StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 use anyhow::{bail, Result}; ··· 960 961 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 962 eprintln!("nothin doin for this target, {target_key:?}"); 963 - return Ok(Default::default()); 964 }; 965 966 let filter_did_ids: HashMap<DidId, bool> = filter_dids ··· 1127 target: &str, 1128 collection: &str, 1129 path: &str, 1130 limit: u64, 1131 until: Option<u64>, 1132 filter_dids: &HashSet<Did>, ··· 1138 ); 1139 1140 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1141 - return Ok(PagedAppendingCollection { 1142 - version: (0, 0), 1143 - items: Vec::new(), 1144 - next: None, 1145 - total: 0, 1146 - }); 1147 }; 1148 1149 let mut linkers = self.get_target_linkers(&target_id)?; ··· 1167 1168 let (alive, gone) = linkers.count(); 1169 let total = alive + gone; 1170 - let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1171 - let begin = end.saturating_sub(limit as usize); 1172 - let next = if begin == 0 { None } else { Some(begin as u64) }; 1173 1174 - let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1175 1176 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1177 // TODO: use get-many (or multi-get or whatever it's called) ··· 1201 Ok(PagedAppendingCollection { 1202 version: (total, gone), 1203 items, 1204 - next, 1205 total: alive, 1206 }) 1207 } ··· 1221 ); 1222 1223 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
else { 1224 - return Ok(PagedAppendingCollection { 1225 - version: (0, 0), 1226 - items: Vec::new(), 1227 - next: None, 1228 - total: 0, 1229 - }); 1230 }; 1231 1232 let linkers = self.get_distinct_target_linkers(&target_id)?; 1233 1234 let (alive, gone) = linkers.count(); 1235 let total = alive + gone; 1236 - let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1237 - let begin = end.saturating_sub(limit as usize); 1238 - let next = if begin == 0 { None } else { Some(begin as u64) }; 1239 1240 - let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1241 1242 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1243 // TODO: use get-many (or multi-get or whatever it's called) ··· 1263 Ok(PagedAppendingCollection { 1264 version: (total, gone), 1265 items, 1266 - next, 1267 total: alive, 1268 }) 1269 }
··· 1 use super::{ 2 + ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 + PagedOrderedCollection, StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 use anyhow::{bail, Result}; ··· 960 961 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 962 eprintln!("nothin doin for this target, {target_key:?}"); 963 + return Ok(PagedOrderedCollection::empty()); 964 }; 965 966 let filter_did_ids: HashMap<DidId, bool> = filter_dids ··· 1127 target: &str, 1128 collection: &str, 1129 path: &str, 1130 + order: Order, 1131 limit: u64, 1132 until: Option<u64>, 1133 filter_dids: &HashSet<Did>, ··· 1139 ); 1140 1141 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1142 + return Ok(PagedAppendingCollection::empty()); 1143 }; 1144 1145 let mut linkers = self.get_target_linkers(&target_id)?; ··· 1163 1164 let (alive, gone) = linkers.count(); 1165 let total = alive + gone; 1166 1167 + let (start, take, next_until) = match order { 1168 + // OldestToNewest: start from the beginning, paginate forward 1169 + Order::OldestToNewest => { 1170 + let start = until.unwrap_or(0); 1171 + let next = start + limit + 1; 1172 + let next_until = if next < total { Some(next) } else { None }; 1173 + (start, limit, next_until) 1174 + } 1175 + // NewestToOldest: start from the end, paginate backward 1176 + Order::NewestToOldest => { 1177 + let until = until.unwrap_or(total); 1178 + match until.checked_sub(limit) { 1179 + Some(s) if s > 0 => (s, limit, Some(s)), 1180 + Some(s) => (s, limit, None), 1181 + None => (0, until, None), 1182 + } 1183 + } 1184 + }; 1185 + 1186 + let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize); 1187 + let did_id_rkeys: Vec<_> = match order { 1188 + Order::OldestToNewest => did_id_rkeys.collect(), 1189 + Order::NewestToOldest => did_id_rkeys.rev().collect(), 1190 + }; 1191 1192 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1193 // TODO: 
use get-many (or multi-get or whatever it's called) ··· 1217 Ok(PagedAppendingCollection { 1218 version: (total, gone), 1219 items, 1220 + next: next_until, 1221 total: alive, 1222 }) 1223 } ··· 1237 ); 1238 1239 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1240 + return Ok(PagedAppendingCollection::empty()); 1241 }; 1242 1243 let linkers = self.get_distinct_target_linkers(&target_id)?; 1244 1245 let (alive, gone) = linkers.count(); 1246 let total = alive + gone; 1247 + 1248 + let until = until.unwrap_or(total); 1249 + let (start, take, next_until) = match until.checked_sub(limit) { 1250 + Some(s) if s > 0 => (s, limit, Some(s)), 1251 + Some(s) => (s, limit, None), 1252 + None => (0, until, None), 1253 + }; 1254 1255 + let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize); 1256 + let did_id_rkeys: Vec<_> = did_id_rkeys.rev().collect(); 1257 1258 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1259 // TODO: use get-many (or multi-get or whatever it's called) ··· 1279 Ok(PagedAppendingCollection { 1280 version: (total, gone), 1281 items, 1282 + next: next_until, 1283 total: alive, 1284 }) 1285 }
+4
constellation/templates/base.html.j2
··· 40 padding: 0.5em 0.3em; 41 max-width: 100%; 42 } 43 .stat { 44 color: #f90; 45 font-size: 1.618rem;
··· 40 padding: 0.5em 0.3em; 41 max-width: 100%; 42 } 43 + pre.code input { 44 + margin: 0; 45 + padding: 0; 46 + } 47 .stat { 48 color: #f90; 49 font-size: 1.618rem;
+1 -1
constellation/templates/dids.html.j2
··· 27 {% for did in linking_dids %} 28 <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }} 29 -> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a> 30 - -> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre> 31 {% endfor %} 32 33 {% if let Some(c) = cursor %}
··· 27 {% for did in linking_dids %} 28 <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }} 29 -> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a> 30 + -> browse <a href="https://pdsls.dev/at://{{ did.0 }}">this DID record</a></pre> 31 {% endfor %} 32 33 {% if let Some(c) = cursor %}
+2 -1
constellation/templates/get-backlinks.html.j2
··· 6 7 {% block content %} 8 9 - {% call try_it::get_backlinks(query.subject, query.source, query.did, query.limit) %} 10 11 <h2> 12 Links to <code>{{ query.subject }}</code> ··· 40 <input type="hidden" name="did" value="{{ did }}" /> 41 {% endfor %} 42 <input type="hidden" name="cursor" value={{ c|json|safe }} /> 43 <button type="submit">next page&hellip;</button> 44 </form> 45 {% else %}
··· 6 7 {% block content %} 8 9 + {% call try_it::get_backlinks(query.subject, query.source, query.did, query.limit, query.reverse) %} 10 11 <h2> 12 Links to <code>{{ query.subject }}</code> ··· 40 <input type="hidden" name="did" value="{{ did }}" /> 41 {% endfor %} 42 <input type="hidden" name="cursor" value={{ c|json|safe }} /> 43 + <input type="hidden" name="reverse" value="{{ query.reverse }}"> 44 <button type="submit">next page&hellip;</button> 45 </form> 46 {% else %}
+18 -2
constellation/templates/hello.html.j2
··· 49 <li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li> 50 <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 51 <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 52 </ul> 53 54 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 55 - {% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %} 56 57 58 <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3> ··· 96 <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 97 <li><p><code>from_dids</code> [deprecated]: optional. Use <code>did</code> instead. Example: <code>from_dids=did:plc:vc7f4oafdgxsihk4cry2xpze,did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 98 <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 99 </ul> 100 101 <p style="margin-bottom: 0"><strong>Try it:</strong></p> ··· 118 {% call try_it::dids("at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r", "app.bsky.feed.like", ".subject.uri") %} 119 120 121 - <h3 class="route"><code>GET /links/count</code></h3> 122 123 <p>The total number of links pointing at a given target.</p> 124 ··· 134 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 135 {% call try_it::links_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block", ".subject") %} 136 137 138 <h3 class="route"><code>GET /links/count/distinct-dids</code></h3> 139
··· 49 <li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li> 50 <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 51 <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 52 + <li><p><code>reverse</code>: optional, return links in reverse order. Default: <code>false</code></p></li> 53 </ul> 54 55 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 56 + {% call 57 + try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16, false) %} 58 59 60 <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3> ··· 98 <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 99 <li><p><code>from_dids</code> [deprecated]: optional. Use <code>did</code> instead. Example: <code>from_dids=did:plc:vc7f4oafdgxsihk4cry2xpze,did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 100 <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 101 + <li><p><code>reverse</code>: optional, return links in reverse order. 
Default: <code>false</code></p></li> 102 </ul> 103 104 <p style="margin-bottom: 0"><strong>Try it:</strong></p> ··· 121 {% call try_it::dids("at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r", "app.bsky.feed.like", ".subject.uri") %} 122 123 124 + <h3 class="route deprecated"><code>[deprecated] GET /links/count</code></h3> 125 126 <p>The total number of links pointing at a given target.</p> 127 ··· 137 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 138 {% call try_it::links_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block", ".subject") %} 139 140 + <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getBacklinksCount</code></h3> 141 + 142 + <p>The total number of links pointing at a given target.</p> 143 + 144 + <h4>Query parameters:</h4> 145 + 146 + <ul> 147 + <li><code>subject</code>: required, must url-encode. The target being linked to. Example: <code>did:plc:vc7f4oafdgxsihk4cry2xpze</code> or <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></li> 148 + <li><code>source</code>: required. Collection and path specification for the primary link. Example: <code>app.bsky.feed.like:subject.uri</code></li> 149 + </ul> 150 + 151 + <p style="margin-bottom: 0"><strong>Try it:</strong></p> 152 + {% call try_it::get_backlinks_count("did:plc:vc7f4oafdgxsihk4cry2xpze", "app.bsky.graph.block:subject") %} 153 154 <h3 class="route"><code>GET /links/count/distinct-dids</code></h3> 155
+11 -2
constellation/templates/try-it-macros.html.j2
··· 1 - {% macro get_backlinks(subject, source, dids, limit) %} 2 <form method="get" action="/xrpc/blue.microcosm.links.getBacklinks"> 3 <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks 4 ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." /> ··· 6 {%- for did in dids %}{% if !did.is_empty() %} 7 &did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %} 8 <span id="did-placeholder"></span> <button id="add-did">+ did filter</button> 9 - &limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre> 10 </form> 11 <script> 12 const addDidButton = document.getElementById('add-did'); ··· 102 </form> 103 {% endmacro %} 104 105 106 {% macro links_count(target, collection, path) %} 107 <form method="get" action="/links/count">
··· 1 + {% macro get_backlinks(subject, source, dids, limit, reverse) %} 2 <form method="get" action="/xrpc/blue.microcosm.links.getBacklinks"> 3 <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks 4 ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." /> ··· 6 {%- for did in dids %}{% if !did.is_empty() %} 7 &did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %} 8 <span id="did-placeholder"></span> <button id="add-did">+ did filter</button> 9 + &limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> 10 + &reverse= <input type="checkbox" name="reverse" value="true" {% if reverse %}checked{% endif %}> 11 + <button type="submit">get links</button></pre> 12 </form> 13 <script> 14 const addDidButton = document.getElementById('add-did'); ··· 104 </form> 105 {% endmacro %} 106 107 + {% macro get_backlinks_count(subject, source) %} 108 + <form method="get" action="/xrpc/blue.microcosm.links.getBacklinksCount"> 109 + <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinksCount 110 + ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="subject" /> 111 + &source= <input type="text" name="source" value="{{ source }}" placeholder="source" /> <button type="submit">get links count</button></pre> 112 + </form> 113 + {% endmacro %} 114 115 {% macro links_count(target, collection, path) %} 116 <form method="get" action="/links/count">
+95
lexicons/blue.microcosm/links/getBacklinks.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getBacklinks", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "a list of records linking to any record, identity, or uri", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "subject", 12 + "source" 13 + ], 14 + "properties": { 15 + "subject": { 16 + "type": "string", 17 + "format": "uri", 18 + "description": "the target being linked to (at-uri, did, or uri)" 19 + }, 20 + "source": { 21 + "type": "string", 22 + "description": "collection and path specification (e.g., 'app.bsky.feed.like:subject.uri')" 23 + }, 24 + "did": { 25 + "type": "array", 26 + "description": "filter links to those from specific users", 27 + "items": { 28 + "type": "string", 29 + "format": "did" 30 + } 31 + }, 32 + "limit": { 33 + "type": "integer", 34 + "minimum": 1, 35 + "maximum": 100, 36 + "default": 16, 37 + "description": "number of results to return" 38 + } 39 + } 40 + }, 41 + "output": { 42 + "encoding": "application/json", 43 + "schema": { 44 + "type": "object", 45 + "required": [ 46 + "total", 47 + "records" 48 + ], 49 + "properties": { 50 + "total": { 51 + "type": "integer", 52 + "description": "total number of matching links" 53 + }, 54 + "records": { 55 + "type": "array", 56 + "items": { 57 + "type": "ref", 58 + "ref": "#linkRecord" 59 + } 60 + }, 61 + "cursor": { 62 + "type": "string", 63 + "description": "pagination cursor" 64 + } 65 + } 66 + } 67 + } 68 + }, 69 + "linkRecord": { 70 + "type": "object", 71 + "required": [ 72 + "did", 73 + "collection", 74 + "rkey" 75 + ], 76 + "properties": { 77 + "did": { 78 + "type": "string", 79 + "format": "did", 80 + "description": "the DID of the linking record's repository" 81 + }, 82 + "collection": { 83 + "type": "string", 84 + "format": "nsid", 85 + "description": "the collection of the linking record" 86 + }, 87 + "rkey": { 88 + "type": "string", 89 + "format": "record-key", 90 + "description": "the record key of the linking record" 91 + } 
92 + } 93 + } 94 + } 95 + }
+38
lexicons/blue.microcosm/links/getBacklinksCount.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getBacklinksCount", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "count records that link to another record", 8 + "parameters": { 9 + "type": "params", 10 + "required": ["subject", "source"], 11 + "properties": { 12 + "subject": { 13 + "type": "string", 14 + "format": "uri", 15 + "description": "the primary target being linked to (at-uri, did, or uri)" 16 + }, 17 + "source": { 18 + "type": "string", 19 + "description": "collection and path specification for the primary link" 20 + } 21 + } 22 + }, 23 + "output": { 24 + "encoding": "application/json", 25 + "schema": { 26 + "type": "object", 27 + "required": ["total"], 28 + "properties": { 29 + "total": { 30 + "type": "integer", 31 + "description": "total number of matching links" 32 + } 33 + } 34 + } 35 + } 36 + } 37 + } 38 + }
+99
lexicons/blue.microcosm/links/getManyToManyCounts.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getManyToManyCounts", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "count many-to-many relationships with secondary link paths", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "subject", 12 + "source", 13 + "pathToOther" 14 + ], 15 + "properties": { 16 + "subject": { 17 + "type": "string", 18 + "format": "uri", 19 + "description": "the primary target being linked to (at-uri, did, or uri)" 20 + }, 21 + "source": { 22 + "type": "string", 23 + "description": "collection and path specification for the primary link" 24 + }, 25 + "pathToOther": { 26 + "type": "string", 27 + "description": "path to the secondary link in the many-to-many record (e.g., 'otherThing.uri')" 28 + }, 29 + "did": { 30 + "type": "array", 31 + "description": "filter links to those from specific users", 32 + "items": { 33 + "type": "string", 34 + "format": "did" 35 + } 36 + }, 37 + "otherSubject": { 38 + "type": "array", 39 + "description": "filter secondary links to specific subjects", 40 + "items": { 41 + "type": "string" 42 + } 43 + }, 44 + "limit": { 45 + "type": "integer", 46 + "minimum": 1, 47 + "maximum": 100, 48 + "default": 16, 49 + "description": "number of results to return" 50 + } 51 + } 52 + }, 53 + "output": { 54 + "encoding": "application/json", 55 + "schema": { 56 + "type": "object", 57 + "required": [ 58 + "counts_by_other_subject" 59 + ], 60 + "properties": { 61 + "counts_by_other_subject": { 62 + "type": "array", 63 + "items": { 64 + "type": "ref", 65 + "ref": "#countBySubject" 66 + } 67 + }, 68 + "cursor": { 69 + "type": "string", 70 + "description": "pagination cursor" 71 + } 72 + } 73 + } 74 + } 75 + }, 76 + "countBySubject": { 77 + "type": "object", 78 + "required": [ 79 + "subject", 80 + "total", 81 + "distinct" 82 + ], 83 + "properties": { 84 + "subject": { 85 + "type": "string", 86 + "description": "the secondary subject being counted" 87 + }, 88 + "total": { 89 + "type": 
"integer", 90 + "description": "total number of links to this subject" 91 + }, 92 + "distinct": { 93 + "type": "integer", 94 + "description": "number of distinct DIDs linking to this subject" 95 + } 96 + } 97 + } 98 + } 99 + }
+56
lexicons/com.bad-example/identity/resolveMiniDoc.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "com.bad-example.identity.resolveMiniDoc", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "like com.atproto.identity.resolveIdentity but instead of the full didDoc it returns an atproto-relevant subset", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "identifier" 12 + ], 13 + "properties": { 14 + "identifier": { 15 + "type": "string", 16 + "format": "at-identifier", 17 + "description": "handle or DID to resolve" 18 + } 19 + } 20 + }, 21 + "output": { 22 + "encoding": "application/json", 23 + "schema": { 24 + "type": "object", 25 + "required": [ 26 + "did", 27 + "handle", 28 + "pds", 29 + "signing_key" 30 + ], 31 + "properties": { 32 + "did": { 33 + "type": "string", 34 + "format": "did", 35 + "description": "DID, bi-directionally verified if a handle was provided in the query" 36 + }, 37 + "handle": { 38 + "type": "string", 39 + "format": "handle", 40 + "description": "the validated handle of the account or 'handle.invalid' if the handle did not bi-directionally match the DID document" 41 + }, 42 + "pds": { 43 + "type": "string", 44 + "format": "uri", 45 + "description": "the identity's PDS URL" 46 + }, 47 + "signing_key": { 48 + "type": "string", 49 + "description": "the atproto signing key publicKeyMultibase" 50 + } 51 + } 52 + } 53 + } 54 + } 55 + } 56 + }
+54
lexicons/com.bad-example/repo/getUriRecord.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "com.bad-example.repo.getUriRecord", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "ergonomic complement to com.atproto.repo.getRecord which accepts an at-uri instead of individual repo/collection/rkey params", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "at_uri" 12 + ], 13 + "properties": { 14 + "at_uri": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "the at-uri of the record (identifier can be a DID or handle)" 18 + }, 19 + "cid": { 20 + "type": "string", 21 + "format": "cid", 22 + "description": "optional CID of the version of the record. if not specified, return the most recent version. if specified and a newer version exists, returns 404." 23 + } 24 + } 25 + }, 26 + "output": { 27 + "encoding": "application/json", 28 + "schema": { 29 + "type": "object", 30 + "required": [ 31 + "uri", 32 + "value" 33 + ], 34 + "properties": { 35 + "uri": { 36 + "type": "string", 37 + "format": "at-uri", 38 + "description": "at-uri for this record" 39 + }, 40 + "cid": { 41 + "type": "string", 42 + "format": "cid", 43 + "description": "CID for this exact version of the record" 44 + }, 45 + "value": { 46 + "type": "unknown", 47 + "description": "the record itself" 48 + } 49 + } 50 + } 51 + } 52 + } 53 + } 54 + }
+1 -1
readme.md
··· 10 Tutorials, how-to guides, and client SDK libraries are all in the works for gentler on-ramps, but are not quite ready yet. But don't let that stop you! Hop in the [microcosm discord](https://discord.gg/tcDfe4PGVB), or post questions and tag [@bad-example.com](https://bsky.app/profile/bad-example.com) on Bluesky if you get stuck anywhere. 11 12 > [!tip] 13 - > This repository's primary home is moving to tangled: [@microcosm.blue/microcosm-rs](https://tangled.sh/@microcosm.blue/microcosm-rs). It will continue to be mirrored on [github](https://github.com/at-microcosm/microcosm-rs) for the forseeable future, and it's fine to open issues or pulls in either place! 14 15 16 🌌 [Constellation](./constellation/)
··· 10 Tutorials, how-to guides, and client SDK libraries are all in the works for gentler on-ramps, but are not quite ready yet. But don't let that stop you! Hop in the [microcosm discord](https://discord.gg/tcDfe4PGVB), or post questions and tag [@bad-example.com](https://bsky.app/profile/bad-example.com) on Bluesky if you get stuck anywhere. 11 12 > [!tip] 13 + > This repository's primary home is moving to tangled: [@microcosm.blue/microcosm-rs](https://tangled.org/microcosm.blue/microcosm-rs). It will continue to be mirrored on [github](https://github.com/at-microcosm/microcosm-rs) for the foreseeable future, and it's fine to open issues or pulls in either place! 14 15 16 🌌 [Constellation](./constellation/)
+1 -1
slingshot/api-description.md
··· 90 - [🎇 Spacedust](https://spacedust.microcosm.blue/), a firehose of all social interactions 91 92 > [!success] 93 - > All microcosm projects are [open source](https://tangled.sh/@bad-example.com/microcosm-links). **You can help sustain Slingshot** and all of microcosm by becoming a [Github sponsor](https://github.com/sponsors/uniphil/) or a [Ko-fi supporter](https://ko-fi.com/bad_example)!
··· 90 - [🎇 Spacedust](https://spacedust.microcosm.blue/), a firehose of all social interactions 91 92 > [!success] 93 + > All microcosm projects are [open source](https://tangled.org/bad-example.com/microcosm-links). **You can help sustain Slingshot** and all of microcosm by becoming a [Github sponsor](https://github.com/sponsors/uniphil/) or a [Ko-fi supporter](https://ko-fi.com/bad_example)!
+2
spacedust/src/error.rs
··· 30 TooManySourcesWanted, 31 #[error("more wantedSubjectDids were requested than allowed (max 10,000)")] 32 TooManyDidsWanted, 33 #[error("more wantedSubjects were requested than allowed (max 50,000)")] 34 TooManySubjectsWanted, 35 }
··· 30 TooManySourcesWanted, 31 #[error("more wantedSubjectDids were requested than allowed (max 10,000)")] 32 TooManyDidsWanted, 33 + #[error("more wantedSubjectPrefixes were requested than allowed (max 100)")] 34 + TooManySubjectPrefixesWanted, 35 #[error("more wantedSubjects were requested than allowed (max 50,000)")] 36 TooManySubjectsWanted, 37 }
+11 -2
spacedust/src/server.rs
··· 228 #[serde(default)] 229 pub wanted_subjects: HashSet<String>, 230 #[serde(default)] 231 pub wanted_subject_dids: HashSet<String>, 232 #[serde(default)] 233 pub wanted_sources: HashSet<String>, ··· 242 /// 243 /// The at-uri must be url-encoded 244 /// 245 - /// Pass this parameter multiple times to specify multiple collections, like 246 /// `wantedSubjects=[...]&wantedSubjects=[...]` 247 pub wanted_subjects: String, 248 /// One or more DIDs to receive links about 249 /// 250 - /// Pass this parameter multiple times to specify multiple collections 251 pub wanted_subject_dids: String, 252 /// One or more link sources to receive links about 253 ///
··· 228 #[serde(default)] 229 pub wanted_subjects: HashSet<String>, 230 #[serde(default)] 231 + pub wanted_subject_prefixes: HashSet<String>, 232 + #[serde(default)] 233 pub wanted_subject_dids: HashSet<String>, 234 #[serde(default)] 235 pub wanted_sources: HashSet<String>, ··· 244 /// 245 /// The at-uri must be url-encoded 246 /// 247 + /// Pass this parameter multiple times to specify multiple subjects, like 248 /// `wantedSubjects=[...]&wantedSubjects=[...]` 249 pub wanted_subjects: String, 250 + /// One or more at-uri, URI, or DID prefixes to receive links about 251 + /// 252 + /// The uri must be url-encoded 253 + /// 254 + /// Pass this parameter multiple times to specify multiple prefixes, like 255 + /// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]` 256 + pub wanted_subject_prefixes: String, 257 /// One or more DIDs to receive links about 258 /// 259 + /// Pass this parameter multiple times to specify multiple DIDs 260 pub wanted_subject_dids: String, 261 /// One or more link sources to receive links about 262 ///
+10 -1
spacedust/src/subscriber.rs
··· 124 let query = &self.query; 125 126 // subject + subject DIDs are logical OR 127 - if !(query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty() 128 || query.wanted_subjects.contains(&properties.subject) 129 || properties 130 .subject_did 131 .as_ref() ··· 154 } 155 if opts.wanted_subject_dids.len() > 10_000 { 156 return Err(SubscriberUpdateError::TooManyDidsWanted); 157 } 158 if opts.wanted_subjects.len() > 50_000 { 159 return Err(SubscriberUpdateError::TooManySubjectsWanted);
··· 124 let query = &self.query; 125 126 // subject + subject DIDs are logical OR 127 + if !(query.wanted_subjects.is_empty() 128 + && query.wanted_subject_prefixes.is_empty() 129 + && query.wanted_subject_dids.is_empty() 130 || query.wanted_subjects.contains(&properties.subject) 131 + || query 132 + .wanted_subject_prefixes 133 + .iter() 134 + .any(|p| properties.subject.starts_with(p)) 135 || properties 136 .subject_did 137 .as_ref() ··· 160 } 161 if opts.wanted_subject_dids.len() > 10_000 { 162 return Err(SubscriberUpdateError::TooManyDidsWanted); 163 + } 164 + if opts.wanted_subject_prefixes.len() > 100 { 165 + return Err(SubscriberUpdateError::TooManySubjectPrefixesWanted); 166 } 167 if opts.wanted_subjects.len() > 50_000 { 168 return Err(SubscriberUpdateError::TooManySubjectsWanted);
+1 -1
ufos/Cargo.toml
··· 13 clap = { version = "4.5.31", features = ["derive"] } 14 dropshot = "0.16.0" 15 env_logger = "0.11.7" 16 - fjall = { git = "https://github.com/fjall-rs/fjall.git", features = ["lz4"] } 17 getrandom = "0.3.3" 18 http = "1.3.1" 19 jetstream = { path = "../jetstream", features = ["metrics"] }
··· 13 clap = { version = "4.5.31", features = ["derive"] } 14 dropshot = "0.16.0" 15 env_logger = "0.11.7" 16 + fjall = { git = "https://github.com/fjall-rs/fjall.git", rev = "fb229572bb7d1d6966a596994dc1708e47ec57d8", features = ["lz4"] } 17 getrandom = "0.3.3" 18 http = "1.3.1" 19 jetstream = { path = "../jetstream", features = ["metrics"] }