AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use fjall::OwnedWriteBatch;
2use jacquard_common::CowStr;
3use jacquard_common::IntoStatic;
4use jacquard_common::types::cid::Cid;
5use jacquard_common::types::crypto::PublicKey;
6use jacquard_common::types::did::Did;
7use jacquard_repo::car::reader::parse_car_bytes;
8use miette::{Context, IntoDiagnostic, Result};
9use rand::{Rng, rng};
10use std::collections::HashMap;
11use std::sync::atomic::Ordering;
12use std::time::Instant;
13use tracing::{debug, trace};
14
15use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
16use crate::db::{self, Db, keys, ser_repo_state};
17use crate::filter::FilterConfig;
18use crate::ingest::stream::Commit;
19use crate::types::{
20 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState,
21 StoredEvent,
22};
23
24pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> {
25 let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev));
26 let value = rmp_serde::to_vec(commit).into_diagnostic()?;
27 db.resync_buffer.insert(key, value).into_diagnostic()?;
28 debug!(
29 did = %did,
30 seq = commit.seq,
31 "buffered commit to resync_buffer"
32 );
33 Ok(())
34}
35
36pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
37 let prefix = keys::resync_buffer_prefix(did);
38 db.resync_buffer.prefix(&prefix).next().is_some()
39}
40
41// emitting identity is ephemeral
42// we dont replay these, consumers can just fetch identity themselves if they need it
43pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent {
44 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
45 let marshallable = MarshallableEvt {
46 id: event_id,
47 event_type: "identity".into(),
48 record: None,
49 identity: Some(evt),
50 account: None,
51 };
52 BroadcastEvent::Ephemeral(marshallable)
53}
54
55pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent {
56 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
57 let marshallable = MarshallableEvt {
58 id: event_id,
59 event_type: "account".into(),
60 record: None,
61 identity: None,
62 account: Some(evt),
63 };
64 BroadcastEvent::Ephemeral(marshallable)
65}
66
67pub fn delete_repo<'batch>(
68 batch: &'batch mut OwnedWriteBatch,
69 db: &Db,
70 did: &Did,
71 repo_state: &RepoState,
72) -> Result<()> {
73 debug!(did = %did, "deleting repo");
74
75 let repo_key = keys::repo_key(did);
76 let pending_key = keys::pending_key(repo_state.index_id);
77
78 // 1. delete from repos, pending, resync
79 batch.remove(&db.repos, &repo_key);
80 match repo_state.status {
81 RepoStatus::Synced => {}
82 RepoStatus::Backfilling => {
83 batch.remove(&db.pending, &pending_key);
84 }
85 _ => {
86 batch.remove(&db.resync, &repo_key);
87 }
88 }
89
90 // 2. delete from resync buffer
91 let resync_prefix = keys::resync_buffer_prefix(did);
92 for guard in db.resync_buffer.prefix(&resync_prefix) {
93 let k = guard.key().into_diagnostic()?;
94 batch.remove(&db.resync_buffer, k);
95 }
96
97 // 3. delete from records
98 let records_prefix = keys::record_prefix_did(did);
99 for guard in db.records.prefix(&records_prefix) {
100 let k = guard.key().into_diagnostic()?;
101 batch.remove(&db.records, k);
102 }
103
104 // 4. reset collection counts
105 let mut count_prefix = Vec::new();
106 count_prefix.push(b'r');
107 count_prefix.push(keys::SEP);
108 TrimmedDid::from(did).write_to_vec(&mut count_prefix);
109 count_prefix.push(keys::SEP);
110
111 for guard in db.counts.prefix(&count_prefix) {
112 let k = guard.key().into_diagnostic()?;
113 batch.remove(&db.counts, k);
114 }
115
116 Ok(())
117}
118
119pub fn update_repo_status<'batch, 's>(
120 batch: &'batch mut OwnedWriteBatch,
121 db: &Db,
122 did: &Did,
123 mut repo_state: RepoState<'s>,
124 new_status: RepoStatus,
125) -> Result<RepoState<'s>> {
126 debug!(did = %did, status = ?new_status, "updating repo status");
127
128 let repo_key = keys::repo_key(did);
129 let pending_key = keys::pending_key(repo_state.index_id);
130
131 // manage queues
132 match &new_status {
133 RepoStatus::Synced => {
134 batch.remove(&db.pending, &pending_key);
135 // we dont have to remove from resync here because it has to transition resync -> pending first
136 }
137 RepoStatus::Backfilling => {
138 // if we are coming from an error state, remove from resync
139 if !matches!(repo_state.status, RepoStatus::Synced) {
140 batch.remove(&db.resync, &repo_key);
141 }
142 // remove the old entry
143 batch.remove(&db.pending, &pending_key);
144 // add as new entry
145 repo_state.index_id = rng().next_u64();
146 batch.insert(
147 &db.pending,
148 keys::pending_key(repo_state.index_id),
149 &repo_key,
150 );
151 }
152 RepoStatus::Error(_msg) => {
153 batch.remove(&db.pending, &pending_key);
154 // TODO: we need to make errors have kind instead of "message" in repo status
155 // and then pass it to resync error kind
156 let resync_state = crate::types::ResyncState::Error {
157 kind: crate::types::ResyncErrorKind::Generic,
158 retry_count: 0,
159 next_retry: chrono::Utc::now().timestamp(),
160 };
161 batch.insert(
162 &db.resync,
163 &repo_key,
164 rmp_serde::to_vec(&resync_state).into_diagnostic()?,
165 );
166 }
167 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
168 // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
169 // batch.remove(&db.pending, &pending_key);
170 let resync_state = ResyncState::Gone {
171 status: new_status.clone(),
172 };
173 batch.insert(
174 &db.resync,
175 &repo_key,
176 rmp_serde::to_vec(&resync_state).into_diagnostic()?,
177 );
178 }
179 }
180
181 repo_state.status = new_status;
182 repo_state.last_updated_at = chrono::Utc::now().timestamp();
183
184 batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?);
185
186 Ok(repo_state)
187}
188
189pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> {
190 let parsed = tokio::task::block_in_place(|| {
191 tokio::runtime::Handle::current()
192 .block_on(parse_car_bytes(blocks))
193 .into_diagnostic()
194 })?;
195
196 let root_bytes = parsed
197 .blocks
198 .get(&parsed.root)
199 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
200
201 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
202
203 if let Some(key) = key {
204 repo_commit
205 .verify(key)
206 .map_err(|e| miette::miette!("signature verification failed: {e}"))?;
207 }
208
209 Ok((
210 Cid::ipld(repo_commit.data).into_static(),
211 repo_commit.rev.to_string(),
212 ))
213}
214
/// Summary of a successfully applied commit, returned by `apply_commit`.
pub struct ApplyCommitResults<'s> {
    // updated repo state (rev/data/last_updated_at refreshed); the same state
    // has already been staged on the write batch
    pub repo_state: RepoState<'s>,
    // net change in record count for this commit: +1 per create, -1 per delete
    pub records_delta: i64,
    // number of record blocks written to the block store from the CAR
    pub blocks_count: i64,
}
220
221pub fn apply_commit<'batch, 'db, 'commit, 's>(
222 batch: &'batch mut OwnedWriteBatch,
223 db: &'db Db,
224 mut repo_state: RepoState<'s>,
225 commit: &'commit Commit<'commit>,
226 signing_key: Option<&PublicKey>,
227 filter: &FilterConfig,
228) -> Result<ApplyCommitResults<'s>> {
229 let did = &commit.repo;
230 debug!(did = %did, commit = %commit.commit, "applying commit");
231
232 // 1. parse CAR blocks and store them in CAS
233 let start = Instant::now();
234 let parsed = tokio::task::block_in_place(|| {
235 tokio::runtime::Handle::current()
236 .block_on(parse_car_bytes(commit.blocks.as_ref()))
237 .into_diagnostic()
238 })?;
239
240 trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");
241
242 let root_bytes = parsed
243 .blocks
244 .get(&parsed.root)
245 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
246
247 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
248
249 if let Some(key) = signing_key {
250 repo_commit
251 .verify(key)
252 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
253 trace!(did = %did, "signature verified");
254 }
255
256 repo_state.rev = Some((&commit.rev).into());
257 repo_state.data = Some(repo_commit.data);
258 repo_state.last_updated_at = chrono::Utc::now().timestamp();
259
260 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
261
262 // 2. iterate ops and update records index
263 let mut records_delta = 0;
264 let mut blocks_count = 0;
265 let mut collection_deltas: HashMap<&str, i64> = HashMap::new();
266
267 for op in &commit.ops {
268 let (collection, rkey) = parse_path(&op.path)?;
269
270 if !filter.matches_collection(collection) {
271 continue;
272 }
273
274 let rkey = DbRkey::new(rkey);
275 let db_key = keys::record_key(did, collection, &rkey);
276
277 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
278
279 let action = DbAction::try_from(op.action.as_str())?;
280 match action {
281 DbAction::Create | DbAction::Update => {
282 let Some(cid) = &op.cid else {
283 continue;
284 };
285 let cid_ipld = cid
286 .to_ipld()
287 .into_diagnostic()
288 .wrap_err("expected valid cid from relay")?;
289
290 if let Some(bytes) = parsed.blocks.get(&cid_ipld) {
291 batch.insert(&db.blocks, cid_ipld.to_bytes(), bytes.to_vec());
292 blocks_count += 1;
293 }
294
295 batch.insert(&db.records, db_key.clone(), cid_ipld.to_bytes());
296
297 // accumulate counts
298 if action == DbAction::Create {
299 records_delta += 1;
300 *collection_deltas.entry(collection).or_default() += 1;
301 }
302 }
303 DbAction::Delete => {
304 batch.remove(&db.records, db_key);
305
306 // accumulate counts
307 records_delta -= 1;
308 *collection_deltas.entry(collection).or_default() -= 1;
309 }
310 }
311
312 let evt = StoredEvent {
313 live: true,
314 did: TrimmedDid::from(did),
315 rev: DbTid::from(&commit.rev),
316 collection: CowStr::Borrowed(collection),
317 rkey,
318 action,
319 cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")),
320 };
321
322 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
323 batch.insert(&db.events, keys::event_key(event_id), bytes);
324 }
325
326 // update counts
327 for (col, delta) in collection_deltas {
328 db::update_record_count(batch, db, did, col, delta)?;
329 }
330
331 Ok(ApplyCommitResults {
332 repo_state,
333 records_delta,
334 blocks_count,
335 })
336}
337
338pub fn parse_path(path: &str) -> Result<(&str, &str)> {
339 let mut parts = path.splitn(2, '/');
340 let collection = parts.next().wrap_err("missing collection")?;
341 let rkey = parts.next().wrap_err("missing rkey")?;
342 Ok((collection, rkey))
343}