AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 163 lines 4.9 kB view raw
1use jacquard_common::types::string::Did; 2use smol_str::SmolStr; 3 4use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 5 6/// separator used for composite keys 7pub const SEP: u8 = b'|'; 8 9pub const CURSOR_KEY: &[u8] = b"firehose_cursor"; 10 11pub const BLOCK_REFS_CHECKPOINT_SEQ_KEY: &[u8] = b"block_refs_checkpoint_seq"; 12 13pub const EVENT_WATERMARK_PREFIX: &[u8] = b"ewm|"; 14 15// key format: {DID} 16pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> { 17 let mut vec = Vec::with_capacity(32); 18 TrimmedDid::from(did).write_to_vec(&mut vec); 19 vec 20} 21 22pub fn pending_key(id: u64) -> [u8; 8] { 23 id.to_be_bytes() 24} 25 26pub fn reflog_key(seq: u64) -> [u8; 8] { 27 seq.to_be_bytes() 28} 29 30pub fn event_watermark_key(timestamp_secs: u64) -> Vec<u8> { 31 let mut key = Vec::with_capacity(EVENT_WATERMARK_PREFIX.len() + 8); 32 key.extend_from_slice(EVENT_WATERMARK_PREFIX); 33 key.extend_from_slice(&timestamp_secs.to_be_bytes()); 34 key 35} 36 37// prefix format: {DID}| (DID trimmed) 38pub fn record_prefix_did(did: &Did) -> Vec<u8> { 39 let repo = TrimmedDid::from(did); 40 let mut prefix = Vec::with_capacity(repo.len() + 1); 41 repo.write_to_vec(&mut prefix); 42 prefix.push(SEP); 43 prefix 44} 45 46// prefix format: {DID}|{collection}| 47pub fn record_prefix_collection(did: &Did, collection: &str) -> Vec<u8> { 48 let repo = TrimmedDid::from(did); 49 let mut prefix = Vec::with_capacity(repo.len() + 1 + collection.len() + 1); 50 repo.write_to_vec(&mut prefix); 51 prefix.push(SEP); 52 prefix.extend_from_slice(collection.as_bytes()); 53 prefix.push(SEP); 54 prefix 55} 56 57// key format: {DID}|{collection}|{rkey} 58pub fn record_key(did: &Did, collection: &str, rkey: &DbRkey) -> Vec<u8> { 59 let repo = TrimmedDid::from(did); 60 let mut key = Vec::with_capacity(repo.len() + 1 + collection.len() + 1 + rkey.len() + 1); 61 repo.write_to_vec(&mut key); 62 key.push(SEP); 63 key.extend_from_slice(collection.as_bytes()); 64 key.push(SEP); 65 write_rkey(&mut key, rkey); 66 key 
67} 68 69pub fn write_rkey(buf: &mut Vec<u8>, rkey: &DbRkey) { 70 match rkey { 71 DbRkey::Tid(tid) => { 72 buf.push(b't'); 73 buf.extend_from_slice(tid.as_bytes()); 74 } 75 DbRkey::Str(s) => { 76 buf.push(b's'); 77 buf.extend_from_slice(s.as_bytes()); 78 } 79 } 80} 81 82pub fn parse_rkey(raw: &[u8]) -> miette::Result<DbRkey> { 83 let Some(kind) = raw.first() else { 84 miette::bail!("record key is empty"); 85 }; 86 let rkey = match kind { 87 b't' => { 88 DbRkey::Tid(DbTid::new_from_bytes(raw[1..].try_into().map_err(|e| { 89 miette::miette!("record key '{raw:?}' is invalid: {e}") 90 })?)) 91 } 92 b's' => DbRkey::Str(SmolStr::new( 93 std::str::from_utf8(&raw[1..]) 94 .map_err(|e| miette::miette!("record key '{raw:?}' is invalid: {e}"))?, 95 )), 96 _ => miette::bail!("invalid record key kind: {}", *kind as char), 97 }; 98 Ok(rkey) 99} 100 101// key format: {SEQ} 102pub fn event_key(seq: u64) -> [u8; 8] { 103 seq.to_be_bytes() 104} 105 106pub const COUNT_KS_PREFIX: &[u8] = &[b'k', SEP]; 107 108// count keys for the counts keyspace 109// key format: k\x00{keyspace_name} 110pub fn count_keyspace_key(name: &str) -> Vec<u8> { 111 let mut key = Vec::with_capacity(COUNT_KS_PREFIX.len() + name.len()); 112 key.extend_from_slice(COUNT_KS_PREFIX); 113 key.extend_from_slice(name.as_bytes()); 114 key 115} 116 117pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP]; 118 119// key format: r|{DID}|{collection} (DID trimmed) 120pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> { 121 let repo = TrimmedDid::from(did); 122 let mut key = 123 Vec::with_capacity(COUNT_COLLECTION_PREFIX.len() + repo.len() + 1 + collection.len()); 124 key.extend_from_slice(COUNT_COLLECTION_PREFIX); 125 repo.write_to_vec(&mut key); 126 key.push(SEP); 127 key.extend_from_slice(collection.as_bytes()); 128 key 129} 130 131// key format: {DID}|{rev} 132pub fn resync_buffer_key(did: &Did, rev: DbTid) -> Vec<u8> { 133 let repo = TrimmedDid::from(did); 134 let mut key = 
Vec::with_capacity(repo.len() + 1 + 8); 135 repo.write_to_vec(&mut key); 136 key.push(SEP); 137 key.extend_from_slice(&rev.as_bytes()); 138 key 139} 140 141// prefix format: {DID}| (DID trimmed) 142pub fn resync_buffer_prefix(did: &Did) -> Vec<u8> { 143 let repo = TrimmedDid::from(did); 144 let mut prefix = Vec::with_capacity(repo.len() + 1); 145 repo.write_to_vec(&mut prefix); 146 prefix.push(SEP); 147 prefix 148} 149 150/// key format: `ret|<did bytes>` 151pub const CRAWLER_RETRY_PREFIX: &[u8] = b"ret|"; 152 153pub fn crawler_retry_key(did: &Did) -> Vec<u8> { 154 let repo = TrimmedDid::from(did); 155 let mut key = Vec::with_capacity(CRAWLER_RETRY_PREFIX.len() + repo.len()); 156 key.extend_from_slice(CRAWLER_RETRY_PREFIX); 157 repo.write_to_vec(&mut key); 158 key 159} 160 161pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> { 162 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..]) 163}