AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use jacquard_common::types::string::Did;
2use smol_str::SmolStr;
3
4use crate::db::types::{DbRkey, DbTid, TrimmedDid};
5
/// Byte written between the components of every composite key built in this module.
pub const SEP: u8 = b'|';

/// Fixed key `firehose_cursor` (presumably where the firehose cursor position is persisted).
pub const CURSOR_KEY: &[u8] = b"firehose_cursor";

/// Fixed key `block_refs_checkpoint_seq`.
pub const BLOCK_REFS_CHECKPOINT_SEQ_KEY: &[u8] = b"block_refs_checkpoint_seq";

/// Prefix `ewm|` for event-watermark keys; a full key appends a big-endian `u64` timestamp
/// (see `event_watermark_key`).
pub const EVENT_WATERMARK_PREFIX: &[u8] = &[b'e', b'w', b'm', SEP];
14
15// key format: {DID}
16pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> {
17 let mut vec = Vec::with_capacity(32);
18 TrimmedDid::from(did).write_to_vec(&mut vec);
19 vec
20}
21
/// Encodes `id` as 8 big-endian bytes; big-endian makes byte-wise key order match numeric order.
pub fn pending_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
25
/// Encodes the reflog sequence number `seq` as 8 big-endian bytes so keys sort by sequence.
pub fn reflog_key(seq: u64) -> [u8; 8] {
    u64::to_be_bytes(seq)
}
29
30pub fn event_watermark_key(timestamp_secs: u64) -> Vec<u8> {
31 let mut key = Vec::with_capacity(EVENT_WATERMARK_PREFIX.len() + 8);
32 key.extend_from_slice(EVENT_WATERMARK_PREFIX);
33 key.extend_from_slice(×tamp_secs.to_be_bytes());
34 key
35}
36
37// prefix format: {DID}| (DID trimmed)
38pub fn record_prefix_did(did: &Did) -> Vec<u8> {
39 let repo = TrimmedDid::from(did);
40 let mut prefix = Vec::with_capacity(repo.len() + 1);
41 repo.write_to_vec(&mut prefix);
42 prefix.push(SEP);
43 prefix
44}
45
46// prefix format: {DID}|{collection}|
47pub fn record_prefix_collection(did: &Did, collection: &str) -> Vec<u8> {
48 let repo = TrimmedDid::from(did);
49 let mut prefix = Vec::with_capacity(repo.len() + 1 + collection.len() + 1);
50 repo.write_to_vec(&mut prefix);
51 prefix.push(SEP);
52 prefix.extend_from_slice(collection.as_bytes());
53 prefix.push(SEP);
54 prefix
55}
56
57// key format: {DID}|{collection}|{rkey}
58pub fn record_key(did: &Did, collection: &str, rkey: &DbRkey) -> Vec<u8> {
59 let repo = TrimmedDid::from(did);
60 let mut key = Vec::with_capacity(repo.len() + 1 + collection.len() + 1 + rkey.len() + 1);
61 repo.write_to_vec(&mut key);
62 key.push(SEP);
63 key.extend_from_slice(collection.as_bytes());
64 key.push(SEP);
65 write_rkey(&mut key, rkey);
66 key
67}
68
69pub fn write_rkey(buf: &mut Vec<u8>, rkey: &DbRkey) {
70 match rkey {
71 DbRkey::Tid(tid) => {
72 buf.push(b't');
73 buf.extend_from_slice(tid.as_bytes());
74 }
75 DbRkey::Str(s) => {
76 buf.push(b's');
77 buf.extend_from_slice(s.as_bytes());
78 }
79 }
80}
81
82pub fn parse_rkey(raw: &[u8]) -> miette::Result<DbRkey> {
83 let Some(kind) = raw.first() else {
84 miette::bail!("record key is empty");
85 };
86 let rkey = match kind {
87 b't' => {
88 DbRkey::Tid(DbTid::new_from_bytes(raw[1..].try_into().map_err(|e| {
89 miette::miette!("record key '{raw:?}' is invalid: {e}")
90 })?))
91 }
92 b's' => DbRkey::Str(SmolStr::new(
93 std::str::from_utf8(&raw[1..])
94 .map_err(|e| miette::miette!("record key '{raw:?}' is invalid: {e}"))?,
95 )),
96 _ => miette::bail!("invalid record key kind: {}", *kind as char),
97 };
98 Ok(rkey)
99}
100
/// Encodes an event sequence number as 8 big-endian bytes, so keys sort in sequence order.
///
/// key format: {SEQ}
pub fn event_key(seq: u64) -> [u8; 8] {
    u64::to_be_bytes(seq)
}
105
106pub const COUNT_KS_PREFIX: &[u8] = &[b'k', SEP];
107
108// count keys for the counts keyspace
109// key format: k\x00{keyspace_name}
110pub fn count_keyspace_key(name: &str) -> Vec<u8> {
111 let mut key = Vec::with_capacity(COUNT_KS_PREFIX.len() + name.len());
112 key.extend_from_slice(COUNT_KS_PREFIX);
113 key.extend_from_slice(name.as_bytes());
114 key
115}
116
117pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP];
118
119// key format: r|{DID}|{collection} (DID trimmed)
120pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> {
121 let repo = TrimmedDid::from(did);
122 let mut key =
123 Vec::with_capacity(COUNT_COLLECTION_PREFIX.len() + repo.len() + 1 + collection.len());
124 key.extend_from_slice(COUNT_COLLECTION_PREFIX);
125 repo.write_to_vec(&mut key);
126 key.push(SEP);
127 key.extend_from_slice(collection.as_bytes());
128 key
129}
130
131// key format: {DID}|{rev}
132pub fn resync_buffer_key(did: &Did, rev: DbTid) -> Vec<u8> {
133 let repo = TrimmedDid::from(did);
134 let mut key = Vec::with_capacity(repo.len() + 1 + 8);
135 repo.write_to_vec(&mut key);
136 key.push(SEP);
137 key.extend_from_slice(&rev.as_bytes());
138 key
139}
140
141// prefix format: {DID}| (DID trimmed)
142pub fn resync_buffer_prefix(did: &Did) -> Vec<u8> {
143 let repo = TrimmedDid::from(did);
144 let mut prefix = Vec::with_capacity(repo.len() + 1);
145 repo.write_to_vec(&mut prefix);
146 prefix.push(SEP);
147 prefix
148}
149
150/// key format: `ret|<did bytes>`
151pub const CRAWLER_RETRY_PREFIX: &[u8] = b"ret|";
152
153pub fn crawler_retry_key(did: &Did) -> Vec<u8> {
154 let repo = TrimmedDid::from(did);
155 let mut key = Vec::with_capacity(CRAWLER_RETRY_PREFIX.len() + repo.len());
156 key.extend_from_slice(CRAWLER_RETRY_PREFIX);
157 repo.write_to_vec(&mut key);
158 key
159}
160
161pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> {
162 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..])
163}