AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 02165cb84b2914aaedb64e22cb80ef641edd02ce 233 lines 7.7 kB view raw
1use crate::db::{keys, Db}; 2use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredEvent}; 3use jacquard::api::com_atproto::sync::subscribe_repos::Commit; 4use jacquard::cowstr::ToCowStr; 5use jacquard_repo::car::reader::parse_car_bytes; 6use miette::{IntoDiagnostic, Result}; 7use smol_str::{SmolStr, ToSmolStr}; 8use std::collections::HashMap; 9use std::sync::atomic::Ordering; 10use std::time::Instant; 11use tracing::{debug, trace}; 12 13// emitting identity is ephemeral 14// we dont replay these, consumers can just fetch identity themselves if they need it 15pub fn emit_identity_event(db: &Db, evt: IdentityEvt) { 16 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 17 let marshallable = MarshallableEvt { 18 id: event_id, 19 event_type: "identity".into(), 20 record: None, 21 identity: Some(evt), 22 account: None, 23 }; 24 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 25} 26 27pub fn emit_account_event(db: &Db, evt: AccountEvt) { 28 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 29 let marshallable = MarshallableEvt { 30 id: event_id, 31 event_type: "account".into(), 32 record: None, 33 identity: None, 34 account: Some(evt), 35 }; 36 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 37} 38 39pub fn delete_repo(db: &Db, did: &jacquard::types::did::Did) -> Result<()> { 40 debug!("deleting repo {did}"); 41 let mut batch = db.inner.batch(); 42 let repo_key = keys::repo_key(did); 43 44 // 1. delete from repos, pending, resync 45 batch.remove(&db.repos, repo_key); 46 batch.remove(&db.pending, repo_key); 47 batch.remove(&db.resync, repo_key); 48 49 // 2. delete from buffer (prefix: repo_key + SEP) 50 let mut buffer_prefix = repo_key.to_vec(); 51 buffer_prefix.push(keys::SEP); 52 for guard in db.buffer.prefix(&buffer_prefix) { 53 let k = guard.key().into_diagnostic()?; 54 batch.remove(&db.buffer, k); 55 } 56 57 // 3. 
delete from records (prefix: repo_key + SEP) 58 let mut records_prefix = repo_key.to_vec(); 59 records_prefix.push(keys::SEP); 60 let mut deleted_count = 0; 61 62 for guard in db.records.prefix(&records_prefix) { 63 let k = guard.key().into_diagnostic()?; 64 batch.remove(&db.records, k); 65 deleted_count += 1; 66 } 67 68 // 4. reset collection counts 69 let mut count_prefix = Vec::new(); 70 count_prefix.push(b'r'); 71 count_prefix.push(keys::SEP); 72 count_prefix.extend_from_slice(keys::did_prefix(did).as_bytes()); 73 count_prefix.push(keys::SEP); 74 75 for guard in db.counts.prefix(&count_prefix) { 76 let k = guard.key().into_diagnostic()?; 77 batch.remove(&db.counts, k); 78 } 79 80 batch.commit().into_diagnostic()?; 81 82 // update global record count 83 if deleted_count > 0 { 84 tokio::spawn(db.increment_count(keys::count_keyspace_key("records"), -deleted_count)); 85 } 86 87 Ok(()) 88} 89 90pub fn update_repo_status( 91 db: &Db, 92 did: &jacquard::types::did::Did, 93 status: crate::types::RepoStatus, 94) -> Result<()> { 95 debug!("updating repo status for {did} to {status:?}"); 96 let (updated, batch) = 97 Db::update_repo_state(db.inner.batch(), &db.repos, did, |state, _val| { 98 state.status = status.clone(); 99 state.last_updated_at = chrono::Utc::now().timestamp(); 100 Ok((true, ())) 101 })?; 102 103 if updated.is_some() { 104 batch.commit().into_diagnostic()?; 105 } 106 Ok(()) 107} 108 109pub fn apply_commit(db: &Db, commit: &Commit<'_>, live: bool) -> Result<()> { 110 let did = &commit.repo; 111 debug!("applying commit {} for {did}", &commit.commit); 112 113 // 1. 
parse CAR blocks and store them in CAS 114 let start = Instant::now(); 115 let parsed = tokio::task::block_in_place(|| { 116 tokio::runtime::Handle::current() 117 .block_on(parse_car_bytes(commit.blocks.as_ref())) 118 .into_diagnostic() 119 })?; 120 121 trace!("parsed car for {did} in {:?}", start.elapsed()); 122 123 let (_, mut batch) = Db::update_repo_state(db.inner.batch(), &db.repos, did, |state, _| { 124 state.rev = commit.rev.as_str().into(); 125 state.data = parsed.root.to_smolstr(); 126 state.last_updated_at = chrono::Utc::now().timestamp(); 127 Ok((true, ())) 128 })?; 129 130 // store all blocks in the CAS 131 for (cid, bytes) in &parsed.blocks { 132 batch.insert( 133 &db.blocks, 134 keys::block_key(&cid.to_cowstr()), 135 bytes.to_vec(), 136 ); 137 } 138 139 // 2. iterate ops and update records index 140 let mut records_delta = 0; 141 let mut events_count = 0; 142 let mut collection_deltas: HashMap<SmolStr, i64> = HashMap::new(); 143 144 for op in &commit.ops { 145 let parts: Vec<&str> = op.path.splitn(2, '/').collect(); 146 if parts.len() != 2 { 147 continue; 148 } 149 let collection = parts[0]; 150 let rkey = parts[1]; 151 152 let db_key = keys::record_key(did, collection, rkey); 153 154 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 155 156 let mut cid_str = None; 157 158 match op.action.as_str() { 159 "create" | "update" => { 160 let Some(cid) = &op.cid else { 161 continue; 162 }; 163 let s = smol_str::SmolStr::from(cid.as_str()); 164 batch.insert(&db.records, db_key, s.as_bytes().to_vec()); 165 cid_str = Some(s); 166 167 // accumulate counts 168 if op.action.as_str() == "create" { 169 records_delta += 1; 170 *collection_deltas 171 .entry(collection.to_smolstr()) 172 .or_default() += 1; 173 } 174 } 175 "delete" => { 176 batch.remove(&db.records, db_key); 177 178 // accumulate counts 179 records_delta -= 1; 180 *collection_deltas 181 .entry(collection.to_smolstr()) 182 .or_default() -= 1; 183 } 184 _ => {} 185 } 186 187 let evt = 
StoredEvent::Record { 188 live, 189 did: did.as_str().into(), 190 rev: commit.rev.as_str().into(), 191 collection: collection.into(), 192 rkey: rkey.into(), 193 action: op.action.as_str().into(), 194 cid: cid_str, 195 }; 196 197 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 198 batch.insert(&db.events, keys::event_key(event_id as i64), bytes); 199 events_count += 1; 200 } 201 202 let start = Instant::now(); 203 204 batch.commit().into_diagnostic()?; 205 trace!("committed sync batch for {did} in {:?}", start.elapsed()); 206 207 let blocks_count = parsed.blocks.len() as i64; 208 tokio::spawn({ 209 let blocks_fut = (blocks_count > 0) 210 .then(|| db.increment_count(keys::count_keyspace_key("blocks"), blocks_count)); 211 let records_fut = (records_delta != 0) 212 .then(|| db.increment_count(keys::count_keyspace_key("records"), records_delta)); 213 let events_fut = (events_count > 0) 214 .then(|| db.increment_count(keys::count_keyspace_key("events"), events_count)); 215 let collections_fut = collection_deltas 216 .into_iter() 217 .map(|(col, delta)| db.increment_count(keys::count_collection_key(&did, &col), delta)) 218 .collect::<Vec<_>>(); 219 futures::future::join_all( 220 blocks_fut 221 .into_iter() 222 .chain(records_fut) 223 .chain(events_fut) 224 .chain(collections_fut), 225 ) 226 }); 227 228 let _ = db.event_tx.send(BroadcastEvent::Persisted( 229 db.next_event_id.load(Ordering::SeqCst) - 1, 230 )); 231 232 Ok(()) 233}