AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2use crate::db::{self, Db, keys, ser_repo_state};
3use crate::types::{
4 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState,
5 StoredEvent,
6};
7use fjall::OwnedWriteBatch;
8use jacquard::CowStr;
9use jacquard::IntoStatic;
10
11use jacquard::types::cid::Cid;
12use jacquard::types::did::Did;
13use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
14use jacquard_common::types::crypto::PublicKey;
15use jacquard_repo::car::reader::parse_car_bytes;
16use miette::{Context, IntoDiagnostic, Result};
17use std::collections::HashMap;
18use std::sync::atomic::Ordering;
19use std::time::Instant;
20use tracing::{debug, trace};
21
22pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> {
23 let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev));
24 let value = rmp_serde::to_vec(commit).into_diagnostic()?;
25 db.resync_buffer.insert(key, value).into_diagnostic()?;
26 debug!(
27 "buffered commit seq {} for {did} to resync_buffer",
28 commit.seq
29 );
30 Ok(())
31}
32
33pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
34 let prefix = keys::resync_buffer_prefix(did);
35 db.resync_buffer.prefix(&prefix).next().is_some()
36}
37
38// emitting identity is ephemeral
39// we dont replay these, consumers can just fetch identity themselves if they need it
40pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent {
41 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
42 let marshallable = MarshallableEvt {
43 id: event_id,
44 event_type: "identity".into(),
45 record: None,
46 identity: Some(evt),
47 account: None,
48 };
49 BroadcastEvent::Ephemeral(marshallable)
50}
51
52pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent {
53 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
54 let marshallable = MarshallableEvt {
55 id: event_id,
56 event_type: "account".into(),
57 record: None,
58 identity: None,
59 account: Some(evt),
60 };
61 BroadcastEvent::Ephemeral(marshallable)
62}
63
64pub fn delete_repo<'batch>(
65 batch: &'batch mut OwnedWriteBatch,
66 db: &Db,
67 did: &jacquard::types::did::Did,
68) -> Result<()> {
69 debug!("deleting repo {did}");
70 let repo_key = keys::repo_key(did);
71
72 // 1. delete from repos, pending, resync
73 batch.remove(&db.repos, &repo_key);
74 batch.remove(&db.pending, &repo_key);
75 batch.remove(&db.resync, &repo_key);
76
77 let resync_prefix = keys::resync_buffer_prefix(did);
78 for guard in db.resync_buffer.prefix(&resync_prefix) {
79 let k = guard.key().into_diagnostic()?;
80 batch.remove(&db.resync_buffer, k);
81 }
82
83 // 2. delete from records (all partitions)
84 let mut partitions = Vec::new();
85 db.record_partitions.iter_sync(|_, v| {
86 partitions.push(v.clone());
87 true
88 });
89
90 let records_prefix = keys::record_prefix(did);
91 for ks in partitions {
92 for guard in ks.prefix(&records_prefix) {
93 let k = guard.key().into_diagnostic()?;
94 batch.remove(&ks, k);
95 }
96 }
97
98 // 3. reset collection counts
99 let mut count_prefix = Vec::new();
100 count_prefix.push(b'r');
101 count_prefix.push(keys::SEP);
102 TrimmedDid::from(did).write_to_vec(&mut count_prefix);
103 count_prefix.push(keys::SEP);
104
105 for guard in db.counts.prefix(&count_prefix) {
106 let k = guard.key().into_diagnostic()?;
107 batch.remove(&db.counts, k);
108 }
109
110 Ok(())
111}
112
113pub fn update_repo_status<'batch, 's>(
114 batch: &'batch mut OwnedWriteBatch,
115 db: &Db,
116 did: &jacquard::types::did::Did,
117 mut repo_state: RepoState<'s>,
118 new_status: RepoStatus,
119) -> Result<RepoState<'s>> {
120 debug!("updating repo status for {did} to {new_status:?}");
121
122 let key = keys::repo_key(did);
123
124 // manage queues
125 match &new_status {
126 RepoStatus::Synced => {
127 batch.remove(&db.pending, &key);
128 batch.remove(&db.resync, &key);
129 }
130 RepoStatus::Backfilling => {
131 batch.insert(&db.pending, &key, &[]);
132 batch.remove(&db.resync, &key);
133 }
134 RepoStatus::Error(msg) => {
135 batch.remove(&db.pending, &key);
136 let resync_state = ResyncState::Error {
137 message: msg.clone(),
138 retry_count: 0,
139 next_retry: chrono::Utc::now().timestamp(),
140 };
141 batch.insert(
142 &db.resync,
143 &key,
144 rmp_serde::to_vec(&resync_state).into_diagnostic()?,
145 );
146 }
147 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
148 batch.remove(&db.pending, &key);
149 let resync_state = ResyncState::Gone {
150 status: new_status.clone(),
151 };
152 batch.insert(
153 &db.resync,
154 &key,
155 rmp_serde::to_vec(&resync_state).into_diagnostic()?,
156 );
157 }
158 }
159
160 repo_state.status = new_status;
161 repo_state.last_updated_at = chrono::Utc::now().timestamp();
162
163 batch.insert(&db.repos, &key, ser_repo_state(&repo_state)?);
164
165 Ok(repo_state)
166}
167
168pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> {
169 let parsed = tokio::task::block_in_place(|| {
170 tokio::runtime::Handle::current()
171 .block_on(parse_car_bytes(blocks))
172 .into_diagnostic()
173 })?;
174
175 let root_bytes = parsed
176 .blocks
177 .get(&parsed.root)
178 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
179
180 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
181
182 if let Some(key) = key {
183 repo_commit
184 .verify(key)
185 .map_err(|e| miette::miette!("signature verification failed: {e}"))?;
186 }
187
188 Ok((
189 Cid::ipld(repo_commit.data).into_static(),
190 repo_commit.rev.to_string(),
191 ))
192}
193
/// Summary returned by `apply_commit` so the caller can update aggregate
/// counters after the write batch commits.
pub struct ApplyCommitResults<'s> {
    /// Repo state with the commit's rev and data root applied.
    pub repo_state: RepoState<'s>,
    /// Net record-count change from this commit (creates minus deletes).
    pub records_delta: i64,
    /// Number of blocks parsed from the commit's CAR and written to the CAS.
    pub blocks_count: i64,
}
199
200pub fn apply_commit<'batch, 'db, 'commit, 's>(
201 batch: &'batch mut OwnedWriteBatch,
202 db: &'db Db,
203 mut repo_state: RepoState<'s>,
204 commit: &'commit Commit<'commit>,
205 signing_key: Option<&PublicKey>,
206) -> Result<ApplyCommitResults<'s>> {
207 let did = &commit.repo;
208 debug!("applying commit {} for {did}", &commit.commit);
209
210 // 1. parse CAR blocks and store them in CAS
211 let start = Instant::now();
212 let parsed = tokio::task::block_in_place(|| {
213 tokio::runtime::Handle::current()
214 .block_on(parse_car_bytes(commit.blocks.as_ref()))
215 .into_diagnostic()
216 })?;
217
218 trace!("parsed car for {did} in {:?}", start.elapsed());
219
220 let root_bytes = parsed
221 .blocks
222 .get(&parsed.root)
223 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
224
225 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
226
227 if let Some(key) = signing_key {
228 repo_commit
229 .verify(key)
230 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
231 trace!("signature verified for {did}");
232 }
233
234 repo_state.rev = Some((&commit.rev).into());
235 repo_state.data = Some(repo_commit.data);
236 repo_state.last_updated_at = chrono::Utc::now().timestamp();
237
238 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
239
240 // store all blocks in the CAS
241 for (cid, bytes) in &parsed.blocks {
242 batch.insert(&db.blocks, cid.to_bytes(), bytes.to_vec());
243 }
244
245 // 2. iterate ops and update records index
246 let mut records_delta = 0;
247 let mut collection_deltas: HashMap<&str, i64> = HashMap::new();
248
249 for op in &commit.ops {
250 let (collection, rkey) = parse_path(&op.path)?;
251 let rkey = DbRkey::new(rkey);
252 let partition = db.record_partition(collection)?;
253 let db_key = keys::record_key(did, &rkey);
254
255 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
256
257 let action = DbAction::try_from(op.action.as_str())?;
258 match action {
259 DbAction::Create | DbAction::Update => {
260 let Some(cid) = &op.cid else {
261 continue;
262 };
263 batch.insert(
264 &partition,
265 db_key.clone(),
266 cid.to_ipld()
267 .into_diagnostic()
268 .wrap_err("expected valid cid from relay")?
269 .to_bytes(),
270 );
271
272 // accumulate counts
273 if action == DbAction::Create {
274 records_delta += 1;
275 *collection_deltas.entry(collection).or_default() += 1;
276 }
277 }
278 DbAction::Delete => {
279 batch.remove(&partition, db_key);
280
281 // accumulate counts
282 records_delta -= 1;
283 *collection_deltas.entry(collection).or_default() -= 1;
284 }
285 }
286
287 let evt = StoredEvent {
288 live: true,
289 did: TrimmedDid::from(did),
290 rev: DbTid::from(&commit.rev),
291 collection: CowStr::Borrowed(collection),
292 rkey,
293 action,
294 cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")),
295 };
296
297 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
298 batch.insert(&db.events, keys::event_key(event_id), bytes);
299 }
300
301 // update counts
302 let blocks_count = parsed.blocks.len() as i64;
303 for (col, delta) in collection_deltas {
304 db::update_record_count(batch, db, did, col, delta)?;
305 }
306
307 Ok(ApplyCommitResults {
308 repo_state,
309 records_delta,
310 blocks_count,
311 })
312}
313
314pub fn parse_path(path: &str) -> Result<(&str, &str)> {
315 let mut parts = path.splitn(2, '/');
316 let collection = parts.next().wrap_err("missing collection")?;
317 let rkey = parts.next().wrap_err("missing rkey")?;
318 Ok((collection, rkey))
319}