tangled
alpha
login
or
join now
ptr.pet
/
hydrant
26
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
26
fork
atom
overview
issues
6
pulls
pipelines
[types,db] make what we store more efficient
ptr.pet
1 month ago
5d31bccd
68107d5a
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+394
-113
15 changed files
expand all
collapse all
unified
split
Cargo.lock
Cargo.toml
src
api
debug.rs
mod.rs
stream.rs
xrpc.rs
backfill
mod.rs
crawler
mod.rs
db
keys.rs
mod.rs
types.rs
ingest
worker.rs
main.rs
ops.rs
types.rs
+1
Cargo.lock
···
1621
1621
"async-stream",
1622
1622
"axum",
1623
1623
"chrono",
1624
1624
+
"data-encoding",
1624
1625
"fjall",
1625
1626
"futures",
1626
1627
"hex",
+1
Cargo.toml
···
40
40
mimalloc = { version = "0.1", features = ["v3"] }
41
41
hex = "0.4"
42
42
scc = "3"
43
43
+
data-encoding = "2.10.0"
+1
-1
src/api/debug.rs
···
46
46
47
47
// {did_prefix}\x00{collection}\x00
48
48
let mut prefix = Vec::new();
49
49
-
prefix.extend_from_slice(TrimmedDid::from(&did).as_bytes());
49
49
+
TrimmedDid::from(&did).write_to_vec(&mut prefix);
50
50
prefix.push(keys::SEP);
51
51
prefix.extend_from_slice(req.collection.as_bytes());
52
52
prefix.push(keys::SEP);
+1
-1
src/api/mod.rs
···
1
1
use crate::state::AppState;
2
2
-
use axum::{routing::get, Router};
2
2
+
use axum::{Router, routing::get};
3
3
use jacquard::xrpc::GenericXrpcError;
4
4
use jacquard_axum::XrpcErrorResponse;
5
5
use std::{net::SocketAddr, sync::Arc};
+8
-9
src/api/stream.rs
···
8
8
},
9
9
response::IntoResponse,
10
10
};
11
11
+
use jacquard::CowStr;
11
12
use jacquard_common::types::value::RawData;
12
13
use miette::{Context, IntoDiagnostic};
13
14
use serde::Deserialize;
···
96
97
97
98
let marshallable = {
98
99
let mut record_val = None;
99
99
-
if let Some(cid_str) = &cid {
100
100
+
if let Some(cid_struct) = &cid {
101
101
+
let cid_str = cid_struct.to_string();
100
102
if let Ok(Some(block_bytes)) =
101
101
-
db.blocks.get(keys::block_key(cid_str))
103
103
+
db.blocks.get(keys::block_key(&cid_str))
102
104
{
103
105
if let Ok(raw_data) =
104
106
serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes)
···
114
116
record: Some(RecordEvt {
115
117
live,
116
118
did: did.to_did(),
117
117
-
rev,
119
119
+
rev: CowStr::Owned(rev.to_tid().into()),
118
120
collection,
119
119
-
rkey,
120
120
-
action,
121
121
+
rkey: CowStr::Owned(rkey.to_smolstr().into()),
122
122
+
action: CowStr::Borrowed(action.as_str()),
121
123
record: record_val,
122
122
-
cid: cid.map(|c| match c {
123
123
-
jacquard::types::cid::Cid::Ipld { s, .. } => s,
124
124
-
jacquard::types::cid::Cid::Str(s) => s,
125
125
-
}),
124
124
+
cid: cid.map(|c| jacquard::types::cid::Cid::ipld(c).into()),
126
125
}),
127
126
identity: None,
128
127
account: None,
+4
-4
src/api/xrpc.rs
···
1
1
use crate::api::{AppState, XrpcResult};
2
2
use crate::db::types::TrimmedDid;
3
3
-
use crate::db::{self, keys, Db};
4
4
-
use axum::{extract::State, http::StatusCode, Json, Router};
3
3
+
use crate::db::{self, Db, keys};
4
4
+
use axum::{Json, Router, extract::State, http::StatusCode};
5
5
use futures::TryFutureExt;
6
6
use jacquard::types::ident::AtIdentifier;
7
7
use jacquard::{
8
8
+
IntoStatic,
8
9
api::com_atproto::repo::{
9
10
get_record::{GetRecordError, GetRecordOutput, GetRecordRequest},
10
11
list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord},
11
12
},
12
13
xrpc::XrpcRequest,
13
13
-
IntoStatic,
14
14
};
15
15
use jacquard_api::com_atproto::repo::{get_record::GetRecord, list_records::ListRecords};
16
16
use jacquard_axum::{ExtractXrpc, IntoRouter, XrpcErrorResponse};
···
128
128
129
129
let prefix = format!(
130
130
"{}{}{}{}",
131
131
-
TrimmedDid::from(&did).as_str(),
131
131
+
TrimmedDid::from(&did),
132
132
keys::SEP as char,
133
133
req.collection.as_str(),
134
134
keys::SEP as char
+10
-10
src/backfill/mod.rs
···
1
1
-
use crate::db::types::TrimmedDid;
1
1
+
use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2
2
use crate::db::{self, Db, keys, ser_repo_state};
3
3
use crate::ops;
4
4
use crate::state::{AppState, BackfillRx};
···
432
432
let evt = StoredEvent {
433
433
live: false,
434
434
did: TrimmedDid::from(&did),
435
435
-
rev: CowStr::Borrowed(rev.as_str()),
435
435
+
rev: DbTid::from(rev.clone()),
436
436
collection: CowStr::Borrowed(collection),
437
437
-
rkey: CowStr::Borrowed(rkey),
438
438
-
action: CowStr::Borrowed(action),
439
439
-
cid: Some(cid),
437
437
+
rkey: DbRkey::new(rkey),
438
438
+
action: DbAction::from(action),
439
439
+
cid: Some(cid.to_ipld().expect("valid cid")),
440
440
};
441
441
let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
442
442
batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
···
457
457
let evt = StoredEvent {
458
458
live: false,
459
459
did: TrimmedDid::from(&did),
460
460
-
rev: CowStr::Borrowed(rev.as_str()),
460
460
+
rev: DbTid::from(rev.clone()),
461
461
collection: CowStr::Borrowed(&collection),
462
462
-
rkey: CowStr::Borrowed(&rkey),
463
463
-
action: CowStr::Borrowed("delete"),
462
462
+
rkey: DbRkey::new(&rkey),
463
463
+
action: DbAction::Delete,
464
464
cid: None,
465
465
};
466
466
let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
···
472
472
473
473
// 6. update status to synced
474
474
state.status = RepoStatus::Synced;
475
475
-
state.rev = Some(rev);
476
476
-
state.data = Some(Cid::ipld(root_commit.data));
475
475
+
state.rev = Some(rev.clone().into());
476
476
+
state.data = Some(root_commit.data);
477
477
state.last_updated_at = chrono::Utc::now().timestamp();
478
478
479
479
batch.insert(
+1
-1
src/crawler/mod.rs
···
1
1
-
use crate::db::{keys, ser_repo_state, Db};
1
1
+
use crate::db::{Db, keys, ser_repo_state};
2
2
use crate::ops::send_backfill_req;
3
3
use crate::state::AppState;
4
4
use crate::types::RepoState;
+7
-5
src/db/keys.rs
···
8
8
pub const CURSOR_KEY: &[u8] = b"firehose_cursor";
9
9
10
10
// Key format: {DID} (trimmed)
11
11
-
pub fn repo_key<'a>(did: &'a Did) -> TrimmedDid<'a> {
12
12
-
TrimmedDid::from(did)
11
11
+
pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> {
12
12
+
let mut vec = Vec::with_capacity(32);
13
13
+
TrimmedDid::from(did).write_to_vec(&mut vec);
14
14
+
vec
13
15
}
14
16
15
17
// key format: {DID}\x00{Collection}\x00{RKey} (DID trimmed)
16
18
pub fn record_key(did: &Did, collection: &str, rkey: &str) -> Vec<u8> {
17
19
let repo = TrimmedDid::from(did);
18
20
let mut key = Vec::with_capacity(repo.len() + collection.len() + rkey.len() + 2);
19
19
-
key.extend_from_slice(repo.as_bytes());
21
21
+
repo.write_to_vec(&mut key);
20
22
key.push(SEP);
21
23
key.extend_from_slice(collection.as_bytes());
22
24
key.push(SEP);
···
28
30
pub fn record_prefix(did: &Did) -> Vec<u8> {
29
31
let repo = TrimmedDid::from(did);
30
32
let mut prefix = Vec::with_capacity(repo.len() + 1);
31
31
-
prefix.extend_from_slice(repo.as_bytes());
33
33
+
repo.write_to_vec(&mut prefix);
32
34
prefix.push(SEP);
33
35
prefix
34
36
}
···
64
66
let mut key =
65
67
Vec::with_capacity(COUNT_COLLECTION_PREFIX.len() + repo.len() + 1 + collection.len());
66
68
key.extend_from_slice(COUNT_COLLECTION_PREFIX);
67
67
-
key.extend_from_slice(repo.as_bytes());
69
69
+
repo.write_to_vec(&mut key);
68
70
key.push(SEP);
69
71
key.extend_from_slice(collection.as_bytes());
70
72
key
+1
-1
src/db/mod.rs
···
183
183
let key = keys::repo_key(did);
184
184
if let Some(bytes) = repos.get(&key).into_diagnostic()? {
185
185
let mut state: RepoState = deser_repo_state(bytes.as_ref())?.into_static();
186
186
-
let (changed, result) = f(&mut state, (key.as_bytes(), batch))?;
186
186
+
let (changed, result) = f(&mut state, (key.as_slice(), batch))?;
187
187
if changed {
188
188
batch.insert(repos, key, ser_repo_state(&state)?);
189
189
}
+329
-49
src/db/types.rs
···
1
1
+
use data_encoding::BASE32_NOPAD;
1
2
use fjall::UserKey;
2
3
use jacquard::{CowStr, IntoStatic};
3
4
use jacquard_common::types::string::Did;
4
4
-
use miette::{Context, IntoDiagnostic, Result};
5
5
+
use jacquard_common::types::tid::Tid;
5
6
use serde::{Deserialize, Deserializer, Serialize, Serializer};
6
6
-
use smol_str::format_smolstr;
7
7
+
use smol_str::{SmolStr, SmolStrBuilder, format_smolstr};
8
8
+
use std::fmt::Display;
7
9
8
8
-
#[derive(Clone, Debug)]
9
9
-
pub struct TrimmedDid<'s>(CowStr<'s>);
10
10
+
const S32_CHAR: &str = "234567abcdefghijklmnopqrstuvwxyz";
10
11
11
11
-
impl<'s> TrimmedDid<'s> {
12
12
-
pub fn as_str(&self) -> &str {
13
13
-
self.0.as_ref()
14
14
-
}
12
12
+
#[derive(Clone, Debug, PartialEq, Eq)]
13
13
+
pub enum TrimmedDid<'s> {
14
14
+
Plc([u8; 15]),
15
15
+
Web(CowStr<'s>),
16
16
+
Other(CowStr<'s>),
17
17
+
}
15
18
16
16
-
pub fn as_bytes(&self) -> &[u8] {
17
17
-
self.0.as_bytes()
18
18
-
}
19
19
+
const TAG_PLC: u8 = 1;
20
20
+
const TAG_WEB: u8 = 2;
19
21
22
22
+
impl<'s> TrimmedDid<'s> {
20
23
pub fn len(&self) -> usize {
21
21
-
self.0.len()
24
24
+
match self {
25
25
+
TrimmedDid::Plc(_) => 16,
26
26
+
TrimmedDid::Web(s) => 1 + s.len(),
27
27
+
TrimmedDid::Other(s) => s.len(),
28
28
+
}
22
29
}
23
30
24
31
pub fn into_static(self) -> TrimmedDid<'static> {
25
25
-
TrimmedDid(self.0.into_static())
32
32
+
match self {
33
33
+
TrimmedDid::Plc(bytes) => TrimmedDid::Plc(bytes),
34
34
+
TrimmedDid::Web(s) => TrimmedDid::Web(s.into_static()),
35
35
+
TrimmedDid::Other(s) => TrimmedDid::Other(s.into_static()),
36
36
+
}
26
37
}
27
38
28
39
pub fn to_did(&self) -> Did<'static> {
29
29
-
Did::new_owned(format_smolstr!("did:{}", self.0)).expect("expected valid trimmed did")
40
40
+
match self {
41
41
+
TrimmedDid::Plc(_) => {
42
42
+
let s = self.to_string();
43
43
+
Did::new_owned(format_smolstr!("did:{}", s)).expect("valid did from plc")
44
44
+
}
45
45
+
TrimmedDid::Web(s) => {
46
46
+
Did::new_owned(format_smolstr!("did:web:{}", s)).expect("valid did from web")
47
47
+
}
48
48
+
TrimmedDid::Other(s) => {
49
49
+
Did::new_owned(format_smolstr!("did:{}", s)).expect("valid did from other")
50
50
+
}
51
51
+
}
30
52
}
31
31
-
}
32
53
33
33
-
impl<'a> TryFrom<&'a [u8]> for TrimmedDid<'a> {
34
34
-
type Error = miette::Report;
54
54
+
pub fn write_to_vec(&self, buf: &mut Vec<u8>) {
55
55
+
match self {
56
56
+
TrimmedDid::Plc(bytes) => {
57
57
+
buf.push(TAG_PLC);
58
58
+
buf.extend_from_slice(bytes);
59
59
+
}
60
60
+
TrimmedDid::Web(s) => {
61
61
+
buf.push(TAG_WEB);
62
62
+
buf.extend_from_slice(s.as_bytes());
63
63
+
}
64
64
+
TrimmedDid::Other(s) => buf.extend_from_slice(s.as_bytes()),
65
65
+
}
66
66
+
}
35
67
36
36
-
fn try_from(value: &'a [u8]) -> Result<Self> {
37
37
-
let s = std::str::from_utf8(value)
38
38
-
.into_diagnostic()
39
39
-
.wrap_err("expected trimmed did to be valid utf-8")?;
68
68
+
pub fn to_string(&self) -> String {
69
69
+
match self {
70
70
+
TrimmedDid::Plc(bytes) => {
71
71
+
let mut s = String::with_capacity(28);
72
72
+
s.push_str("plc:");
73
73
+
s.push_str(&BASE32_NOPAD.encode(bytes).to_ascii_lowercase());
74
74
+
s
75
75
+
}
76
76
+
TrimmedDid::Web(s) => {
77
77
+
format!("web:{}", s)
78
78
+
}
79
79
+
TrimmedDid::Other(s) => s.to_string(),
80
80
+
}
81
81
+
}
82
82
+
}
40
83
41
41
-
// validate using Did::new with stack-allocated buffer
42
42
-
const PREFIX: &[u8] = b"did:";
43
43
-
const MAX_DID_LEN: usize = 2048;
44
44
-
let full_len = PREFIX.len() + value.len();
45
45
-
if full_len > MAX_DID_LEN {
46
46
-
miette::bail!("trimmed did too long");
84
84
+
impl Display for TrimmedDid<'_> {
85
85
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86
86
+
match self {
87
87
+
TrimmedDid::Plc(bytes) => {
88
88
+
f.write_str("plc:")?;
89
89
+
f.write_str(&BASE32_NOPAD.encode(bytes).to_ascii_lowercase())
90
90
+
}
91
91
+
TrimmedDid::Web(s) => {
92
92
+
f.write_str("web:")?;
93
93
+
f.write_str(s)
94
94
+
}
95
95
+
TrimmedDid::Other(s) => f.write_str(s),
47
96
}
48
48
-
let mut buf = [0u8; MAX_DID_LEN];
49
49
-
buf[..PREFIX.len()].copy_from_slice(PREFIX);
50
50
-
buf[PREFIX.len()..full_len].copy_from_slice(value);
51
51
-
let full_did = std::str::from_utf8(&buf[..full_len]).expect("already validated utf-8");
52
52
-
Did::new(full_did)
53
53
-
.into_diagnostic()
54
54
-
.wrap_err("expected trimmed did to be valid did")?;
97
97
+
}
98
98
+
}
55
99
56
56
-
Ok(TrimmedDid(CowStr::Borrowed(s)))
100
100
+
impl<'a> From<&'a Did<'a>> for TrimmedDid<'a> {
101
101
+
fn from(did: &'a Did<'a>) -> Self {
102
102
+
let s = did.as_str();
103
103
+
if let Some(rest) = s.strip_prefix("did:plc:") {
104
104
+
if rest.len() == 24 {
105
105
+
// decode
106
106
+
if let Ok(bytes) = BASE32_NOPAD.decode(rest.to_ascii_uppercase().as_bytes()) {
107
107
+
if bytes.len() == 15 {
108
108
+
return TrimmedDid::Plc(bytes.try_into().unwrap());
109
109
+
}
110
110
+
}
111
111
+
}
112
112
+
} else if let Some(rest) = s.strip_prefix("did:web:") {
113
113
+
return TrimmedDid::Web(CowStr::Borrowed(rest));
114
114
+
}
115
115
+
TrimmedDid::Other(CowStr::Borrowed(s.trim_start_matches("did:")))
57
116
}
58
117
}
59
118
60
60
-
impl<'a> AsRef<[u8]> for TrimmedDid<'a> {
61
61
-
fn as_ref(&self) -> &[u8] {
62
62
-
self.as_bytes()
119
119
+
impl<'a> TryFrom<&'a [u8]> for TrimmedDid<'a> {
120
120
+
type Error = miette::Report;
121
121
+
122
122
+
fn try_from(value: &'a [u8]) -> miette::Result<Self> {
123
123
+
if value.is_empty() {
124
124
+
miette::bail!("empty did key");
125
125
+
}
126
126
+
match value[0] {
127
127
+
TAG_PLC => {
128
128
+
if value.len() == 16 {
129
129
+
let mut arr = [0u8; 15];
130
130
+
arr.copy_from_slice(&value[1..]);
131
131
+
return Ok(TrimmedDid::Plc(arr));
132
132
+
}
133
133
+
miette::bail!("invalid length for tagged plc did");
134
134
+
}
135
135
+
TAG_WEB => {
136
136
+
if let Ok(s) = std::str::from_utf8(&value[1..]) {
137
137
+
return Ok(TrimmedDid::Web(CowStr::Borrowed(s)));
138
138
+
}
139
139
+
miette::bail!("invalid utf8 for tagged web did");
140
140
+
}
141
141
+
_ => {
142
142
+
if let Ok(s) = std::str::from_utf8(value) {
143
143
+
return Ok(TrimmedDid::Other(CowStr::Borrowed(s)));
144
144
+
}
145
145
+
miette::bail!("invalid utf8 for other did");
146
146
+
}
147
147
+
}
63
148
}
64
149
}
65
150
66
151
impl<'a> Into<UserKey> for TrimmedDid<'a> {
67
152
fn into(self) -> UserKey {
68
68
-
UserKey::new(self.as_bytes())
153
153
+
let mut vec = Vec::with_capacity(32);
154
154
+
self.write_to_vec(&mut vec);
155
155
+
UserKey::new(&vec)
69
156
}
70
157
}
71
158
72
159
impl<'a> Into<UserKey> for &TrimmedDid<'a> {
73
160
fn into(self) -> UserKey {
74
74
-
UserKey::new(self.as_bytes())
75
75
-
}
76
76
-
}
77
77
-
78
78
-
impl<'a> From<&'a Did<'a>> for TrimmedDid<'a> {
79
79
-
fn from(did: &'a Did<'a>) -> Self {
80
80
-
TrimmedDid(CowStr::Borrowed(did.as_str().trim_start_matches("did:")))
161
161
+
let mut vec = Vec::with_capacity(32);
162
162
+
self.write_to_vec(&mut vec);
163
163
+
UserKey::new(&vec)
81
164
}
82
165
}
83
166
84
167
impl Serialize for TrimmedDid<'_> {
85
168
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
86
86
-
self.0.serialize(serializer)
169
169
+
let mut vec = Vec::with_capacity(32);
170
170
+
self.write_to_vec(&mut vec);
171
171
+
serializer.serialize_bytes(&vec)
87
172
}
88
173
}
89
174
90
175
impl<'de> Deserialize<'de> for TrimmedDid<'de> {
91
176
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
92
92
-
let s = <&'de str>::deserialize(deserializer)?;
93
93
-
Ok(TrimmedDid(CowStr::Borrowed(s)))
177
177
+
struct DidVisitor;
178
178
+
impl<'de> serde::de::Visitor<'de> for DidVisitor {
179
179
+
type Value = TrimmedDid<'de>;
180
180
+
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
181
181
+
formatter.write_str("bytes (tagged) or string (legacy)")
182
182
+
}
183
183
+
184
184
+
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
185
185
+
where
186
186
+
E: serde::de::Error,
187
187
+
{
188
188
+
TrimmedDid::try_from(v)
189
189
+
.map(|td| td.into_static())
190
190
+
.map_err(E::custom)
191
191
+
}
192
192
+
193
193
+
fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
194
194
+
where
195
195
+
E: serde::de::Error,
196
196
+
{
197
197
+
TrimmedDid::try_from(v).map_err(E::custom)
198
198
+
}
199
199
+
}
200
200
+
deserializer.deserialize_any(DidVisitor)
201
201
+
}
202
202
+
}
203
203
+
204
204
+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
205
205
+
#[repr(transparent)]
206
206
+
pub struct DbTid(u64);
207
207
+
208
208
+
impl DbTid {
209
209
+
pub fn new(tid: u64) -> Self {
210
210
+
Self(tid)
211
211
+
}
212
212
+
213
213
+
pub fn as_u64(&self) -> u64 {
214
214
+
self.0
215
215
+
}
216
216
+
217
217
+
pub fn to_tid(&self) -> Tid {
218
218
+
Tid::raw(self.to_smolstr())
219
219
+
}
220
220
+
221
221
+
fn to_smolstr(&self) -> SmolStr {
222
222
+
let mut i = self.0;
223
223
+
let mut s = SmolStrBuilder::new();
224
224
+
for _ in 0..13 {
225
225
+
let c = i & 0x1F;
226
226
+
s.push(S32_CHAR.chars().nth(c as usize).unwrap());
227
227
+
i >>= 5;
228
228
+
}
229
229
+
230
230
+
let mut builder = SmolStrBuilder::new();
231
231
+
for c in s.finish().chars().rev() {
232
232
+
builder.push(c);
233
233
+
}
234
234
+
builder.finish()
235
235
+
}
236
236
+
}
237
237
+
238
238
+
impl From<&Tid> for DbTid {
239
239
+
fn from(tid: &Tid) -> Self {
240
240
+
DbTid(jacquard_common::types::tid::s32decode(tid.to_string()))
241
241
+
}
242
242
+
}
243
243
+
244
244
+
impl From<Tid> for DbTid {
245
245
+
fn from(tid: Tid) -> Self {
246
246
+
DbTid::from(&tid)
247
247
+
}
248
248
+
}
249
249
+
250
250
+
impl From<DbTid> for Tid {
251
251
+
fn from(val: DbTid) -> Self {
252
252
+
val.to_tid()
253
253
+
}
254
254
+
}
255
255
+
256
256
+
impl Display for DbTid {
257
257
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258
258
+
write!(f, "{}", self.to_smolstr())
259
259
+
}
260
260
+
}
261
261
+
262
262
+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
263
263
+
#[repr(u8)]
264
264
+
pub enum DbAction {
265
265
+
Create = 0,
266
266
+
Update = 1,
267
267
+
Delete = 2,
268
268
+
}
269
269
+
270
270
+
impl DbAction {
271
271
+
pub fn as_str(&self) -> &'static str {
272
272
+
match self {
273
273
+
DbAction::Create => "create",
274
274
+
DbAction::Update => "update",
275
275
+
DbAction::Delete => "delete",
276
276
+
}
277
277
+
}
278
278
+
}
279
279
+
280
280
+
impl<'a> From<&'a str> for DbAction {
281
281
+
fn from(s: &'a str) -> Self {
282
282
+
match s {
283
283
+
"create" => DbAction::Create,
284
284
+
"update" => DbAction::Update,
285
285
+
"delete" => DbAction::Delete,
286
286
+
_ => panic!("invalid action: {}", s),
287
287
+
}
288
288
+
}
289
289
+
}
290
290
+
291
291
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
292
292
+
#[serde(untagged)]
293
293
+
pub enum DbRkey {
294
294
+
Tid(DbTid),
295
295
+
Str(SmolStr),
296
296
+
}
297
297
+
298
298
+
impl DbRkey {
299
299
+
pub fn new(s: &str) -> Self {
300
300
+
if let Ok(tid) = Tid::new(s) {
301
301
+
DbRkey::Tid(DbTid::from(tid))
302
302
+
} else {
303
303
+
DbRkey::Str(SmolStr::from(s))
304
304
+
}
305
305
+
}
306
306
+
307
307
+
pub fn to_smolstr(&self) -> SmolStr {
308
308
+
match self {
309
309
+
DbRkey::Tid(tid) => tid.to_smolstr(),
310
310
+
DbRkey::Str(s) => s.clone(),
311
311
+
}
312
312
+
}
313
313
+
}
314
314
+
315
315
+
impl From<&str> for DbRkey {
316
316
+
fn from(s: &str) -> Self {
317
317
+
Self::new(s)
318
318
+
}
319
319
+
}
320
320
+
321
321
+
impl From<String> for DbRkey {
322
322
+
fn from(s: String) -> Self {
323
323
+
Self::new(&s)
324
324
+
}
325
325
+
}
326
326
+
327
327
+
impl From<SmolStr> for DbRkey {
328
328
+
fn from(s: SmolStr) -> Self {
329
329
+
Self::new(s.as_str())
330
330
+
}
331
331
+
}
332
332
+
333
333
+
impl Display for DbRkey {
334
334
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335
335
+
write!(f, "{}", self.to_smolstr())
336
336
+
}
337
337
+
}
338
338
+
339
339
+
#[cfg(test)]
340
340
+
mod tests {
341
341
+
use super::*;
342
342
+
use jacquard_common::types::tid::Tid;
343
343
+
344
344
+
#[test]
345
345
+
fn test_dbtid_roundtrip() {
346
346
+
let tid_str = "3jzfcijpj2z2a";
347
347
+
let tid = Tid::new(tid_str).unwrap();
348
348
+
let db_tid = DbTid::from(&tid);
349
349
+
assert_eq!(db_tid.to_tid().as_str(), tid_str);
350
350
+
351
351
+
let tid_str_2 = "2222222222222";
352
352
+
let tid = Tid::new(tid_str_2).unwrap();
353
353
+
let db_tid = DbTid::from(&tid);
354
354
+
assert_eq!(db_tid.to_tid().as_str(), tid_str_2);
355
355
+
}
356
356
+
357
357
+
#[test]
358
358
+
fn test_dbrkey() {
359
359
+
let tid_str = "3jzfcijpj2z2a";
360
360
+
let rkey = DbRkey::new(tid_str);
361
361
+
if let DbRkey::Tid(t) = rkey {
362
362
+
assert_eq!(t.to_tid().as_str(), tid_str);
363
363
+
} else {
364
364
+
panic!("expected tid");
365
365
+
}
366
366
+
367
367
+
let str_val = "self";
368
368
+
let rkey = DbRkey::new(str_val);
369
369
+
if let DbRkey::Str(s) = rkey {
370
370
+
assert_eq!(s, str_val);
371
371
+
} else {
372
372
+
panic!("expected str");
373
373
+
}
94
374
}
95
375
}
+10
-5
src/ingest/worker.rs
···
279
279
SubscribeReposMessage::Commit(commit) => {
280
280
trace!("processing buffered commit for {did}");
281
281
282
282
-
if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.as_str()) {
282
282
+
if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str())
283
283
+
{
283
284
debug!(
284
285
"skipping replayed event for {}: {} <= {}",
285
286
did,
286
287
commit.rev,
287
287
-
repo_state.rev.as_ref().expect("we checked in if")
288
288
+
repo_state
289
289
+
.rev
290
290
+
.as_ref()
291
291
+
.map(|r| r.to_tid())
292
292
+
.expect("we checked in if")
288
293
);
289
294
return Ok(ProcessResult::Ok);
290
295
}
291
296
292
297
if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data)
293
293
-
&& repo != &prev_commit.0
298
298
+
&& repo != &prev_commit.0.to_ipld().expect("valid cid")
294
299
{
295
300
warn!(
296
301
"gap detected for {}: repo {} != commit prev {}. triggering backfill",
···
329
334
match ops::verify_sync_event(sync.blocks.as_ref(), get_key()?) {
330
335
Ok((root, rev)) => {
331
336
if let Some(current_data) = &repo_state.data {
332
332
-
if current_data == &root {
337
337
+
if current_data == &root.to_ipld().expect("valid cid") {
333
338
debug!("skipping noop sync for {did}");
334
339
return Ok(ProcessResult::Ok);
335
340
}
336
341
}
337
342
338
343
if let Some(current_rev) = &repo_state.rev {
339
339
-
if rev.as_str() <= current_rev.as_str() {
344
344
+
if rev.as_str() <= current_rev.to_tid().as_str() {
340
345
debug!("skipping replayed sync for {did}");
341
346
return Ok(ProcessResult::Ok);
342
347
}
+1
-1
src/main.rs
···
1
1
+
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
1
2
use hydrant::config::{Config, SignatureVerification};
2
3
use hydrant::crawler::Crawler;
3
4
use hydrant::db::{self, set_firehose_cursor};
4
5
use hydrant::ingest::firehose::FirehoseIngestor;
5
6
use hydrant::state::AppState;
6
7
use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
7
7
-
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
8
8
use miette::IntoDiagnostic;
9
9
use mimalloc::MiMalloc;
10
10
use std::sync::Arc;
+9
-9
src/ops.rs
···
1
1
-
use crate::db::types::TrimmedDid;
1
1
+
use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2
2
use crate::db::{self, Db, keys, ser_repo_state};
3
3
use crate::state::AppState;
4
4
use crate::types::{
···
68
68
batch.remove(&db.resync, &repo_key);
69
69
70
70
// 2. delete from records (prefix: repo_key + SEP)
71
71
-
let mut records_prefix = repo_key.as_bytes().to_vec();
71
71
+
let mut records_prefix = repo_key.clone();
72
72
records_prefix.push(keys::SEP);
73
73
for guard in db.records.prefix(&records_prefix) {
74
74
let k = guard.key().into_diagnostic()?;
···
79
79
let mut count_prefix = Vec::new();
80
80
count_prefix.push(b'r');
81
81
count_prefix.push(keys::SEP);
82
82
-
count_prefix.extend_from_slice(TrimmedDid::from(did).as_bytes());
82
82
+
TrimmedDid::from(did).write_to_vec(&mut count_prefix);
83
83
count_prefix.push(keys::SEP);
84
84
85
85
for guard in db.counts.prefix(&count_prefix) {
···
210
210
trace!("signature verified for {did}");
211
211
}
212
212
213
213
-
repo_state.rev = Some(commit.rev.clone());
214
214
-
repo_state.data = Some(Cid::ipld(repo_commit.data));
213
213
+
repo_state.rev = Some(commit.rev.clone().into());
214
214
+
repo_state.data = Some(repo_commit.data);
215
215
repo_state.last_updated_at = chrono::Utc::now().timestamp();
216
216
217
217
batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
···
264
264
let evt = StoredEvent {
265
265
live: true,
266
266
did: TrimmedDid::from(did),
267
267
-
rev: CowStr::Borrowed(commit.rev.as_str()),
267
267
+
rev: DbTid::from(&commit.rev),
268
268
collection: CowStr::Borrowed(collection),
269
269
-
rkey: CowStr::Borrowed(rkey),
270
270
-
action: CowStr::Borrowed(op.action.as_str()),
271
271
-
cid: op.cid.as_ref().map(|c| c.0.clone()),
269
269
+
rkey: DbRkey::new(rkey),
270
270
+
action: DbAction::from(op.action.as_str()),
271
271
+
cid: op.cid.as_ref().map(|c| c.0.to_ipld().expect("valid cid")),
272
272
};
273
273
274
274
let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
+10
-17
src/types.rs
···
1
1
use std::fmt::Display;
2
2
3
3
-
use jacquard::{
4
4
-
CowStr, IntoStatic,
5
5
-
types::{cid::Cid, tid::Tid},
6
6
-
};
3
3
+
use jacquard::{CowStr, IntoStatic};
7
4
use jacquard_common::types::string::Did;
8
5
use serde::{Deserialize, Serialize};
9
6
use serde_json::Value;
10
7
use smol_str::SmolStr;
11
8
12
12
-
use crate::db::types::TrimmedDid;
9
9
+
use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
10
10
+
use jacquard::types::cid::IpldCid;
13
11
14
12
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15
13
pub enum RepoStatus {
···
40
38
#[serde(borrow)]
41
39
pub did: TrimmedDid<'i>,
42
40
pub status: RepoStatus,
43
43
-
pub rev: Option<Tid>,
44
44
-
#[serde(borrow)]
45
45
-
pub data: Option<Cid<'i>>,
41
41
+
pub rev: Option<DbTid>,
42
42
+
pub data: Option<IpldCid>,
46
43
pub last_seq: Option<i64>,
47
44
pub last_updated_at: i64, // unix timestamp
48
45
pub handle: Option<SmolStr>,
···
70
67
did: self.did.into_static(),
71
68
status: self.status,
72
69
rev: self.rev,
73
73
-
data: self.data.map(|c| c.into_static()),
70
70
+
data: self.data,
74
71
last_seq: self.last_seq,
75
72
last_updated_at: self.last_updated_at,
76
73
handle: self.handle,
···
167
164
pub live: bool,
168
165
#[serde(borrow)]
169
166
pub did: TrimmedDid<'i>,
170
170
-
#[serde(borrow)]
171
171
-
pub rev: CowStr<'i>,
167
167
+
pub rev: DbTid,
172
168
#[serde(borrow)]
173
169
pub collection: CowStr<'i>,
174
174
-
#[serde(borrow)]
175
175
-
pub rkey: CowStr<'i>,
176
176
-
#[serde(borrow)]
177
177
-
pub action: CowStr<'i>,
178
178
-
#[serde(borrow)]
170
170
+
pub rkey: DbRkey,
171
171
+
pub action: DbAction,
179
172
#[serde(default)]
180
173
#[serde(skip_serializing_if = "Option::is_none")]
181
181
-
pub cid: Option<Cid<'i>>,
174
174
+
pub cid: Option<IpldCid>,
182
175
}