forked from
ptr.pet/hydrant
kind of like tap but different and in rust
1use jacquard_common::types::string::Did;
2use smol_str::SmolStr;
3
4use crate::db::types::{DbRkey, DbTid, TrimmedDid};
5
/// separator byte written between the components of a composite key
pub const SEP: u8 = b'|';

/// key bytes for the firehose cursor entry
/// (presumably a single well-known entry; confirm against the caller)
pub const CURSOR_KEY: &[u8] = b"firehose_cursor";
10
11// Key format: {DID}
12pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> {
13 let mut vec = Vec::with_capacity(32);
14 TrimmedDid::from(did).write_to_vec(&mut vec);
15 vec
16}
17
/// key format: {ID}
///
/// encodes `id` as 8 big-endian bytes, so the keys compare byte-wise in the
/// same order as the ids compare numerically.
pub fn pending_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
21
22// prefix format: {DID}| (DID trimmed)
23pub fn record_prefix_did(did: &Did) -> Vec<u8> {
24 let repo = TrimmedDid::from(did);
25 let mut prefix = Vec::with_capacity(repo.len() + 1);
26 repo.write_to_vec(&mut prefix);
27 prefix.push(SEP);
28 prefix
29}
30
31// prefix format: {DID}|{collection}|
32pub fn record_prefix_collection(did: &Did, collection: &str) -> Vec<u8> {
33 let repo = TrimmedDid::from(did);
34 let mut prefix = Vec::with_capacity(repo.len() + 1 + collection.len() + 1);
35 repo.write_to_vec(&mut prefix);
36 prefix.push(SEP);
37 prefix.extend_from_slice(collection.as_bytes());
38 prefix.push(SEP);
39 prefix
40}
41
42// key format: {DID}|{collection}|{rkey}
43pub fn record_key(did: &Did, collection: &str, rkey: &DbRkey) -> Vec<u8> {
44 let repo = TrimmedDid::from(did);
45 let mut key = Vec::with_capacity(repo.len() + 1 + collection.len() + 1 + rkey.len() + 1);
46 repo.write_to_vec(&mut key);
47 key.push(SEP);
48 key.extend_from_slice(collection.as_bytes());
49 key.push(SEP);
50 write_rkey(&mut key, rkey);
51 key
52}
53
54pub fn write_rkey(buf: &mut Vec<u8>, rkey: &DbRkey) {
55 match rkey {
56 DbRkey::Tid(tid) => {
57 buf.push(b't');
58 buf.extend_from_slice(tid.as_bytes());
59 }
60 DbRkey::Str(s) => {
61 buf.push(b's');
62 buf.extend_from_slice(s.as_bytes());
63 }
64 }
65}
66
67pub fn parse_rkey(raw: &[u8]) -> miette::Result<DbRkey> {
68 let Some(kind) = raw.first() else {
69 miette::bail!("record key is empty");
70 };
71 let rkey = match kind {
72 b't' => {
73 DbRkey::Tid(DbTid::new_from_bytes(raw[1..].try_into().map_err(|e| {
74 miette::miette!("record key '{raw:?}' is invalid: {e}")
75 })?))
76 }
77 b's' => DbRkey::Str(SmolStr::new(
78 std::str::from_utf8(&raw[1..])
79 .map_err(|e| miette::miette!("record key '{raw:?}' is invalid: {e}"))?,
80 )),
81 _ => miette::bail!("invalid record key kind: {}", *kind as char),
82 };
83 Ok(rkey)
84}
85
/// key format: {SEQ}
///
/// encodes `seq` as 8 big-endian bytes, so the keys compare byte-wise in the
/// same order as the sequence numbers compare numerically.
pub fn event_key(seq: u64) -> [u8; 8] {
    u64::to_be_bytes(seq)
}
90
91pub const COUNT_KS_PREFIX: &[u8] = &[b'k', SEP];
92
93// count keys for the counts keyspace
94// key format: k\x00{keyspace_name}
95pub fn count_keyspace_key(name: &str) -> Vec<u8> {
96 let mut key = Vec::with_capacity(COUNT_KS_PREFIX.len() + name.len());
97 key.extend_from_slice(COUNT_KS_PREFIX);
98 key.extend_from_slice(name.as_bytes());
99 key
100}
101
102pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP];
103
104// key format: r|{DID}|{collection} (DID trimmed)
105pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> {
106 let repo = TrimmedDid::from(did);
107 let mut key =
108 Vec::with_capacity(COUNT_COLLECTION_PREFIX.len() + repo.len() + 1 + collection.len());
109 key.extend_from_slice(COUNT_COLLECTION_PREFIX);
110 repo.write_to_vec(&mut key);
111 key.push(SEP);
112 key.extend_from_slice(collection.as_bytes());
113 key
114}
115
116// key format: {DID}|{rev}
117pub fn resync_buffer_key(did: &Did, rev: DbTid) -> Vec<u8> {
118 let repo = TrimmedDid::from(did);
119 let mut key = Vec::with_capacity(repo.len() + 1 + 8);
120 repo.write_to_vec(&mut key);
121 key.push(SEP);
122 key.extend_from_slice(&rev.as_bytes());
123 key
124}
125
126// prefix format: {DID}| (DID trimmed)
127pub fn resync_buffer_prefix(did: &Did) -> Vec<u8> {
128 let repo = TrimmedDid::from(did);
129 let mut prefix = Vec::with_capacity(repo.len() + 1);
130 repo.write_to_vec(&mut prefix);
131 prefix.push(SEP);
132 prefix
133}