AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 6ad49bbce1f59c8a3d3f76d025d87b428e5057be 319 lines 10 kB view raw
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2use crate::db::{self, Db, keys, ser_repo_state}; 3use crate::types::{ 4 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 5 StoredEvent, 6}; 7use fjall::OwnedWriteBatch; 8use jacquard::CowStr; 9use jacquard::IntoStatic; 10 11use jacquard::types::cid::Cid; 12use jacquard::types::did::Did; 13use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14use jacquard_common::types::crypto::PublicKey; 15use jacquard_repo::car::reader::parse_car_bytes; 16use miette::{Context, IntoDiagnostic, Result}; 17use std::collections::HashMap; 18use std::sync::atomic::Ordering; 19use std::time::Instant; 20use tracing::{debug, trace}; 21 22pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> { 23 let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev)); 24 let value = rmp_serde::to_vec(commit).into_diagnostic()?; 25 db.resync_buffer.insert(key, value).into_diagnostic()?; 26 debug!( 27 "buffered commit seq {} for {did} to resync_buffer", 28 commit.seq 29 ); 30 Ok(()) 31} 32 33pub fn has_buffered_commits(db: &Db, did: &Did) -> bool { 34 let prefix = keys::resync_buffer_prefix(did); 35 db.resync_buffer.prefix(&prefix).next().is_some() 36} 37 38// emitting identity is ephemeral 39// we dont replay these, consumers can just fetch identity themselves if they need it 40pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent { 41 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 42 let marshallable = MarshallableEvt { 43 id: event_id, 44 event_type: "identity".into(), 45 record: None, 46 identity: Some(evt), 47 account: None, 48 }; 49 BroadcastEvent::Ephemeral(marshallable) 50} 51 52pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent { 53 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 54 let marshallable = MarshallableEvt { 55 id: event_id, 56 event_type: 
"account".into(), 57 record: None, 58 identity: None, 59 account: Some(evt), 60 }; 61 BroadcastEvent::Ephemeral(marshallable) 62} 63 64pub fn delete_repo<'batch>( 65 batch: &'batch mut OwnedWriteBatch, 66 db: &Db, 67 did: &jacquard::types::did::Did, 68) -> Result<()> { 69 debug!("deleting repo {did}"); 70 let repo_key = keys::repo_key(did); 71 72 // 1. delete from repos, pending, resync 73 batch.remove(&db.repos, &repo_key); 74 batch.remove(&db.pending, &repo_key); 75 batch.remove(&db.resync, &repo_key); 76 77 let resync_prefix = keys::resync_buffer_prefix(did); 78 for guard in db.resync_buffer.prefix(&resync_prefix) { 79 let k = guard.key().into_diagnostic()?; 80 batch.remove(&db.resync_buffer, k); 81 } 82 83 // 2. delete from records (all partitions) 84 let mut partitions = Vec::new(); 85 db.record_partitions.iter_sync(|_, v| { 86 partitions.push(v.clone()); 87 true 88 }); 89 90 let records_prefix = keys::record_prefix(did); 91 for ks in partitions { 92 for guard in ks.prefix(&records_prefix) { 93 let k = guard.key().into_diagnostic()?; 94 batch.remove(&ks, k); 95 } 96 } 97 98 // 3. 
reset collection counts 99 let mut count_prefix = Vec::new(); 100 count_prefix.push(b'r'); 101 count_prefix.push(keys::SEP); 102 TrimmedDid::from(did).write_to_vec(&mut count_prefix); 103 count_prefix.push(keys::SEP); 104 105 for guard in db.counts.prefix(&count_prefix) { 106 let k = guard.key().into_diagnostic()?; 107 batch.remove(&db.counts, k); 108 } 109 110 Ok(()) 111} 112 113pub fn update_repo_status<'batch, 's>( 114 batch: &'batch mut OwnedWriteBatch, 115 db: &Db, 116 did: &jacquard::types::did::Did, 117 mut repo_state: RepoState<'s>, 118 new_status: RepoStatus, 119) -> Result<RepoState<'s>> { 120 debug!("updating repo status for {did} to {new_status:?}"); 121 122 let key = keys::repo_key(did); 123 124 // manage queues 125 match &new_status { 126 RepoStatus::Synced => { 127 batch.remove(&db.pending, &key); 128 batch.remove(&db.resync, &key); 129 } 130 RepoStatus::Backfilling => { 131 batch.insert(&db.pending, &key, &[]); 132 batch.remove(&db.resync, &key); 133 } 134 RepoStatus::Error(msg) => { 135 batch.remove(&db.pending, &key); 136 let resync_state = ResyncState::Error { 137 message: msg.clone(), 138 retry_count: 0, 139 next_retry: chrono::Utc::now().timestamp(), 140 }; 141 batch.insert( 142 &db.resync, 143 &key, 144 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 145 ); 146 } 147 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => { 148 batch.remove(&db.pending, &key); 149 let resync_state = ResyncState::Gone { 150 status: new_status.clone(), 151 }; 152 batch.insert( 153 &db.resync, 154 &key, 155 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 156 ); 157 } 158 } 159 160 repo_state.status = new_status; 161 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 162 163 batch.insert(&db.repos, &key, ser_repo_state(&repo_state)?); 164 165 Ok(repo_state) 166} 167 168pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> { 169 let parsed = tokio::task::block_in_place(|| { 170 
tokio::runtime::Handle::current() 171 .block_on(parse_car_bytes(blocks)) 172 .into_diagnostic() 173 })?; 174 175 let root_bytes = parsed 176 .blocks 177 .get(&parsed.root) 178 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 179 180 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 181 182 if let Some(key) = key { 183 repo_commit 184 .verify(key) 185 .map_err(|e| miette::miette!("signature verification failed: {e}"))?; 186 } 187 188 Ok(( 189 Cid::ipld(repo_commit.data).into_static(), 190 repo_commit.rev.to_string(), 191 )) 192} 193 194pub struct ApplyCommitResults<'s> { 195 pub repo_state: RepoState<'s>, 196 pub records_delta: i64, 197 pub blocks_count: i64, 198} 199 200pub fn apply_commit<'batch, 'db, 'commit, 's>( 201 batch: &'batch mut OwnedWriteBatch, 202 db: &'db Db, 203 mut repo_state: RepoState<'s>, 204 commit: &'commit Commit<'commit>, 205 signing_key: Option<&PublicKey>, 206) -> Result<ApplyCommitResults<'s>> { 207 let did = &commit.repo; 208 debug!("applying commit {} for {did}", &commit.commit); 209 210 // 1. 
parse CAR blocks and store them in CAS 211 let start = Instant::now(); 212 let parsed = tokio::task::block_in_place(|| { 213 tokio::runtime::Handle::current() 214 .block_on(parse_car_bytes(commit.blocks.as_ref())) 215 .into_diagnostic() 216 })?; 217 218 trace!("parsed car for {did} in {:?}", start.elapsed()); 219 220 let root_bytes = parsed 221 .blocks 222 .get(&parsed.root) 223 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 224 225 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 226 227 if let Some(key) = signing_key { 228 repo_commit 229 .verify(key) 230 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 231 trace!("signature verified for {did}"); 232 } 233 234 repo_state.rev = Some((&commit.rev).into()); 235 repo_state.data = Some(repo_commit.data); 236 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 237 238 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?); 239 240 // store all blocks in the CAS 241 for (cid, bytes) in &parsed.blocks { 242 batch.insert(&db.blocks, cid.to_bytes(), bytes.to_vec()); 243 } 244 245 // 2. iterate ops and update records index 246 let mut records_delta = 0; 247 let mut collection_deltas: HashMap<&str, i64> = HashMap::new(); 248 249 for op in &commit.ops { 250 let (collection, rkey) = parse_path(&op.path)?; 251 let rkey = DbRkey::new(rkey); 252 let partition = db.record_partition(collection)?; 253 let db_key = keys::record_key(did, &rkey); 254 255 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 256 257 let action = DbAction::try_from(op.action.as_str())?; 258 match action { 259 DbAction::Create | DbAction::Update => { 260 let Some(cid) = &op.cid else { 261 continue; 262 }; 263 batch.insert( 264 &partition, 265 db_key.clone(), 266 cid.to_ipld() 267 .into_diagnostic() 268 .wrap_err("expected valid cid from relay")? 
269 .to_bytes(), 270 ); 271 272 // accumulate counts 273 if action == DbAction::Create { 274 records_delta += 1; 275 *collection_deltas.entry(collection).or_default() += 1; 276 } 277 } 278 DbAction::Delete => { 279 batch.remove(&partition, db_key); 280 281 // accumulate counts 282 records_delta -= 1; 283 *collection_deltas.entry(collection).or_default() -= 1; 284 } 285 } 286 287 let evt = StoredEvent { 288 live: true, 289 did: TrimmedDid::from(did), 290 rev: DbTid::from(&commit.rev), 291 collection: CowStr::Borrowed(collection), 292 rkey, 293 action, 294 cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")), 295 }; 296 297 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 298 batch.insert(&db.events, keys::event_key(event_id), bytes); 299 } 300 301 // update counts 302 let blocks_count = parsed.blocks.len() as i64; 303 for (col, delta) in collection_deltas { 304 db::update_record_count(batch, db, did, col, delta)?; 305 } 306 307 Ok(ApplyCommitResults { 308 repo_state, 309 records_delta, 310 blocks_count, 311 }) 312} 313 314pub fn parse_path(path: &str) -> Result<(&str, &str)> { 315 let mut parts = path.splitn(2, '/'); 316 let collection = parts.next().wrap_err("missing collection")?; 317 let rkey = parts.next().wrap_err("missing rkey")?; 318 Ok((collection, rkey)) 319}