at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db] explicit record key encoding

ptr.pet a366d89f d8426be3

verified
+46 -28
+4 -16
src/api/xrpc.rs
··· 180 180 if !key.starts_with(prefix.as_slice()) { 181 181 break; 182 182 } 183 + 184 + let rkey = keys::parse_rkey(&key[prefix.len()..])?; 183 185 if results.len() >= limit { 184 - let key_str = String::from_utf8_lossy(&key); 185 - if let Some(last_part) = key_str.split(keys::SEP as char).last() { 186 - cursor = Some(smol_str::SmolStr::from(last_part)); 187 - } 186 + cursor = Some(rkey); 188 187 break; 189 188 } 190 - 191 - // manual deserialization of the key suffix since it is raw bytes, not msgpack 192 - let suffix = &key[prefix.len()..]; 193 - let rkey = if suffix.len() == 8 { 194 - let mut bytes = [0u8; 8]; 195 - bytes.copy_from_slice(suffix); 196 - DbRkey::Tid(crate::db::types::DbTid::new_from_bytes(bytes)) 197 - } else { 198 - let s = String::from_utf8_lossy(suffix); 199 - DbRkey::Str(smol_str::SmolStr::from(s.as_ref())) 200 - }; 201 189 202 190 // look up using binary cid bytes from the record 203 191 if let Ok(Some(block_bytes)) = blocks_ks.get(&cid_bytes) { ··· 225 213 226 214 Ok(Json(ListRecordsOutput { 227 215 records: results, 228 - cursor: cursor.map(|c| c.into()), 216 + cursor: cursor.map(|c| jacquard::CowStr::Owned(c.to_smolstr())), 229 217 extra_data: Default::default(), 230 218 })) 231 219 }
+5 -8
src/backfill/mod.rs
··· 506 506 for (col_name, ks) in partitions { 507 507 for guard in ks.prefix(&prefix) { 508 508 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 509 - let rkey: DbRkey = 510 - rmp_serde::from_slice(&key[prefix.len()..]).into_diagnostic()?; 511 - let cid = if let Ok(c) = cid::Cid::read_bytes(cid_bytes.as_ref()) { 512 - c.to_string().to_smolstr() 513 - } else { 514 - error!("invalid cid for {did}: {cid_bytes:?}"); 515 - continue; 516 - }; 509 + let rkey = keys::parse_rkey(&key[prefix.len()..]) 510 + .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?; 511 + let cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 512 + .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))? 513 + .to_smolstr(); 517 514 518 515 existing_cids.insert((col_name.as_str().into(), rkey), cid); 519 516 }
+37 -4
src/db/keys.rs
··· 1 1 use jacquard_common::types::string::Did; 2 + use smol_str::SmolStr; 2 3 3 4 use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 4 5 ··· 7 8 8 9 pub const CURSOR_KEY: &[u8] = b"firehose_cursor"; 9 10 10 - // Key format: {DID} (trimmed) 11 + // Key format: {DID} 11 12 pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> { 12 13 let mut vec = Vec::with_capacity(32); 13 14 TrimmedDid::from(did).write_to_vec(&mut vec); 14 15 vec 15 16 } 16 17 17 - // prefix format: {DID}\x00 (DID trimmed) - for scanning all records of a DID within a collection 18 + // prefix format: {DID}\x00 18 19 pub fn record_prefix(did: &Did) -> Vec<u8> { 19 20 let repo = TrimmedDid::from(did); 20 21 let mut prefix = Vec::with_capacity(repo.len() + 1); ··· 23 24 prefix 24 25 } 25 26 26 - // key format: {DID}\x00{rkey} (DID trimmed) 27 + // key format: {DID}\x00{rkey} 27 28 pub fn record_key(did: &Did, rkey: &DbRkey) -> Vec<u8> { 28 29 let repo = TrimmedDid::from(did); 29 30 let mut key = Vec::with_capacity(repo.len() + rkey.len() + 1); 30 31 repo.write_to_vec(&mut key); 31 32 key.push(SEP); 32 - key.extend_from_slice(rkey.as_bytes()); 33 + write_rkey(&mut key, rkey); 33 34 key 35 + } 36 + 37 + pub fn write_rkey(buf: &mut Vec<u8>, rkey: &DbRkey) { 38 + match rkey { 39 + DbRkey::Tid(tid) => { 40 + buf.push(b't'); 41 + buf.extend_from_slice(tid.as_bytes()); 42 + } 43 + DbRkey::Str(s) => { 44 + buf.push(b's'); 45 + buf.extend_from_slice(s.as_bytes()); 46 + } 47 + } 48 + } 49 + 50 + pub fn parse_rkey(raw: &[u8]) -> miette::Result<DbRkey> { 51 + let Some(kind) = raw.first() else { 52 + miette::bail!("record key is empty"); 53 + }; 54 + let rkey = match kind { 55 + b't' => { 56 + DbRkey::Tid(DbTid::new_from_bytes(raw[1..].try_into().map_err(|e| { 57 + miette::miette!("record key '{raw:?}' is invalid: {e}") 58 + })?)) 59 + } 60 + b's' => DbRkey::Str(SmolStr::new( 61 + std::str::from_utf8(&raw[1..]) 62 + .map_err(|e| miette::miette!("record key '{raw:?}' is invalid: {e}"))?, 63 + )), 64 + _ => miette::bail!("invalid record key kind: {}", *kind as char), 65 + }; 66 + Ok(rkey) 34 67 } 35 68 36 69 // key format: {SEQ}