// forked from ptr.pet/hydrant
// kind of like tap but different and in rust
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2use crate::db::{self, Db, keys, ser_repo_state};
3use crate::filter::FilterConfig;
4use crate::types::{
5 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState,
6 StoredEvent,
7};
8use fjall::OwnedWriteBatch;
9use jacquard::CowStr;
10use jacquard::IntoStatic;
11
12use jacquard::types::cid::Cid;
13use jacquard::types::did::Did;
14use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
15use jacquard_common::types::crypto::PublicKey;
16use jacquard_repo::car::reader::parse_car_bytes;
17use miette::{Context, IntoDiagnostic, Result};
18use rand::{Rng, rng};
19use std::collections::HashMap;
20use std::sync::atomic::Ordering;
21use std::time::Instant;
22use tracing::{debug, trace};
23
24pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> {
25 let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev));
26 let value = rmp_serde::to_vec(commit).into_diagnostic()?;
27 db.resync_buffer.insert(key, value).into_diagnostic()?;
28 debug!(
29 "buffered commit seq {} for {did} to resync_buffer",
30 commit.seq
31 );
32 Ok(())
33}
34
35pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
36 let prefix = keys::resync_buffer_prefix(did);
37 db.resync_buffer.prefix(&prefix).next().is_some()
38}
39
40// emitting identity is ephemeral
41// we dont replay these, consumers can just fetch identity themselves if they need it
42pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent {
43 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
44 let marshallable = MarshallableEvt {
45 id: event_id,
46 event_type: "identity".into(),
47 record: None,
48 identity: Some(evt),
49 account: None,
50 };
51 BroadcastEvent::Ephemeral(marshallable)
52}
53
54pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent {
55 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
56 let marshallable = MarshallableEvt {
57 id: event_id,
58 event_type: "account".into(),
59 record: None,
60 identity: None,
61 account: Some(evt),
62 };
63 BroadcastEvent::Ephemeral(marshallable)
64}
65
66pub fn delete_repo<'batch>(
67 batch: &'batch mut OwnedWriteBatch,
68 db: &Db,
69 did: &jacquard::types::did::Did,
70 repo_state: RepoState,
71) -> Result<()> {
72 debug!("deleting repo {did}");
73
74 let repo_key = keys::repo_key(did);
75 let pending_key = keys::pending_key(repo_state.index_id);
76
77 // 1. delete from repos, pending, resync
78 batch.remove(&db.repos, &repo_key);
79 match repo_state.status {
80 RepoStatus::Synced => {}
81 RepoStatus::Backfilling => {
82 batch.remove(&db.pending, &pending_key);
83 }
84 _ => {
85 batch.remove(&db.resync, &repo_key);
86 }
87 }
88
89 // 2. delete from resync buffer
90 let resync_prefix = keys::resync_buffer_prefix(did);
91 for guard in db.resync_buffer.prefix(&resync_prefix) {
92 let k = guard.key().into_diagnostic()?;
93 batch.remove(&db.resync_buffer, k);
94 }
95
96 // 3. delete from records
97 let records_prefix = keys::record_prefix_did(did);
98 for guard in db.records.prefix(&records_prefix) {
99 let k = guard.key().into_diagnostic()?;
100 batch.remove(&db.records, k);
101 }
102
103 // 4. reset collection counts
104 let mut count_prefix = Vec::new();
105 count_prefix.push(b'r');
106 count_prefix.push(keys::SEP);
107 TrimmedDid::from(did).write_to_vec(&mut count_prefix);
108 count_prefix.push(keys::SEP);
109
110 for guard in db.counts.prefix(&count_prefix) {
111 let k = guard.key().into_diagnostic()?;
112 batch.remove(&db.counts, k);
113 }
114
115 Ok(())
116}
117
118pub fn update_repo_status<'batch, 's>(
119 batch: &'batch mut OwnedWriteBatch,
120 db: &Db,
121 did: &jacquard::types::did::Did,
122 mut repo_state: RepoState<'s>,
123 new_status: RepoStatus,
124) -> Result<RepoState<'s>> {
125 debug!("updating repo status for {did} to {new_status:?}");
126
127 let repo_key = keys::repo_key(did);
128 let pending_key = keys::pending_key(repo_state.index_id);
129
130 // manage queues
131 match &new_status {
132 RepoStatus::Synced => {
133 batch.remove(&db.pending, &pending_key);
134 // we dont have to remove from resync here because it has to transition resync -> pending first
135 }
136 RepoStatus::Backfilling => {
137 // if we are coming from an error state, remove from resync
138 if !matches!(repo_state.status, RepoStatus::Synced) {
139 batch.remove(&db.resync, &repo_key);
140 }
141 // remove the old entry
142 batch.remove(&db.pending, &pending_key);
143 // add as new entry
144 repo_state.index_id = rng().next_u64();
145 batch.insert(
146 &db.pending,
147 keys::pending_key(repo_state.index_id),
148 &repo_key,
149 );
150 }
151 RepoStatus::Error(_msg) => {
152 batch.remove(&db.pending, &pending_key);
153 // TODO: we need to make errors have kind instead of "message" in repo status
154 // and then pass it to resync error kind
155 let resync_state = crate::types::ResyncState::Error {
156 kind: crate::types::ResyncErrorKind::Generic,
157 retry_count: 0,
158 next_retry: chrono::Utc::now().timestamp(),
159 };
160 batch.insert(
161 &db.resync,
162 &repo_key,
163 rmp_serde::to_vec(&resync_state).into_diagnostic()?,
164 );
165 }
166 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
167 // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
168 // batch.remove(&db.pending, &pending_key);
169 let resync_state = ResyncState::Gone {
170 status: new_status.clone(),
171 };
172 batch.insert(
173 &db.resync,
174 &repo_key,
175 rmp_serde::to_vec(&resync_state).into_diagnostic()?,
176 );
177 }
178 }
179
180 repo_state.status = new_status;
181 repo_state.last_updated_at = chrono::Utc::now().timestamp();
182
183 batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?);
184
185 Ok(repo_state)
186}
187
188pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> {
189 let parsed = tokio::task::block_in_place(|| {
190 tokio::runtime::Handle::current()
191 .block_on(parse_car_bytes(blocks))
192 .into_diagnostic()
193 })?;
194
195 let root_bytes = parsed
196 .blocks
197 .get(&parsed.root)
198 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
199
200 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
201
202 if let Some(key) = key {
203 repo_commit
204 .verify(key)
205 .map_err(|e| miette::miette!("signature verification failed: {e}"))?;
206 }
207
208 Ok((
209 Cid::ipld(repo_commit.data).into_static(),
210 repo_commit.rev.to_string(),
211 ))
212}
213
/// Outcome of [`apply_commit`]: the updated repo state plus counters the
/// caller can feed into stats.
pub struct ApplyCommitResults<'s> {
    /// Repo state after the commit (rev, data cid, last_updated_at refreshed).
    pub repo_state: RepoState<'s>,
    /// Net record-count change for filter-matching collections
    /// (creates minus deletes; updates do not move this).
    pub records_delta: i64,
    /// Number of blocks parsed out of the commit's CAR payload.
    pub blocks_count: i64,
}
219
/// Apply a firehose commit for `commit.repo` to the database via `batch`.
///
/// Steps, in order:
/// 1. parse the CAR blocks, optionally verify the commit signature against
///    `signing_key`, persist the updated `RepoState`, and store all blocks
///    in the CAS;
/// 2. for each op whose collection passes `filter`, update the records
///    index, accumulate record/collection count deltas, and append a
///    replayable `StoredEvent`;
/// then flush the accumulated per-collection count deltas.
///
/// Blocking: parses the CAR via `block_in_place`, so this must run on a
/// multi-threaded tokio runtime.
///
/// Errors on unparseable CAR data, a missing root block, signature
/// verification failure, an invalid op path/action, or serialization
/// failure. The batch is NOT committed here; the caller flushes it.
pub fn apply_commit<'batch, 'db, 'commit, 's>(
    batch: &'batch mut OwnedWriteBatch,
    db: &'db Db,
    mut repo_state: RepoState<'s>,
    commit: &'commit Commit<'commit>,
    signing_key: Option<&PublicKey>,
    filter: &FilterConfig,
) -> Result<ApplyCommitResults<'s>> {
    let did = &commit.repo;
    debug!("applying commit {} for {did}", &commit.commit);

    // 1. parse CAR blocks and store them in CAS
    let start = Instant::now();
    let parsed = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current()
            .block_on(parse_car_bytes(commit.blocks.as_ref()))
            .into_diagnostic()
    })?;

    trace!("parsed car for {did} in {:?}", start.elapsed());

    let root_bytes = parsed
        .blocks
        .get(&parsed.root)
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;

    // signature check is opt-in: callers pass None to skip (e.g. already trusted)
    if let Some(key) = signing_key {
        repo_commit
            .verify(key)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!("signature verified for {did}");
    }

    repo_state.rev = Some((&commit.rev).into());
    repo_state.data = Some(repo_commit.data);
    repo_state.last_updated_at = chrono::Utc::now().timestamp();

    batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);

    // store all blocks in the CAS
    for (cid, bytes) in &parsed.blocks {
        batch.insert(&db.blocks, cid.to_bytes(), bytes.to_vec());
    }

    // 2. iterate ops and update records index
    let mut records_delta = 0;
    let mut collection_deltas: HashMap<&str, i64> = HashMap::new();

    for op in &commit.ops {
        let (collection, rkey) = parse_path(&op.path)?;

        // ops outside the configured collections are dropped entirely:
        // no record row, no counts, no stored event
        if !filter.matches_collection(collection) {
            continue;
        }

        let rkey = DbRkey::new(rkey);
        let db_key = keys::record_key(did, collection, &rkey);

        // every surviving op consumes one event id, even if the create/update
        // is later skipped for a missing cid
        let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);

        let action = DbAction::try_from(op.action.as_str())?;
        match action {
            DbAction::Create | DbAction::Update => {
                // a create/update without a cid can't be indexed; skip it
                // (note: this also skips emitting the StoredEvent below)
                let Some(cid) = &op.cid else {
                    continue;
                };
                batch.insert(
                    &db.records,
                    db_key.clone(),
                    cid.to_ipld()
                        .into_diagnostic()
                        .wrap_err("expected valid cid from relay")?
                        .to_bytes(),
                );

                // accumulate counts
                if action == DbAction::Create {
                    records_delta += 1;
                    *collection_deltas.entry(collection).or_default() += 1;
                }
            }
            DbAction::Delete => {
                batch.remove(&db.records, db_key);

                // accumulate counts
                records_delta -= 1;
                *collection_deltas.entry(collection).or_default() -= 1;
            }
        }

        // append a replayable event keyed by the monotonic event id
        let evt = StoredEvent {
            live: true,
            did: TrimmedDid::from(did),
            rev: DbTid::from(&commit.rev),
            collection: CowStr::Borrowed(collection),
            rkey,
            action,
            // safe: create/update cids were validated above; delete ops carry None
            cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")),
        };

        let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
        batch.insert(&db.events, keys::event_key(event_id), bytes);
    }

    // update counts
    let blocks_count = parsed.blocks.len() as i64;
    for (col, delta) in collection_deltas {
        db::update_record_count(batch, db, did, col, delta)?;
    }

    Ok(ApplyCommitResults {
        repo_state,
        records_delta,
        blocks_count,
    })
}
338
339pub fn parse_path(path: &str) -> Result<(&str, &str)> {
340 let mut parts = path.splitn(2, '/');
341 let collection = parts.next().wrap_err("missing collection")?;
342 let rkey = parts.next().wrap_err("missing rkey")?;
343 Ok((collection, rkey))
344}