AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::{keys, Db};
2use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredEvent};
3use jacquard::api::com_atproto::sync::subscribe_repos::Commit;
4use jacquard::cowstr::ToCowStr;
5use jacquard_repo::car::reader::parse_car_bytes;
6use miette::{IntoDiagnostic, Result};
7use smol_str::{SmolStr, ToSmolStr};
8use std::collections::HashMap;
9use std::sync::atomic::Ordering;
10use std::time::Instant;
11use tracing::{debug, trace};
12
13// emitting identity is ephemeral
14// we dont replay these, consumers can just fetch identity themselves if they need it
15pub fn emit_identity_event(db: &Db, evt: IdentityEvt) {
16 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
17 let marshallable = MarshallableEvt {
18 id: event_id,
19 event_type: "identity".into(),
20 record: None,
21 identity: Some(evt),
22 account: None,
23 };
24 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable));
25}
26
27pub fn emit_account_event(db: &Db, evt: AccountEvt) {
28 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
29 let marshallable = MarshallableEvt {
30 id: event_id,
31 event_type: "account".into(),
32 record: None,
33 identity: None,
34 account: Some(evt),
35 };
36 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable));
37}
38
39pub fn delete_repo(db: &Db, did: &jacquard::types::did::Did) -> Result<()> {
40 debug!("deleting repo {did}");
41 let mut batch = db.inner.batch();
42 let repo_key = keys::repo_key(did);
43
44 // 1. delete from repos, pending, resync
45 batch.remove(&db.repos, repo_key);
46 batch.remove(&db.pending, repo_key);
47 batch.remove(&db.resync, repo_key);
48
49 // 2. delete from buffer (prefix: repo_key + SEP)
50 let mut buffer_prefix = repo_key.to_vec();
51 buffer_prefix.push(keys::SEP);
52 for guard in db.buffer.prefix(&buffer_prefix) {
53 let k = guard.key().into_diagnostic()?;
54 batch.remove(&db.buffer, k);
55 }
56
57 // 3. delete from records (prefix: repo_key + SEP)
58 let mut records_prefix = repo_key.to_vec();
59 records_prefix.push(keys::SEP);
60 let mut deleted_count = 0;
61
62 for guard in db.records.prefix(&records_prefix) {
63 let k = guard.key().into_diagnostic()?;
64 batch.remove(&db.records, k);
65 deleted_count += 1;
66 }
67
68 // 4. reset collection counts
69 let mut count_prefix = Vec::new();
70 count_prefix.push(b'r');
71 count_prefix.push(keys::SEP);
72 count_prefix.extend_from_slice(keys::did_prefix(did).as_bytes());
73 count_prefix.push(keys::SEP);
74
75 for guard in db.counts.prefix(&count_prefix) {
76 let k = guard.key().into_diagnostic()?;
77 batch.remove(&db.counts, k);
78 }
79
80 batch.commit().into_diagnostic()?;
81
82 // update global record count
83 if deleted_count > 0 {
84 tokio::spawn(db.increment_count(keys::count_keyspace_key("records"), -deleted_count));
85 }
86
87 Ok(())
88}
89
90pub fn update_repo_status(
91 db: &Db,
92 did: &jacquard::types::did::Did,
93 status: crate::types::RepoStatus,
94) -> Result<()> {
95 debug!("updating repo status for {did} to {status:?}");
96 let (updated, batch) =
97 Db::update_repo_state(db.inner.batch(), &db.repos, did, |state, _val| {
98 state.status = status.clone();
99 state.last_updated_at = chrono::Utc::now().timestamp();
100 Ok((true, ()))
101 })?;
102
103 if updated.is_some() {
104 batch.commit().into_diagnostic()?;
105 }
106 Ok(())
107}
108
109pub fn apply_commit(db: &Db, commit: &Commit<'_>, live: bool) -> Result<()> {
110 let did = &commit.repo;
111 debug!("applying commit {} for {did}", &commit.commit);
112
113 // 1. parse CAR blocks and store them in CAS
114 let start = Instant::now();
115 let parsed = tokio::task::block_in_place(|| {
116 tokio::runtime::Handle::current()
117 .block_on(parse_car_bytes(commit.blocks.as_ref()))
118 .into_diagnostic()
119 })?;
120
121 trace!("parsed car for {did} in {:?}", start.elapsed());
122
123 let (_, mut batch) = Db::update_repo_state(db.inner.batch(), &db.repos, did, |state, _| {
124 state.rev = commit.rev.as_str().into();
125 state.data = parsed.root.to_smolstr();
126 state.last_updated_at = chrono::Utc::now().timestamp();
127 Ok((true, ()))
128 })?;
129
130 // store all blocks in the CAS
131 for (cid, bytes) in &parsed.blocks {
132 batch.insert(
133 &db.blocks,
134 keys::block_key(&cid.to_cowstr()),
135 bytes.to_vec(),
136 );
137 }
138
139 // 2. iterate ops and update records index
140 let mut records_delta = 0;
141 let mut events_count = 0;
142 let mut collection_deltas: HashMap<SmolStr, i64> = HashMap::new();
143
144 for op in &commit.ops {
145 let parts: Vec<&str> = op.path.splitn(2, '/').collect();
146 if parts.len() != 2 {
147 continue;
148 }
149 let collection = parts[0];
150 let rkey = parts[1];
151
152 let db_key = keys::record_key(did, collection, rkey);
153
154 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
155
156 let mut cid_str = None;
157
158 match op.action.as_str() {
159 "create" | "update" => {
160 let Some(cid) = &op.cid else {
161 continue;
162 };
163 let s = smol_str::SmolStr::from(cid.as_str());
164 batch.insert(&db.records, db_key, s.as_bytes().to_vec());
165 cid_str = Some(s);
166
167 // accumulate counts
168 if op.action.as_str() == "create" {
169 records_delta += 1;
170 *collection_deltas
171 .entry(collection.to_smolstr())
172 .or_default() += 1;
173 }
174 }
175 "delete" => {
176 batch.remove(&db.records, db_key);
177
178 // accumulate counts
179 records_delta -= 1;
180 *collection_deltas
181 .entry(collection.to_smolstr())
182 .or_default() -= 1;
183 }
184 _ => {}
185 }
186
187 let evt = StoredEvent::Record {
188 live,
189 did: did.as_str().into(),
190 rev: commit.rev.as_str().into(),
191 collection: collection.into(),
192 rkey: rkey.into(),
193 action: op.action.as_str().into(),
194 cid: cid_str,
195 };
196
197 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
198 batch.insert(&db.events, keys::event_key(event_id as i64), bytes);
199 events_count += 1;
200 }
201
202 let start = Instant::now();
203
204 batch.commit().into_diagnostic()?;
205 trace!("committed sync batch for {did} in {:?}", start.elapsed());
206
207 let blocks_count = parsed.blocks.len() as i64;
208 tokio::spawn({
209 let blocks_fut = (blocks_count > 0)
210 .then(|| db.increment_count(keys::count_keyspace_key("blocks"), blocks_count));
211 let records_fut = (records_delta != 0)
212 .then(|| db.increment_count(keys::count_keyspace_key("records"), records_delta));
213 let events_fut = (events_count > 0)
214 .then(|| db.increment_count(keys::count_keyspace_key("events"), events_count));
215 let collections_fut = collection_deltas
216 .into_iter()
217 .map(|(col, delta)| db.increment_count(keys::count_collection_key(&did, &col), delta))
218 .collect::<Vec<_>>();
219 futures::future::join_all(
220 blocks_fut
221 .into_iter()
222 .chain(records_fut)
223 .chain(events_fut)
224 .chain(collections_fut),
225 )
226 });
227
228 let _ = db.event_tx.send(BroadcastEvent::Persisted(
229 db.next_event_id.load(Ordering::SeqCst) - 1,
230 ));
231
232 Ok(())
233}