Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Address feedback from fig

Most importantly we transformed the composite cursor into an `Option<(u64, u64)>`. This removes a couple of invalid cursor combinations automatically: if the composite cursor is provided it can be reasonable expected that both parts are present and valid (especially considering that we're the ones building the cursor in the first place). When this is not the case we bail early.

In addition, we removed the redundant base64 dependency (the `OpaqueApiCursor` handles the url-safe hex-encoded representation, and addressed some minor instances where we could have written some parts in a more idiomatic way.

No tests were changed and all tests still pass.

authored by seoul.systems and committed by tangled.org 2dde6d90 7db6c6fc

+76 -85
-1
Cargo.lock
··· 1058 "axum", 1059 "axum-extra", 1060 "axum-metrics", 1061 - "base64 0.22.1", 1062 "bincode 1.3.3", 1063 "clap", 1064 "ctrlc",
··· 1058 "axum", 1059 "axum-extra", 1060 "axum-metrics", 1061 "bincode 1.3.3", 1062 "clap", 1063 "ctrlc",
-1
constellation/Cargo.toml
··· 10 axum = "0.8.1" 11 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 12 axum-metrics = "0.2" 13 - base64 = "0.22.1" 14 bincode = "1.3.3" 15 clap = { workspace = true } 16 ctrlc = "3.4.5"
··· 10 axum = "0.8.1" 11 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 12 axum-metrics = "0.2" 13 bincode = "1.3.3" 14 clap = { workspace = true } 15 ctrlc = "3.4.5"
+7 -8
constellation/src/server/mod.rs
··· 725 return Err(http::StatusCode::BAD_REQUEST); 726 } 727 728 - let filter_dids: HashSet<Did> = HashSet::from_iter( 729 - query 730 - .did 731 - .iter() 732 - .map(|d| d.trim()) 733 - .filter(|d| !d.is_empty()) 734 - .map(|d| Did(d.to_string())), 735 - ); 736 737 let filter_other_subjects: HashSet<String> = HashSet::from_iter( 738 query
··· 725 return Err(http::StatusCode::BAD_REQUEST); 726 } 727 728 + let filter_dids: HashSet<Did> = query 729 + .did 730 + .iter() 731 + .map(|d| d.trim()) 732 + .filter(|d| !d.is_empty()) 733 + .map(Did::from) 734 + .collect(); 735 736 let filter_other_subjects: HashSet<String> = HashSet::from_iter( 737 query
+27 -30
constellation/src/storage/mem_store.rs
··· 1 use super::{ 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 6 use anyhow::{anyhow, Result}; 7 - use base64::engine::general_purpose as b64; 8 - use base64::Engine as _; 9 use links::CollectedLink; 10 11 use std::collections::{HashMap, HashSet}; ··· 256 HashSet::from_iter(filter_targets.iter().map(|s| Target::new(s))); 257 258 // extract parts form composite cursor 259 - let (backward_idx, forward_idx) = match after { 260 Some(a) => { 261 - let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?; 262 - let (b, f) = after_str 263 - .split_once(',') 264 - .ok_or_else(|| anyhow!("invalid cursor format"))?; 265 - ( 266 - (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?, 267 - (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?, 268 - ) 269 } 270 - None => (None, None), 271 }; 272 273 let data = self.0.lock().unwrap(); ··· 285 .iter() 286 .enumerate() 287 .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 288 - .skip_while(|(linker_idx, _)| { 289 - backward_idx.is_some_and(|idx| match forward_idx { 290 - Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping 291 - None => *linker_idx <= idx as usize, // exclusive: skip right here 292 - }) 293 - }) 294 .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(&did)) 295 { 296 let Some(links) = data.links.get(&did).and_then(|m| { ··· 310 *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 311 }) 312 .skip_while(|(link_idx, _)| { 313 - backward_idx.is_some_and(|bl_idx| { 314 - linker_idx == bl_idx as usize 315 - && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize) 316 }) 317 }) 318 .take(limit as usize + 1 - items.len()) ··· 335 } 336 } 337 338 - let next = if items.len() as u64 > limit { 339 - items.truncate(limit as usize); 340 - items 341 - .last() 342 - .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64))) 343 - } else { 344 - None 345 - }; 346 347 - let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect(); 348 Ok(PagedOrderedCollection { items, next }) 349 } 350
··· 1 use super::{ 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 + use crate::storage::CompositeCursor; 5 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 6 7 use anyhow::{anyhow, Result}; 8 use links::CollectedLink; 9 10 use std::collections::{HashMap, HashSet}; ··· 255 HashSet::from_iter(filter_targets.iter().map(|s| Target::new(s))); 256 257 // extract parts form composite cursor 258 + let cursor = match after { 259 Some(a) => { 260 + let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 261 + let b = b 262 + .parse::<u64>() 263 + .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 264 + let f = f 265 + .parse::<u64>() 266 + .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 267 + Some(CompositeCursor { 268 + backward: b, 269 + forward: f, 270 + }) 271 } 272 + None => None, 273 }; 274 275 let data = self.0.lock().unwrap(); ··· 287 .iter() 288 .enumerate() 289 .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 290 + .skip_while(|(linker_idx, _)| cursor.is_some_and(|c| *linker_idx < c.backward as usize)) 291 .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(&did)) 292 { 293 let Some(links) = data.links.get(&did).and_then(|m| { ··· 307 *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 308 }) 309 .skip_while(|(link_idx, _)| { 310 + cursor.is_some_and(|c| { 311 + linker_idx == c.backward as usize && *link_idx <= c.forward as usize 312 }) 313 }) 314 .take(limit as usize + 1 - items.len()) ··· 331 } 332 } 333 334 + let next = (items.len() > limit as usize).then(|| { 335 + let (l, f, _, _) = items[limit as usize - 1]; 336 + format!("{l},{f}") 337 + }); 338 + 339 + let items = items 340 + .into_iter() 341 + .take(limit as usize) 342 + .map(|(_, _, rid, t)| (rid, t)) 343 + .collect(); 344 345 Ok(PagedOrderedCollection { items, next }) 346 } 347
+7
constellation/src/storage/mod.rs
··· 39 } 40 } 41 42 /// A paged collection whose keys are sorted instead of indexed 43 /// 44 /// this has weaker guarantees than PagedAppendingCollection: it might
··· 39 } 40 } 41 42 + // get-many-to-many composite cursor 43 + #[derive(Copy, Clone, Debug)] 44 + struct CompositeCursor { 45 + backward: u64, 46 + forward: u64, 47 + } 48 + 49 /// A paged collection whose keys are sorted instead of indexed 50 /// 51 /// this has weaker guarantees than PagedAppendingCollection: it might
+35 -45
constellation/src/storage/rocks_store.rs
··· 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 PagedOrderedCollection, StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 7 use anyhow::{anyhow, bail, Result}; 8 - use base64::engine::general_purpose as b64; 9 - use base64::Engine as _; 10 use bincode::Options as BincodeOptions; 11 use links::CollectedLink; 12 use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; ··· 1140 filter_to_targets: &HashSet<String>, 1141 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1142 // helper to resolve dids 1143 - let resolve_active_did = |did_id: &DidId| -> Option<Did> { 1144 - let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0).ok()? else { 1145 eprintln!("failed to look up did from did_id {did_id:?}"); 1146 - return None; 1147 }; 1148 - let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did).ok()? 1149 - else { 1150 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1151 - return None; 1152 }; 1153 - active.then_some(did) 1154 }; 1155 1156 // setup variables that we need later ··· 1158 let path = RPath(path.to_string()); 1159 1160 // extract parts form composite cursor 1161 - let (backward_idx, forward_idx) = match after { 1162 Some(a) => { 1163 - eprintln!("a: {:#?}", a); 1164 - 1165 - let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?; 1166 - 1167 - eprintln!("after_str: {:#?}", after_str); 1168 - let (b, f) = after_str 1169 - .split_once(',') 1170 - .ok_or_else(|| anyhow!("invalid cursor format"))?; 1171 - ( 1172 - (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?, 1173 - (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?, 1174 - ) 1175 } 1176 - None => (None, None), 1177 }; 1178 1179 - eprintln!("backward_idx: {:#?}", backward_idx); 1180 - eprintln!("forward_idx: {:#?}", forward_idx); 1181 1182 // (__active__) did ids and filter targets 1183 let filter_did_ids: HashMap<DidId, bool> = filter_dids ··· 1206 // iterate backwards (who linked to the target?) 1207 for (linker_idx, (did_id, rkey)) in 1208 linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| { 1209 - backward_idx.is_some_and(|idx| match forward_idx { 1210 - Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping 1211 - None => *linker_idx <= idx as usize, // exclusive: skip right here 1212 - }) 1213 }) 1214 { 1215 if did_id.is_empty() ··· 1239 || filter_to_target_ids.contains(target_id)) 1240 }) 1241 .skip_while(|(link_idx, _)| { 1242 - backward_idx.is_some_and(|bl_idx| { 1243 - linker_idx == bl_idx as usize 1244 - && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize) 1245 }) 1246 }) 1247 .take(limit as usize + 1 - items.len()) 1248 { 1249 // extract forward target did (target that links to the __other__ target) 1250 - let Some(did) = resolve_active_did(did_id) else { 1251 continue; 1252 }; 1253 // resolve to target string 1254 let Some(fwd_target_key) = self 1255 .target_id_table 1256 - .get_val_from_id(&self.db, fwd_target_id.0) 1257 - .ok() 1258 - .flatten() 1259 else { 1260 continue; 1261 }; ··· 1284 // and at backlink_vec_idx itself, forward links at or before 1285 // forward_link_idx are skipped. This correctly resumes mid-record when 1286 // a single backlinker has multiple forward links at path_to_other. 1287 - let next = if items.len() as u64 > limit { 1288 - items.truncate(limit as usize); 1289 - items 1290 - .last() 1291 - .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64))) 1292 - } else { 1293 - None 1294 - }; 1295 1296 - let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect(); 1297 1298 Ok(PagedOrderedCollection { items, next }) 1299 }
··· 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 PagedOrderedCollection, StorageStats, 4 }; 5 + use crate::storage::CompositeCursor; 6 use crate::{CountsByCount, Did, RecordId}; 7 8 use anyhow::{anyhow, bail, Result}; 9 use bincode::Options as BincodeOptions; 10 use links::CollectedLink; 11 use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; ··· 1139 filter_to_targets: &HashSet<String>, 1140 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1141 // helper to resolve dids 1142 + let resolve_active_did = |did_id: &DidId| -> Result<Option<Did>> { 1143 + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { 1144 eprintln!("failed to look up did from did_id {did_id:?}"); 1145 + return Ok(None); 1146 }; 1147 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { 1148 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1149 + return Ok(None); 1150 }; 1151 + Ok(active.then_some(did)) 1152 }; 1153 1154 // setup variables that we need later ··· 1156 let path = RPath(path.to_string()); 1157 1158 // extract parts form composite cursor 1159 + let cursor = match after { 1160 Some(a) => { 1161 + let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?; 1162 + let b = b 1163 + .parse::<u64>() 1164 + .map_err(|e| anyhow!("invalid cursor.0: {e}"))?; 1165 + let f = f 1166 + .parse::<u64>() 1167 + .map_err(|e| anyhow!("invalid cursor.1: {e}"))?; 1168 + Some(CompositeCursor { 1169 + backward: b, 1170 + forward: f, 1171 + }) 1172 } 1173 + None => None, 1174 }; 1175 1176 + eprintln!("cursor: {:#?}", cursor); 1177 1178 // (__active__) did ids and filter targets 1179 let filter_did_ids: HashMap<DidId, bool> = filter_dids ··· 1202 // iterate backwards (who linked to the target?) 1203 for (linker_idx, (did_id, rkey)) in 1204 linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| { 1205 + cursor.is_some_and(|c| *linker_idx < c.backward as usize) 1206 }) 1207 { 1208 if did_id.is_empty() ··· 1232 || filter_to_target_ids.contains(target_id)) 1233 }) 1234 .skip_while(|(link_idx, _)| { 1235 + cursor.is_some_and(|c| { 1236 + linker_idx == c.backward as usize && *link_idx <= c.forward as usize 1237 }) 1238 }) 1239 .take(limit as usize + 1 - items.len()) 1240 { 1241 // extract forward target did (target that links to the __other__ target) 1242 + let Some(did) = resolve_active_did(did_id)? else { 1243 continue; 1244 }; 1245 // resolve to target string 1246 let Some(fwd_target_key) = self 1247 .target_id_table 1248 + .get_val_from_id(&self.db, fwd_target_id.0)? 1249 else { 1250 continue; 1251 }; ··· 1274 // and at backlink_vec_idx itself, forward links at or before 1275 // forward_link_idx are skipped. This correctly resumes mid-record when 1276 // a single backlinker has multiple forward links at path_to_other. 1277 + let next = (items.len() > limit as usize).then(|| { 1278 + let (l, f, _, _) = items[limit as usize - 1]; 1279 + format!("{l},{f}") 1280 + }); 1281 1282 + let items = items 1283 + .into_iter() 1284 + .take(limit as usize) 1285 + .map(|(_, _, rid, t)| (rid, t)) 1286 + .collect(); 1287 1288 Ok(PagedOrderedCollection { items, next }) 1289 }