AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db,api] make block keys use cid raw bytes, deduplicate listRecords impl

ptr.pet 2d23aa5b 5d31bccd

verified
+197 -174
+1
Cargo.lock
··· 1621 1621 "async-stream", 1622 1622 "axum", 1623 1623 "chrono", 1624 + "cid", 1624 1625 "data-encoding", 1625 1626 "fjall", 1626 1627 "futures",
+1
Cargo.toml
··· 41 41 hex = "0.4" 42 42 scc = "3" 43 43 data-encoding = "2.10.0" 44 + cid = "0.11.1"
+11 -7
src/api/debug.rs
··· 42 42 .map_err(|_| StatusCode::BAD_REQUEST)?; 43 43 44 44 let db = &state.db; 45 - let ks = db.records.clone(); 45 + let ks = db 46 + .record_partition(&req.collection) 47 + .map_err(|_| StatusCode::NOT_FOUND)?; 46 48 47 - // {did_prefix}\x00{collection}\x00 49 + // {TrimmedDid}\x00 48 50 let mut prefix = Vec::new(); 49 51 TrimmedDid::from(&did).write_to_vec(&mut prefix); 50 - prefix.push(keys::SEP); 51 - prefix.extend_from_slice(req.collection.as_bytes()); 52 52 prefix.push(keys::SEP); 53 53 54 54 let count = tokio::task::spawn_blocking(move || { ··· 185 185 let items = tokio::task::spawn_blocking(move || { 186 186 let limit = req.limit.unwrap_or(50).min(1000); 187 187 188 - // Helper closure to avoid generic type complexity 189 188 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| { 190 189 let mut items = Vec::new(); 191 190 for guard in iter.take(limit) { ··· 247 246 fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> { 248 247 match name { 249 248 "repos" => Ok(db.repos.clone()), 250 - "records" => Ok(db.records.clone()), 251 249 "blocks" => Ok(db.blocks.clone()), 252 250 "cursors" => Ok(db.cursors.clone()), 253 251 "pending" => Ok(db.pending.clone()), 254 252 "resync" => Ok(db.resync.clone()), 255 253 "events" => Ok(db.events.clone()), 256 254 "counts" => Ok(db.counts.clone()), 257 - _ => Err(StatusCode::BAD_REQUEST), 255 + _ => { 256 + if let Some(col) = name.strip_prefix(crate::db::RECORDS_PARTITION_PREFIX) { 257 + db.record_partition(col).map_err(|_| StatusCode::NOT_FOUND) 258 + } else { 259 + Err(StatusCode::BAD_REQUEST) 260 + } 261 + } 258 262 } 259 263 }
+2 -5
src/api/stream.rs
··· 97 97 98 98 let marshallable = { 99 99 let mut record_val = None; 100 - if let Some(cid_struct) = &cid { 101 - let cid_str = cid_struct.to_string(); 102 - if let Ok(Some(block_bytes)) = 103 - db.blocks.get(keys::block_key(&cid_str)) 104 - { 100 + if let Some(cid) = &cid { 101 + if let Ok(Some(block_bytes)) = db.blocks.get(&cid.to_bytes()) { 105 102 if let Ok(raw_data) = 106 103 serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) 107 104 {
+73 -97
src/api/xrpc.rs
··· 3 3 use crate::db::{self, Db, keys}; 4 4 use axum::{Json, Router, extract::State, http::StatusCode}; 5 5 use futures::TryFutureExt; 6 + use jacquard::cowstr::ToCowStr; 6 7 use jacquard::types::ident::AtIdentifier; 7 8 use jacquard::{ 8 9 IntoStatic, ··· 21 22 }, 22 23 xrpc::{GenericXrpcError, XrpcError}, 23 24 }; 25 + use miette::IntoDiagnostic; 24 26 use serde::{Deserialize, Serialize}; 25 27 use smol_str::ToSmolStr; 26 28 use std::{fmt::Display, sync::Arc}; ··· 76 78 .await 77 79 .map_err(|e| bad_request(GetRecord::NSID, e))?; 78 80 79 - let db_key = keys::record_key(&did, req.collection.as_str(), req.rkey.0.as_str()); 81 + let partition = db 82 + .record_partition(req.collection.as_str()) 83 + .map_err(|e| internal_error(GetRecord::NSID, e))?; 80 84 81 - let cid_bytes = Db::get(db.records.clone(), db_key) 85 + let db_key = keys::record_key(&did, req.rkey.0.as_str()); // 2 args 86 + 87 + let cid_bytes = Db::get(partition, db_key) 82 88 .await 83 89 .map_err(|e| internal_error(GetRecord::NSID, e))?; 84 90 85 91 if let Some(cid_bytes) = cid_bytes { 86 - let cid_str = 87 - std::str::from_utf8(&cid_bytes).map_err(|e| internal_error(GetRecord::NSID, e))?; 88 - 89 - let block_bytes = Db::get(db.blocks.clone(), keys::block_key(cid_str)) 92 + // lookup block using binary cid 93 + let block_bytes = Db::get(db.blocks.clone(), &cid_bytes) 90 94 .await 91 95 .map_err(|e| internal_error(GetRecord::NSID, e))? 92 96 .ok_or_else(|| internal_error(GetRecord::NSID, "not found"))?; ··· 94 98 let value: Data = serde_ipld_dagcbor::from_slice(&block_bytes) 95 99 .map_err(|e| internal_error(GetRecord::NSID, e))?; 96 100 97 - let cid = Cid::new(cid_str.as_bytes()).unwrap().into_static(); 101 + let cid = Cid::new(&cid_bytes) 102 + .map_err(|e| internal_error(GetRecord::NSID, e))? 
103 + .into_static(); 98 104 99 105 Ok(Json(GetRecordOutput { 100 106 uri: AtUri::from_parts_owned( ··· 103 109 req.rkey.0.as_str(), 104 110 ) 105 111 .unwrap(), 106 - cid: Some(cid), 112 + cid: Some(Cid::Str(cid.to_cowstr()).into_static()), 107 113 value: value.into_static(), 108 114 extra_data: Default::default(), 109 115 })) ··· 126 132 .await 127 133 .map_err(|e| bad_request(ListRecords::NSID, e))?; 128 134 129 - let prefix = format!( 130 - "{}{}{}{}", 131 - TrimmedDid::from(&did), 132 - keys::SEP as char, 133 - req.collection.as_str(), 134 - keys::SEP as char 135 - ); 135 + let ks = db 136 + .record_partition(req.collection.as_str()) 137 + .map_err(|e| internal_error(ListRecords::NSID, e))?; 138 + 139 + let mut prefix = Vec::new(); 140 + TrimmedDid::from(&did).write_to_vec(&mut prefix); 141 + prefix.push(keys::SEP); 136 142 137 143 let limit = req.limit.unwrap_or(50).min(100) as usize; 138 144 let reverse = req.reverse.unwrap_or(false); 139 - let ks = db.records.clone(); 140 145 let blocks_ks = db.blocks.clone(); 141 146 142 147 let did_str = smol_str::SmolStr::from(did.as_str()); ··· 146 151 let mut results = Vec::new(); 147 152 let mut cursor = None; 148 153 149 - let mut end_prefix = prefix.clone().into_bytes(); 150 - if let Some(last) = end_prefix.last_mut() { 151 - *last += 1; 152 - } 154 + let iter: Box<dyn Iterator<Item = _>> = if !reverse { 155 + let mut end_prefix = prefix.clone(); 156 + if let Some(last) = end_prefix.last_mut() { 157 + *last += 1; 158 + } 153 159 154 - if !reverse { 155 160 let end_key = if let Some(cursor) = &req.cursor { 156 - format!("{}{}", prefix, cursor).into_bytes() 161 + let mut k = prefix.clone(); 162 + k.extend_from_slice(cursor.as_bytes()); 163 + k 157 164 } else { 158 - end_prefix.clone() 165 + end_prefix 159 166 }; 160 167 161 - for item in ks.range(prefix.as_bytes()..end_key.as_slice()).rev() { 162 - let (key, cid_bytes) = item.into_inner().ok()?; 163 - 164 - if !key.starts_with(prefix.as_bytes()) { 165 - break; 166 - } 
167 - if results.len() >= limit { 168 - let key_str = String::from_utf8_lossy(&key); 169 - if let Some(last_part) = key_str.split(keys::SEP as char).last() { 170 - cursor = Some(smol_str::SmolStr::from(last_part)); 171 - } 172 - break; 173 - } 174 - 175 - let key_str = String::from_utf8_lossy(&key); 176 - let parts: Vec<&str> = key_str.split(keys::SEP as char).collect(); 177 - if parts.len() == 3 { 178 - let rkey = parts[2]; 179 - let cid_str = std::str::from_utf8(&cid_bytes).ok()?; 180 - 181 - if let Ok(Some(block_bytes)) = blocks_ks.get(keys::block_key(cid_str)) { 182 - let val: Data = 183 - serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 184 - let cid = Cid::new(cid_str.as_bytes()).unwrap().into_static(); 185 - results.push(RepoRecord { 186 - uri: AtUri::from_parts_owned( 187 - did_str.as_str(), 188 - collection_str.as_str(), 189 - rkey, 190 - ) 191 - .unwrap(), 192 - cid, 193 - value: val.into_static(), 194 - extra_data: Default::default(), 195 - }); 196 - } 197 - } 198 - } 168 + Box::new(ks.range(prefix.as_slice()..end_key.as_slice()).rev()) 199 169 } else { 200 170 let start_key = if let Some(cursor) = &req.cursor { 201 - format!("{}{}\0", prefix, cursor).into_bytes() 171 + let mut k = prefix.clone(); 172 + k.extend_from_slice(cursor.as_bytes()); 173 + k.push(0); 174 + k 202 175 } else { 203 - prefix.clone().into_bytes() 176 + prefix.clone() 204 177 }; 205 178 206 - for item in ks.range(start_key.as_slice()..) 
{ 207 - let (key, cid_bytes) = item.into_inner().ok()?; 179 + Box::new(ks.range(start_key.as_slice()..)) 180 + }; 208 181 209 - if !key.starts_with(prefix.as_bytes()) { 210 - break; 211 - } 212 - if results.len() >= limit { 213 - let key_str = String::from_utf8_lossy(&key); 214 - if let Some(last_part) = key_str.split(keys::SEP as char).last() { 215 - cursor = Some(smol_str::SmolStr::from(last_part)); 216 - } 217 - break; 218 - } 182 + for item in iter { 183 + let (key, cid_bytes) = item.into_inner().into_diagnostic()?; 219 184 185 + if !key.starts_with(prefix.as_slice()) { 186 + break; 187 + } 188 + if results.len() >= limit { 220 189 let key_str = String::from_utf8_lossy(&key); 221 - let parts: Vec<&str> = key_str.split(keys::SEP as char).collect(); 222 - if parts.len() == 3 { 223 - let rkey = parts[2]; 224 - let cid_str = std::str::from_utf8(&cid_bytes).ok()?; 190 + if let Some(last_part) = key_str.split(keys::SEP as char).last() { 191 + cursor = Some(smol_str::SmolStr::from(last_part)); 192 + } 193 + break; 194 + } 225 195 226 - if let Ok(Some(block_bytes)) = blocks_ks.get(keys::block_key(cid_str)) { 227 - let val: Data = 228 - serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 229 - let cid = Cid::new(cid_str.as_bytes()).unwrap().into_static(); 230 - results.push(RepoRecord { 231 - uri: AtUri::from_parts_owned( 232 - did_str.as_str(), 233 - collection_str.as_str(), 234 - rkey, 235 - ) 236 - .unwrap(), 237 - cid, 238 - value: val.into_static(), 239 - extra_data: Default::default(), 240 - }); 241 - } 196 + // key: {TrimmedDid}|{RKey} 197 + let key_str = String::from_utf8_lossy(&key); 198 + let parts: Vec<&str> = key_str.split(keys::SEP as char).collect(); 199 + if parts.len() == 2 { 200 + let rkey = parts[1]; 201 + // look up using binary cid bytes from the record 202 + if let Ok(Some(block_bytes)) = blocks_ks.get(&cid_bytes) { 203 + let val: Data = 204 + serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 205 + let cid = 206 + 
Cid::Str(Cid::new(&cid_bytes).into_diagnostic()?.to_cowstr()).into_static(); 207 + results.push(RepoRecord { 208 + uri: AtUri::from_parts_owned( 209 + did_str.as_str(), 210 + collection_str.as_str(), 211 + rkey, 212 + ) 213 + .into_diagnostic()?, 214 + cid, 215 + value: val.into_static(), 216 + extra_data: Default::default(), 217 + }); 242 218 } 243 219 } 244 220 } 245 - Some((results, cursor)) 221 + Result::<_, miette::Report>::Ok((results, cursor)) 246 222 }) 247 223 .await 248 224 .map_err(|e| internal_error(ListRecords::NSID, e))? 249 - .ok_or_else(|| internal_error(ListRecords::NSID, "not found"))?; 225 + .map_err(|e| internal_error(ListRecords::NSID, e))?; 250 226 251 227 Ok(Json(ListRecordsOutput { 252 228 records: results,
+36 -32
src/backfill/mod.rs
··· 11 11 use jacquard_common::xrpc::XrpcError; 12 12 use jacquard_repo::mst::Mst; 13 13 use jacquard_repo::{BlockStore, MemoryBlockStore}; 14 - use miette::{Context, IntoDiagnostic, Result}; 14 + use miette::{IntoDiagnostic, Result}; 15 15 use smol_str::{SmolStr, ToSmolStr}; 16 16 use std::collections::HashMap; 17 17 use std::sync::Arc; ··· 369 369 let mut batch = app_state.db.inner.batch(); 370 370 let store = mst.storage(); 371 371 372 - // pre-load existing record CIDs for this DID to detect duplicates/updates 373 372 let prefix = keys::record_prefix(&did); 374 - let prefix_len = prefix.len(); 375 373 let mut existing_cids: HashMap<(SmolStr, SmolStr), SmolStr> = HashMap::new(); 376 - for guard in app_state.db.records.prefix(&prefix) { 377 - let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 378 - // extract path (collection/rkey) from key by skipping the DID prefix 379 - let mut path_split = key[prefix_len..].split(|b| *b == keys::SEP); 380 - let collection = 381 - std::str::from_utf8(path_split.next().wrap_err("collection not found")?) 382 - .into_diagnostic()? 383 - .to_smolstr(); 384 - let rkey = std::str::from_utf8(path_split.next().wrap_err("rkey not found")?) 385 - .into_diagnostic()? 386 - .to_smolstr(); 387 - let cid = std::str::from_utf8(&cid_bytes) 388 - .into_diagnostic()? 
389 - .to_smolstr(); 390 - existing_cids.insert((collection, rkey), cid); 374 + 375 + let mut partitions = Vec::new(); 376 + app_state.db.record_partitions.iter_sync(|col, ks| { 377 + partitions.push((col.clone(), ks.clone())); 378 + true 379 + }); 380 + 381 + for (col_name, ks) in partitions { 382 + for guard in ks.prefix(&prefix) { 383 + let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 384 + // key: {DID}|{RKey} 385 + let key_str = std::str::from_utf8(&key).into_diagnostic()?; 386 + let parts: Vec<&str> = key_str.split(keys::SEP as char).collect(); 387 + if parts.len() == 2 { 388 + let rkey = parts[1].to_smolstr(); 389 + let cid = if let Ok(c) = cid::Cid::read_bytes(cid_bytes.as_ref()) { 390 + c.to_string().to_smolstr() 391 + } else { 392 + continue; 393 + }; 394 + 395 + existing_cids.insert((col_name.as_str().into(), rkey), cid); 396 + } 397 + } 391 398 } 392 399 393 400 for (key, cid) in leaves { ··· 398 405 if let Some(val) = val_bytes { 399 406 let (collection, rkey) = ops::parse_path(&key)?; 400 407 let path = (collection.to_smolstr(), rkey.to_smolstr()); 401 - let cid = Cid::ipld(cid); 408 + let cid_obj = Cid::ipld(cid); 409 + let partition = app_state.db.record_partition(collection)?; 402 410 403 411 // check if this record already exists with same CID 404 412 let (action, is_new) = 405 413 if let Some(existing_cid) = existing_cids.remove(&path) { 406 - if existing_cid == cid.as_str() { 414 + if existing_cid == cid_obj.as_str() { 407 415 debug!("skip {did}/{collection}/{rkey} ({cid})"); 408 416 continue; // skip unchanged record 409 417 } ··· 413 421 }; 414 422 debug!("{action} {did}/{collection}/{rkey} ({cid})"); 415 423 416 - let db_key = keys::record_key(&did, &collection, &rkey); 424 + // Key is just did|rkey 425 + let db_key = keys::record_key(&did, &rkey); 417 426 418 - batch.insert( 419 - &app_state.db.blocks, 420 - keys::block_key(cid.as_str()), 421 - val.as_ref(), 422 - ); 423 - batch.insert(&app_state.db.records, db_key, 
cid.as_str().as_bytes()); 427 + batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); 428 + batch.insert(&partition, db_key, cid.to_bytes()); 424 429 425 430 added_blocks += 1; 426 431 if is_new { ··· 436 441 collection: CowStr::Borrowed(collection), 437 442 rkey: DbRkey::new(rkey), 438 443 action: DbAction::from(action), 439 - cid: Some(cid.to_ipld().expect("valid cid")), 444 + cid: Some(cid_obj.to_ipld().expect("valid cid")), 440 445 }; 441 446 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 442 447 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); ··· 448 453 // remove any remaining existing records (they weren't in the new MST) 449 454 for ((collection, rkey), cid) in existing_cids { 450 455 debug!("remove {did}/{collection}/{rkey} ({cid})"); 451 - batch.remove( 452 - &app_state.db.records, 453 - keys::record_key(&did, &collection, &rkey), 454 - ); 456 + let partition = app_state.db.record_partition(collection.as_str())?; 457 + 458 + batch.remove(&partition, keys::record_key(&did, &rkey)); 455 459 456 460 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 457 461 let evt = StoredEvent {
+4 -11
src/db/keys.rs
··· 14 14 vec 15 15 } 16 16 17 - // key format: {DID}\x00{Collection}\x00{RKey} (DID trimmed) 18 - pub fn record_key(did: &Did, collection: &str, rkey: &str) -> Vec<u8> { 17 + // key format: {DID}\x00{RKey} (DID trimmed) 18 + pub fn record_key(did: &Did, rkey: &str) -> Vec<u8> { 19 19 let repo = TrimmedDid::from(did); 20 - let mut key = Vec::with_capacity(repo.len() + collection.len() + rkey.len() + 2); 20 + let mut key = Vec::with_capacity(repo.len() + rkey.len() + 1); 21 21 repo.write_to_vec(&mut key); 22 - key.push(SEP); 23 - key.extend_from_slice(collection.as_bytes()); 24 22 key.push(SEP); 25 23 key.extend_from_slice(rkey.as_bytes()); 26 24 key 27 25 } 28 26 29 - // prefix format: {DID}\x00 (DID trimmed) - for scanning all records of a DID 27 + // prefix format: {DID}\x00 (DID trimmed) - for scanning all records of a DID within a collection 30 28 pub fn record_prefix(did: &Did) -> Vec<u8> { 31 29 let repo = TrimmedDid::from(did); 32 30 let mut prefix = Vec::with_capacity(repo.len() + 1); ··· 40 38 // key format: {SEQ} 41 39 pub fn event_key(seq: u64) -> [u8; 8] { 42 40 seq.to_be_bytes() 43 - } 44 - 45 - // key format: {CID} 46 - pub fn block_key(cid: &str) -> &[u8] { 47 - cid.as_bytes() 48 41 } 49 42 50 43 pub const COUNT_KS_PREFIX: &[u8] = &[b'k', SEP];
+42 -5
src/db/mod.rs
··· 4 4 use jacquard_common::types::string::Did; 5 5 use miette::{Context, IntoDiagnostic, Result}; 6 6 use scc::HashMap; 7 - use smol_str::SmolStr; 7 + use smol_str::{SmolStr, format_smolstr}; 8 8 use std::path::Path; 9 9 use std::sync::Arc; 10 10 ··· 14 14 use std::sync::atomic::AtomicU64; 15 15 use tokio::sync::broadcast; 16 16 use tracing::error; 17 + 18 + pub const RECORDS_PARTITION_PREFIX: &str = "r:"; 19 + 20 + fn default_opts() -> KeyspaceCreateOptions { 21 + KeyspaceCreateOptions::default() 22 + } 23 + 24 + fn record_partition_opts() -> KeyspaceCreateOptions { 25 + default_opts().max_memtable_size(32 * 2_u64.pow(20)) 26 + } 17 27 18 28 pub struct Db { 19 29 pub inner: Arc<Database>, 20 30 pub repos: Keyspace, 21 - pub records: Keyspace, 31 + pub record_partitions: HashMap<String, Keyspace>, 22 32 pub blocks: Keyspace, 23 33 pub cursors: Keyspace, 24 34 pub pending: Keyspace, ··· 48 58 .into_diagnostic()?; 49 59 let db = Arc::new(db); 50 60 51 - let opts = KeyspaceCreateOptions::default; 61 + let opts = default_opts; 52 62 let open_ks = |name: &str, opts: KeyspaceCreateOptions| { 53 63 db.keyspace(name, move || opts).into_diagnostic() 54 64 }; 55 65 56 66 let repos = open_ks("repos", opts().expect_point_read_hits(true))?; 57 - let records = open_ks("records", opts().max_memtable_size(32 * 2_u64.pow(20)))?; 58 67 let blocks = open_ks( 59 68 "blocks", 60 69 opts() ··· 68 77 let events = open_ks("events", opts())?; 69 78 let counts = open_ks("counts", opts().expect_point_read_hits(true))?; 70 79 80 + let record_partitions = HashMap::new(); 81 + { 82 + let names = db.list_keyspace_names(); 83 + for name in names { 84 + let name_str: &str = name.as_ref(); 85 + if let Some(collection) = name_str.strip_prefix(RECORDS_PARTITION_PREFIX) { 86 + let popts = record_partition_opts(); 87 + let ks = db.keyspace(name_str, move || popts).into_diagnostic()?; 88 + let _ = record_partitions.insert_sync(collection.to_string(), ks); 89 + } 90 + } 91 + } 92 + 71 93 let mut 
last_id = 0; 72 94 if let Some(guard) = events.iter().next_back() { 73 95 let k = guard.key().into_diagnostic()?; ··· 97 119 Ok(Self { 98 120 inner: db, 99 121 repos, 100 - records, 122 + record_partitions, 101 123 blocks, 102 124 cursors, 103 125 pending, ··· 108 130 counts_map, 109 131 next_event_id: Arc::new(AtomicU64::new(last_id + 1)), 110 132 }) 133 + } 134 + 135 + pub fn record_partition(&self, collection: &str) -> Result<Keyspace> { 136 + use scc::hash_map::Entry; 137 + match self.record_partitions.entry_sync(collection.to_string()) { 138 + Entry::Occupied(o) => Ok(o.get().clone()), 139 + Entry::Vacant(v) => { 140 + let name = format_smolstr!("{}{}", RECORDS_PARTITION_PREFIX, collection); 141 + let ks = self 142 + .inner 143 + .keyspace(&name, record_partition_opts) 144 + .into_diagnostic()?; 145 + Ok(v.insert_entry(ks).get().clone()) 146 + } 147 + } 111 148 } 112 149 113 150 pub fn persist(&self) -> Result<()> {
+27 -17
src/ops.rs
··· 8 8 use fjall::OwnedWriteBatch; 9 9 use jacquard::CowStr; 10 10 use jacquard::IntoStatic; 11 - use jacquard::cowstr::ToCowStr; 11 + 12 12 use jacquard::types::cid::Cid; 13 13 use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14 14 use jacquard_common::types::crypto::PublicKey; ··· 67 67 batch.remove(&db.pending, &repo_key); 68 68 batch.remove(&db.resync, &repo_key); 69 69 70 - // 2. delete from records (prefix: repo_key + SEP) 71 - let mut records_prefix = repo_key.clone(); 72 - records_prefix.push(keys::SEP); 73 - for guard in db.records.prefix(&records_prefix) { 74 - let k = guard.key().into_diagnostic()?; 75 - batch.remove(&db.records, k); 70 + // 2. delete from records (all partitions) 71 + let mut partitions = Vec::new(); 72 + db.record_partitions.iter_sync(|_, v| { 73 + partitions.push(v.clone()); 74 + true 75 + }); 76 + 77 + let records_prefix = keys::record_prefix(did); 78 + for ks in partitions { 79 + for guard in ks.prefix(&records_prefix) { 80 + let k = guard.key().into_diagnostic()?; 81 + batch.remove(&ks, k); 82 + } 76 83 } 77 84 78 85 // 3. reset collection counts ··· 218 225 219 226 // store all blocks in the CAS 220 227 for (cid, bytes) in &parsed.blocks { 221 - batch.insert( 222 - &db.blocks, 223 - keys::block_key(&cid.to_cowstr()), 224 - bytes.to_vec(), 225 - ); 228 + batch.insert(&db.blocks, cid.to_bytes(), bytes.to_vec()); 226 229 } 227 230 228 231 // 2. 
iterate ops and update records index ··· 231 234 232 235 for op in &commit.ops { 233 236 let (collection, rkey) = parse_path(&op.path)?; 234 - let db_key = keys::record_key(did, collection, rkey); 237 + let partition = db.record_partition(collection)?; 238 + let db_key = keys::record_key(did, rkey); // removed collection arg 235 239 236 240 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 237 241 ··· 240 244 let Some(cid) = &op.cid else { 241 245 continue; 242 246 }; 243 - let s = smol_str::SmolStr::from(cid.as_str()); 244 - batch.insert(&db.records, db_key, s.as_bytes().to_vec()); 247 + batch.insert( 248 + &partition, 249 + db_key.clone(), 250 + cid.to_ipld() 251 + .into_diagnostic() 252 + .wrap_err("expected valid cid from relay")? 253 + .to_bytes(), 254 + ); 245 255 246 256 // accumulate counts 247 257 if op.action.as_str() == "create" { ··· 250 260 } 251 261 } 252 262 "delete" => { 253 - batch.remove(&db.records, db_key); 263 + batch.remove(&partition, db_key); 254 264 255 265 // accumulate counts 256 266 records_delta -= 1; ··· 268 278 collection: CowStr::Borrowed(collection), 269 279 rkey: DbRkey::new(rkey), 270 280 action: DbAction::from(op.action.as_str()), 271 - cid: op.cid.as_ref().map(|c| c.0.to_ipld().expect("valid cid")), 281 + cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")), 272 282 }; 273 283 274 284 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;