//! AT Protocol indexer with flexible filtering, XRPC queries, and a
//! cursor-backed event stream, built on fjall.
//!
//! Keywords: at-protocol, atproto, indexer, rust, fjall
// Source snapshot: commit ffa704fdf0884cad263253b2cc44d87c8fb5f4fe (343 lines, 11 kB)
1use fjall::OwnedWriteBatch; 2use jacquard_common::CowStr; 3use jacquard_common::IntoStatic; 4use jacquard_common::types::cid::Cid; 5use jacquard_common::types::crypto::PublicKey; 6use jacquard_common::types::did::Did; 7use jacquard_repo::car::reader::parse_car_bytes; 8use miette::{Context, IntoDiagnostic, Result}; 9use rand::{Rng, rng}; 10use std::collections::HashMap; 11use std::sync::atomic::Ordering; 12use std::time::Instant; 13use tracing::{debug, trace}; 14 15use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 16use crate::db::{self, Db, keys, ser_repo_state}; 17use crate::filter::FilterConfig; 18use crate::ingest::stream::Commit; 19use crate::types::{ 20 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 21 StoredEvent, 22}; 23 24pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> { 25 let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev)); 26 let value = rmp_serde::to_vec(commit).into_diagnostic()?; 27 db.resync_buffer.insert(key, value).into_diagnostic()?; 28 debug!( 29 did = %did, 30 seq = commit.seq, 31 "buffered commit to resync_buffer" 32 ); 33 Ok(()) 34} 35 36pub fn has_buffered_commits(db: &Db, did: &Did) -> bool { 37 let prefix = keys::resync_buffer_prefix(did); 38 db.resync_buffer.prefix(&prefix).next().is_some() 39} 40 41// emitting identity is ephemeral 42// we dont replay these, consumers can just fetch identity themselves if they need it 43pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent { 44 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 45 let marshallable = MarshallableEvt { 46 id: event_id, 47 event_type: "identity".into(), 48 record: None, 49 identity: Some(evt), 50 account: None, 51 }; 52 BroadcastEvent::Ephemeral(marshallable) 53} 54 55pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent { 56 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 57 let 
marshallable = MarshallableEvt { 58 id: event_id, 59 event_type: "account".into(), 60 record: None, 61 identity: None, 62 account: Some(evt), 63 }; 64 BroadcastEvent::Ephemeral(marshallable) 65} 66 67pub fn delete_repo<'batch>( 68 batch: &'batch mut OwnedWriteBatch, 69 db: &Db, 70 did: &Did, 71 repo_state: &RepoState, 72) -> Result<()> { 73 debug!(did = %did, "deleting repo"); 74 75 let repo_key = keys::repo_key(did); 76 let pending_key = keys::pending_key(repo_state.index_id); 77 78 // 1. delete from repos, pending, resync 79 batch.remove(&db.repos, &repo_key); 80 match repo_state.status { 81 RepoStatus::Synced => {} 82 RepoStatus::Backfilling => { 83 batch.remove(&db.pending, &pending_key); 84 } 85 _ => { 86 batch.remove(&db.resync, &repo_key); 87 } 88 } 89 90 // 2. delete from resync buffer 91 let resync_prefix = keys::resync_buffer_prefix(did); 92 for guard in db.resync_buffer.prefix(&resync_prefix) { 93 let k = guard.key().into_diagnostic()?; 94 batch.remove(&db.resync_buffer, k); 95 } 96 97 // 3. delete from records 98 let records_prefix = keys::record_prefix_did(did); 99 for guard in db.records.prefix(&records_prefix) { 100 let k = guard.key().into_diagnostic()?; 101 batch.remove(&db.records, k); 102 } 103 104 // 4. 
reset collection counts 105 let mut count_prefix = Vec::new(); 106 count_prefix.push(b'r'); 107 count_prefix.push(keys::SEP); 108 TrimmedDid::from(did).write_to_vec(&mut count_prefix); 109 count_prefix.push(keys::SEP); 110 111 for guard in db.counts.prefix(&count_prefix) { 112 let k = guard.key().into_diagnostic()?; 113 batch.remove(&db.counts, k); 114 } 115 116 Ok(()) 117} 118 119pub fn update_repo_status<'batch, 's>( 120 batch: &'batch mut OwnedWriteBatch, 121 db: &Db, 122 did: &Did, 123 mut repo_state: RepoState<'s>, 124 new_status: RepoStatus, 125) -> Result<RepoState<'s>> { 126 debug!(did = %did, status = ?new_status, "updating repo status"); 127 128 let repo_key = keys::repo_key(did); 129 let pending_key = keys::pending_key(repo_state.index_id); 130 131 // manage queues 132 match &new_status { 133 RepoStatus::Synced => { 134 batch.remove(&db.pending, &pending_key); 135 // we dont have to remove from resync here because it has to transition resync -> pending first 136 } 137 RepoStatus::Backfilling => { 138 // if we are coming from an error state, remove from resync 139 if !matches!(repo_state.status, RepoStatus::Synced) { 140 batch.remove(&db.resync, &repo_key); 141 } 142 // remove the old entry 143 batch.remove(&db.pending, &pending_key); 144 // add as new entry 145 repo_state.index_id = rng().next_u64(); 146 batch.insert( 147 &db.pending, 148 keys::pending_key(repo_state.index_id), 149 &repo_key, 150 ); 151 } 152 RepoStatus::Error(_msg) => { 153 batch.remove(&db.pending, &pending_key); 154 // TODO: we need to make errors have kind instead of "message" in repo status 155 // and then pass it to resync error kind 156 let resync_state = crate::types::ResyncState::Error { 157 kind: crate::types::ResyncErrorKind::Generic, 158 retry_count: 0, 159 next_retry: chrono::Utc::now().timestamp(), 160 }; 161 batch.insert( 162 &db.resync, 163 &repo_key, 164 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 165 ); 166 } 167 RepoStatus::Deactivated | RepoStatus::Takendown 
| RepoStatus::Suspended => { 168 // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states 169 // batch.remove(&db.pending, &pending_key); 170 let resync_state = ResyncState::Gone { 171 status: new_status.clone(), 172 }; 173 batch.insert( 174 &db.resync, 175 &repo_key, 176 rmp_serde::to_vec(&resync_state).into_diagnostic()?, 177 ); 178 } 179 } 180 181 repo_state.status = new_status; 182 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 183 184 batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?); 185 186 Ok(repo_state) 187} 188 189pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> { 190 let parsed = tokio::task::block_in_place(|| { 191 tokio::runtime::Handle::current() 192 .block_on(parse_car_bytes(blocks)) 193 .into_diagnostic() 194 })?; 195 196 let root_bytes = parsed 197 .blocks 198 .get(&parsed.root) 199 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 200 201 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 202 203 if let Some(key) = key { 204 repo_commit 205 .verify(key) 206 .map_err(|e| miette::miette!("signature verification failed: {e}"))?; 207 } 208 209 Ok(( 210 Cid::ipld(repo_commit.data).into_static(), 211 repo_commit.rev.to_string(), 212 )) 213} 214 215pub struct ApplyCommitResults<'s> { 216 pub repo_state: RepoState<'s>, 217 pub records_delta: i64, 218 pub blocks_count: i64, 219} 220 221pub fn apply_commit<'batch, 'db, 'commit, 's>( 222 batch: &'batch mut OwnedWriteBatch, 223 db: &'db Db, 224 mut repo_state: RepoState<'s>, 225 commit: &'commit Commit<'commit>, 226 signing_key: Option<&PublicKey>, 227 filter: &FilterConfig, 228) -> Result<ApplyCommitResults<'s>> { 229 let did = &commit.repo; 230 debug!(did = %did, commit = %commit.commit, "applying commit"); 231 232 // 1. 
parse CAR blocks and store them in CAS 233 let start = Instant::now(); 234 let parsed = tokio::task::block_in_place(|| { 235 tokio::runtime::Handle::current() 236 .block_on(parse_car_bytes(commit.blocks.as_ref())) 237 .into_diagnostic() 238 })?; 239 240 trace!(did = %did, elapsed = ?start.elapsed(), "parsed car"); 241 242 let root_bytes = parsed 243 .blocks 244 .get(&parsed.root) 245 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 246 247 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 248 249 if let Some(key) = signing_key { 250 repo_commit 251 .verify(key) 252 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 253 trace!(did = %did, "signature verified"); 254 } 255 256 repo_state.rev = Some((&commit.rev).into()); 257 repo_state.data = Some(repo_commit.data); 258 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 259 260 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?); 261 262 // 2. 
iterate ops and update records index 263 let mut records_delta = 0; 264 let mut blocks_count = 0; 265 let mut collection_deltas: HashMap<&str, i64> = HashMap::new(); 266 267 for op in &commit.ops { 268 let (collection, rkey) = parse_path(&op.path)?; 269 270 if !filter.matches_collection(collection) { 271 continue; 272 } 273 274 let rkey = DbRkey::new(rkey); 275 let db_key = keys::record_key(did, collection, &rkey); 276 277 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 278 279 let action = DbAction::try_from(op.action.as_str())?; 280 match action { 281 DbAction::Create | DbAction::Update => { 282 let Some(cid) = &op.cid else { 283 continue; 284 }; 285 let cid_ipld = cid 286 .to_ipld() 287 .into_diagnostic() 288 .wrap_err("expected valid cid from relay")?; 289 290 if let Some(bytes) = parsed.blocks.get(&cid_ipld) { 291 batch.insert(&db.blocks, cid_ipld.to_bytes(), bytes.to_vec()); 292 blocks_count += 1; 293 } 294 295 batch.insert(&db.records, db_key.clone(), cid_ipld.to_bytes()); 296 297 // accumulate counts 298 if action == DbAction::Create { 299 records_delta += 1; 300 *collection_deltas.entry(collection).or_default() += 1; 301 } 302 } 303 DbAction::Delete => { 304 batch.remove(&db.records, db_key); 305 306 // accumulate counts 307 records_delta -= 1; 308 *collection_deltas.entry(collection).or_default() -= 1; 309 } 310 } 311 312 let evt = StoredEvent { 313 live: true, 314 did: TrimmedDid::from(did), 315 rev: DbTid::from(&commit.rev), 316 collection: CowStr::Borrowed(collection), 317 rkey, 318 action, 319 cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")), 320 }; 321 322 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 323 batch.insert(&db.events, keys::event_key(event_id), bytes); 324 } 325 326 // update counts 327 for (col, delta) in collection_deltas { 328 db::update_record_count(batch, db, did, col, delta)?; 329 } 330 331 Ok(ApplyCommitResults { 332 repo_state, 333 records_delta, 334 blocks_count, 335 }) 336} 337 338pub fn 
parse_path(path: &str) -> Result<(&str, &str)> { 339 let mut parts = path.splitn(2, '/'); 340 let collection = parts.next().wrap_err("missing collection")?; 341 let rkey = parts.next().wrap_err("missing rkey")?; 342 Ok((collection, rkey)) 343}