Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

wip: replace the (did, rkey, subject) many-to-many cursor with an index-based composite cursor

authored by seoul.systems and committed by tangled.org 627eaa85 a84f1eab

+204 -234
+75 -88
constellation/src/storage/mem_store.rs
··· 1 1 use super::{ 2 2 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 3 }; 4 - use crate::storage::{decode_m2m_cursor, encode_m2m_cursor}; 5 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 6 - use anyhow::Result; 5 + 6 + use anyhow::{anyhow, Result}; 7 + use base64::engine::general_purpose as b64; 8 + use base64::Engine as _; 7 9 use links::CollectedLink; 10 + 8 11 use std::collections::{HashMap, HashSet}; 9 12 use std::sync::{Arc, Mutex}; 10 13 ··· 245 248 limit: u64, 246 249 after: Option<String>, 247 250 filter_dids: &HashSet<Did>, 248 - filter_to_targets: &HashSet<String>, 251 + filter_targets: &HashSet<String>, 249 252 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 250 - let empty_res = Ok(PagedOrderedCollection { 251 - items: Vec::new(), 252 - next: None, 253 - }); 253 + // setup variables that we need later 254 + let path_to_other = RecordPath(path_to_other.to_string()); 255 + let filter_targets: HashSet<Target> = 256 + HashSet::from_iter(filter_targets.iter().map(|s| Target::new(s))); 257 + 258 + // extract parts form composite cursor 259 + let (backward_idx, forward_idx) = match after { 260 + Some(a) => { 261 + let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?; 262 + let (b, f) = after_str 263 + .split_once(',') 264 + .ok_or_else(|| anyhow!("invalid cursor format"))?; 265 + ( 266 + (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?, 267 + (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?, 268 + ) 269 + } 270 + None => (None, None), 271 + }; 254 272 255 273 let data = self.0.lock().unwrap(); 256 - 257 274 let Some(sources) = data.targets.get(&Target::new(target)) else { 258 - return empty_res; 275 + return Ok(PagedOrderedCollection::empty()); 259 276 }; 260 277 let Some(linkers) = sources.get(&Source::new(collection, path)) else { 261 - return empty_res; 278 + return Ok(PagedOrderedCollection::empty()); 262 279 }; 263 - let path_to_other = 
RecordPath::new(path_to_other); 264 280 265 - // Convert filter_to_targets to Target objects for comparison 266 - let filter_to_target_objs: HashSet<Target> = 267 - HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s))); 281 + let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new(); 268 282 269 - let mut grouped_links: HashMap<Target, Vec<RecordId>> = HashMap::new(); 270 - for (did, rkey) in linkers.iter().flatten().cloned() { 271 - // Filter by DID if filter is provided 272 - if !filter_dids.is_empty() && !filter_dids.contains(&did) { 273 - continue; 274 - } 275 - if let Some(fwd_target) = data 276 - .links 277 - .get(&did) 278 - .unwrap_or(&HashMap::new()) 279 - .get(&RepoId { 283 + // iterate backwards (who linked to the target?) 284 + for (linker_idx, (did, rkey)) in linkers 285 + .iter() 286 + .enumerate() 287 + .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v))) 288 + .skip_while(|(linker_idx, _)| { 289 + backward_idx.is_some_and(|idx| match forward_idx { 290 + Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping 291 + None => *linker_idx <= idx as usize, // exclusive: skip right here 292 + }) 293 + }) 294 + .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(&did)) 295 + { 296 + let Some(links) = data.links.get(&did).and_then(|m| { 297 + m.get(&RepoId { 280 298 collection: collection.to_string(), 281 299 rkey: rkey.clone(), 282 300 }) 283 - .unwrap_or(&Vec::new()) 301 + }) else { 302 + continue; 303 + }; 304 + 305 + // iterate forward (which of these links point to the __other__ target?) 
306 + for (link_idx, (_, fwd_target)) in links 284 307 .iter() 285 - .find_map(|(path, target)| { 286 - if *path == path_to_other 287 - && (filter_to_target_objs.is_empty() 288 - || filter_to_target_objs.contains(target)) 289 - { 290 - Some(target) 291 - } else { 292 - None 293 - } 308 + .enumerate() 309 + .filter(|(_, (p, t))| { 310 + *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t)) 294 311 }) 312 + .skip_while(|(link_idx, _)| { 313 + backward_idx.is_some_and(|bl_idx| { 314 + linker_idx == bl_idx as usize 315 + && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize) 316 + }) 317 + }) 318 + .take(limit as usize + 1 - items.len()) 295 319 { 296 - let record_ids = grouped_links.entry(fwd_target.clone()).or_default(); 297 - record_ids.push(RecordId { 298 - did, 299 - collection: collection.to_string(), 300 - rkey: rkey.0, 301 - }); 320 + items.push(( 321 + linker_idx, 322 + link_idx, 323 + RecordId { 324 + did: did.clone(), 325 + collection: collection.to_string(), 326 + rkey: rkey.0.clone(), 327 + }, 328 + fwd_target.0.clone(), 329 + )); 302 330 } 303 - } 304 331 305 - let mut items = grouped_links 306 - .into_iter() 307 - .flat_map(|(target, records)| { 308 - records 309 - .iter() 310 - .map(move |r| (r.clone(), target.0.clone())) 311 - .collect::<Vec<_>>() 312 - }) 313 - .collect::<Vec<_>>(); 314 - 315 - // first try to sort by subject, then by did, collection and finally rkey 316 - items.sort_by(|a, b| { 317 - if a.1 == b.1 { 318 - a.0.cmp(&b.0) 319 - } else { 320 - a.1.cmp(&b.1) 332 + // page full - eject 333 + if items.len() > limit as usize { 334 + break; 321 335 } 322 - }); 323 - 324 - // Parse cursor if provided (malformed cursor silently ignored) 325 - let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok()); 326 - 327 - // Apply cursor: skip everything up to and including the cursor position 328 - items = items 329 - .into_iter() 330 - .skip_while(|item| { 331 - let Some((after_did, after_rkey, 
after_subject)) = &after_cursor else { 332 - return false; 333 - }; 336 + } 334 337 335 - if &item.1 == after_subject { 336 - // Same subject — compare by RecordId to find our position 337 - let cursor_id = RecordId { 338 - did: Did(after_did.clone()), 339 - collection: collection.to_string(), 340 - rkey: after_rkey.clone(), 341 - }; 342 - item.0.cmp(&cursor_id).is_le() 343 - } else { 344 - // Different subject — compare subjects directly 345 - item.1.cmp(after_subject).is_le() 346 - } 347 - }) 348 - .take(limit as usize + 1) 349 - .collect(); 350 - 351 - // Build the new cursor from last item, if needed 352 338 let next = if items.len() as u64 > limit { 353 339 items.truncate(limit as usize); 354 340 items 355 341 .last() 356 - .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1)) 342 + .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64))) 357 343 } else { 358 344 None 359 345 }; 360 346 347 + let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect(); 361 348 Ok(PagedOrderedCollection { items, next }) 362 349 } 363 350
-34
constellation/src/storage/mod.rs
··· 6 6 pub mod mem_store; 7 7 pub use mem_store::MemStorage; 8 8 9 - use anyhow::anyhow; 10 - 11 - use base64::engine::general_purpose as b64; 12 - use base64::Engine as _; 13 - 14 9 #[cfg(feature = "rocks")] 15 10 pub mod rocks_store; 16 11 #[cfg(feature = "rocks")] ··· 159 154 160 155 /// assume all stats are estimates, since exact counts are very challenging for LSMs 161 156 fn get_stats(&self) -> Result<StorageStats>; 162 - } 163 - 164 - // Shared helpers 165 - 166 - /// Decode a base64 cursor into its component parts (did, rkey, subject). 167 - /// The subject is placed last because it may contain '|' characters. 168 - pub(crate) fn decode_m2m_cursor(cursor: &str) -> Result<(String, String, String)> { 169 - let decoded = String::from_utf8(b64::URL_SAFE.decode(cursor)?)?; 170 - let mut parts = decoded.splitn(3, '|').map(String::from); 171 - 172 - // Using .next() to pull each part out of the iterator in order. 173 - // This avoids collecting into a Vec just to index and clone back out. 174 - let did = parts 175 - .next() 176 - .ok_or_else(|| anyhow!("missing did in cursor"))?; 177 - let rkey = parts 178 - .next() 179 - .ok_or_else(|| anyhow!("missing rkey in cursor"))?; 180 - let subject = parts 181 - .next() 182 - .ok_or_else(|| anyhow!("missing subject in cursor"))?; 183 - 184 - Ok((did, rkey, subject)) 185 - } 186 - 187 - /// Encode cursor components into a base64 string. 188 - pub(crate) fn encode_m2m_cursor(did: &str, rkey: &str, subject: &str) -> String { 189 - let raw = format!("{did}|{rkey}|{subject}"); 190 - b64::URL_SAFE.encode(&raw) 191 157 } 192 158 193 159 #[cfg(test)]
+129 -112
constellation/src/storage/rocks_store.rs
··· 2 2 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 3 PagedOrderedCollection, StorageStats, 4 4 }; 5 - use crate::storage::{decode_m2m_cursor, encode_m2m_cursor}; 6 5 use crate::{CountsByCount, Did, RecordId}; 7 - use anyhow::{bail, Result}; 6 + 7 + use anyhow::{anyhow, bail, Result}; 8 + use base64::engine::general_purpose as b64; 9 + use base64::Engine as _; 8 10 use bincode::Options as BincodeOptions; 9 11 use links::CollectedLink; 10 12 use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; ··· 15 17 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 16 18 }; 17 19 use serde::{Deserialize, Serialize}; 20 + use tokio_util::sync::CancellationToken; 21 + 18 22 use std::collections::{BTreeMap, HashMap, HashSet}; 19 23 use std::io::Read; 20 24 use std::marker::PhantomData; ··· 25 29 }; 26 30 use std::thread; 27 31 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 28 - use tokio_util::sync::CancellationToken; 29 32 30 33 static DID_IDS_CF: &str = "did_ids"; 31 34 static TARGET_IDS_CF: &str = "target_ids"; ··· 1136 1139 filter_dids: &HashSet<Did>, 1137 1140 filter_to_targets: &HashSet<String>, 1138 1141 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> { 1142 + // helper to resolve dids 1143 + let resolve_active_did = |did_id: &DidId| -> Option<Did> { 1144 + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0).ok()? else { 1145 + eprintln!("failed to look up did from did_id {did_id:?}"); 1146 + return None; 1147 + }; 1148 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did).ok()? 
1149 + else { 1150 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1151 + return None; 1152 + }; 1153 + active.then_some(did) 1154 + }; 1155 + 1156 + // setup variables that we need later 1139 1157 let collection = Collection(collection.to_string()); 1140 1158 let path = RPath(path.to_string()); 1141 1159 1142 - let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1160 + // extract parts form composite cursor 1161 + let (backward_idx, forward_idx) = match after { 1162 + Some(a) => { 1163 + eprintln!("a: {:#?}", a); 1143 1164 1144 - // Parse cursor if provided (malformed cursor silently ignored) 1145 - let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok()); 1165 + let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?; 1146 1166 1147 - let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1148 - eprintln!("Target not found for {target_key:?}"); 1149 - return Ok(PagedOrderedCollection::empty()); 1167 + eprintln!("after_str: {:#?}", after_str); 1168 + let (b, f) = after_str 1169 + .split_once(',') 1170 + .ok_or_else(|| anyhow!("invalid cursor format"))?; 1171 + ( 1172 + (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?, 1173 + (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?, 1174 + ) 1175 + } 1176 + None => (None, None), 1150 1177 }; 1151 1178 1179 + eprintln!("backward_idx: {:#?}", backward_idx); 1180 + eprintln!("forward_idx: {:#?}", forward_idx); 1181 + 1182 + // (__active__) did ids and filter targets 1152 1183 let filter_did_ids: HashMap<DidId, bool> = filter_dids 1153 1184 .iter() 1154 1185 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) ··· 1156 1187 .into_iter() 1157 1188 .map(|DidIdValue(id, active)| (id, active)) 1158 1189 .collect(); 1159 - 1160 1190 let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 1161 1191 for t in filter_to_targets { 1162 1192 for (_, target_id) in 
self.iter_targets_for_target(&Target(t.to_string())) { ··· 1164 1194 } 1165 1195 } 1166 1196 1197 + let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1198 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1199 + eprintln!("Target not found for {target_key:?}"); 1200 + return Ok(PagedOrderedCollection::empty()); 1201 + }; 1202 + 1167 1203 let linkers = self.get_target_linkers(&target_id)?; 1168 1204 1169 - // we want to provide many to many which effectively means that we want to show a specific 1170 - // list of reords that is linked to by a specific number of linkers 1171 - let mut grouped_links: BTreeMap<TargetId, Vec<RecordId>> = BTreeMap::new(); 1172 - for (did_id, rkey) in linkers.0 { 1173 - if did_id.is_empty() { 1205 + eprintln!("linkers: {:#?}", linkers); 1206 + 1207 + let mut items: Vec<(usize, TargetId, RecordId)> = Vec::new(); 1208 + 1209 + // iterate backwards (who linked to the target?) 1210 + for (linker_idx, (did_id, rkey)) in 1211 + linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| { 1212 + backward_idx.is_some_and(|idx| match forward_idx { 1213 + Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping 1214 + None => *linker_idx <= idx as usize, // exclusive: skip right here 1215 + }) 1216 + }) 1217 + { 1218 + // filter target did 1219 + if did_id.is_empty() 1220 + || (!filter_did_ids.is_empty() && filter_did_ids.get(&did_id).is_none()) 1221 + { 1174 1222 continue; 1175 1223 } 1176 1224 1177 - if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) { 1178 - continue; 1179 - } 1225 + eprintln!("did_did: {:#?}", did_id); 1180 1226 1181 - // Make sure the current did is active 1182 - let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? 
else { 1183 - eprintln!("failed to look up did from did_id {did_id:?}"); 1227 + let Some(targets) = self.get_record_link_targets(&RecordLinkKey( 1228 + *did_id, 1229 + collection.clone(), 1230 + rkey.clone(), 1231 + ))? 1232 + else { 1184 1233 continue; 1185 1234 }; 1186 - let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { 1187 - eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1235 + let Some(fwd_target_id) = 1236 + targets 1237 + .0 1238 + .into_iter() 1239 + .find_map(|RecordLinkTarget(rpath, target_id)| { 1240 + eprintln!("rpath.0: {} vs. path_to_other: {path_to_other}", rpath.0); 1241 + if rpath.0 == path_to_other 1242 + && (filter_to_target_ids.is_empty() 1243 + || filter_to_target_ids.contains(&target_id)) 1244 + { 1245 + Some(target_id) 1246 + } else { 1247 + None 1248 + } 1249 + }) 1250 + else { 1188 1251 continue; 1189 1252 }; 1190 - if !active { 1253 + 1254 + if backward_idx.is_some_and(|bl_idx| { 1255 + linker_idx == bl_idx as usize 1256 + && forward_idx.is_some_and(|fwd_idx| fwd_target_id.0 <= fwd_idx) 1257 + }) { 1191 1258 continue; 1192 1259 } 1193 1260 1194 - let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey.clone()); 1195 - let Some(targets) = self.get_record_link_targets(&record_link_key)? 
else { 1196 - continue; 1197 - }; 1261 + let page_is_full = items.len() as u64 >= limit; 1198 1262 1199 - let Some(fwd_target) = targets 1200 - .0 1201 - .into_iter() 1202 - .filter_map(|RecordLinkTarget(rpath, target_id)| { 1203 - if rpath.0 == path_to_other 1204 - && (filter_to_target_ids.is_empty() 1205 - || filter_to_target_ids.contains(&target_id)) 1206 - { 1207 - Some(target_id) 1208 - } else { 1209 - None 1210 - } 1211 - }) 1212 - .take(1) 1213 - .next() 1214 - else { 1215 - eprintln!("no forward match found."); 1216 - continue; 1217 - }; 1263 + eprintln!( 1264 + "page_is_full: {page_is_full} for items.len(): {}", 1265 + items.len() 1266 + ); 1218 1267 1219 - let page_is_full = grouped_links.len() as u64 >= limit; 1220 1268 if page_is_full { 1221 - let current_max = grouped_links.keys().next_back().unwrap(); 1222 - if fwd_target > *current_max { 1269 + let current_max = items.iter().next_back().unwrap().1; 1270 + if fwd_target_id > current_max { 1223 1271 continue; 1224 1272 } 1225 1273 } 1274 + 1275 + // extract forward target did (target that links to the __other__ target) 1276 + let Some(did) = resolve_active_did(did_id) else { 1277 + continue; 1278 + }; 1226 1279 1227 1280 // link to be added 1228 1281 let record_id = RecordId { 1229 1282 did, 1230 1283 collection: collection.0.clone(), 1231 - rkey: rkey.0, 1284 + rkey: rkey.0.clone(), 1232 1285 }; 1286 + items.push((linker_idx, fwd_target_id, record_id)); 1287 + } 1233 1288 1234 - // pagination: 1235 - if after_cursor.is_some() { 1236 - // extract composite-cursor parts 1237 - let Some((after_did, after_rkey, after_subject)) = &after_cursor else { 1238 - continue; 1239 - }; 1240 - 1241 - let Some(fwd_target_key) = self 1289 + let mut backward_idx = None; 1290 + let mut forward_idx = None; 1291 + let mut items: Vec<_> = items 1292 + .iter() 1293 + .filter_map(|(b_idx, fwd_target_id, record)| { 1294 + let Some(target_key) = self 1242 1295 .target_id_table 1243 - .get_val_from_id(&self.db, fwd_target.0)? 
1296 + .get_val_from_id(&self.db, fwd_target_id.0) 1297 + .ok()? 1244 1298 else { 1245 - eprintln!("failed to look up target from target_id {fwd_target:?}"); 1246 - continue; 1299 + eprintln!("failed to look up target from target_id {fwd_target_id:?}"); 1300 + return None; 1247 1301 }; 1248 1302 1249 - // first try and compare by subject only 1250 - if &fwd_target_key.0 .0 != after_subject 1251 - && fwd_target_key.0 .0.cmp(after_subject).is_le() 1252 - { 1253 - continue; 1254 - } 1303 + backward_idx = Some(b_idx); 1304 + forward_idx = Some(fwd_target_id.0 - 1); 1255 1305 1256 - // then, if needed, we compare by record id 1257 - let cursor_id = RecordId { 1258 - did: Did(after_did.clone()), 1259 - collection: collection.0.clone(), 1260 - rkey: after_rkey.clone(), 1261 - }; 1262 - if record_id.cmp(&cursor_id).is_le() { 1263 - continue; 1264 - } 1265 - } 1266 - 1267 - // pagination, continued 1268 - let mut should_evict = false; 1269 - let entry = grouped_links.entry(fwd_target.clone()).or_insert_with(|| { 1270 - should_evict = page_is_full; 1271 - Vec::default() 1272 - }); 1273 - entry.push(record_id); 1274 - 1275 - if should_evict { 1276 - grouped_links.pop_last(); 1277 - } 1278 - } 1279 - 1280 - let mut items: Vec<(RecordId, String)> = Vec::with_capacity(grouped_links.len()); 1281 - for (fwd_target_id, records) in &grouped_links { 1282 - let Some(target_key) = self 1283 - .target_id_table 1284 - .get_val_from_id(&self.db, fwd_target_id.0)? 
1285 - else { 1286 - eprintln!("failed to look up target from target_id {fwd_target_id:?}"); 1287 - continue; 1288 - }; 1289 - 1290 - let target_string = target_key.0 .0; 1291 - 1292 - records 1293 - .iter() 1294 - .for_each(|r| items.push((r.clone(), target_string.clone()))); 1295 - } 1306 + Some((record.clone(), target_key.0 .0)) 1307 + }) 1308 + .collect(); 1296 1309 1297 1310 // Build new cursor from last the item, if needed 1298 1311 let next = if items.len() as u64 > limit { 1299 1312 items.truncate(limit as usize); 1300 - items 1301 - .last() 1302 - .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1)) 1313 + items.last().and_then(|_| { 1314 + Some(b64::URL_SAFE.encode(format!( 1315 + "{},{}", 1316 + backward_idx?.to_string(), 1317 + forward_idx?.to_string() 1318 + ))) 1319 + }) 1303 1320 } else { 1304 1321 None 1305 1322 }; ··· 1645 1662 } 1646 1663 1647 1664 // target ids 1648 - #[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)] 1665 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)] 1649 1666 struct TargetId(u64); // key 1650 1667 1651 1668 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]