Kind of like tap, but different — and in Rust. (main branch, 344 lines, 11 kB)
// Commit ingestion and persistence helpers: bridges firehose `Commit` events
// into the local fjall keyspaces (repos, records, events, counts, and the
// pending/resync queues). All multi-key mutations go through an
// `OwnedWriteBatch` so each repo transition commits atomically.

use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
use crate::db::{self, Db, keys, ser_repo_state};
use crate::filter::FilterConfig;
use crate::types::{
    AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState,
    StoredEvent,
};
use fjall::OwnedWriteBatch;
use jacquard::CowStr;
use jacquard::IntoStatic;

use jacquard::types::cid::Cid;
use jacquard::types::did::Did;
use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
use jacquard_common::types::crypto::PublicKey;
use jacquard_repo::car::reader::parse_car_bytes;
use miette::{Context, IntoDiagnostic, Result};
use rand::{Rng, rng};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::time::Instant;
use tracing::{debug, trace};

/// Stash a live commit for `did` while the repo is being (re)synced, so it can
/// be replayed once backfill completes.
///
/// The commit is MessagePack-serialized and keyed by `(did, rev)` so buffered
/// commits iterate in rev (TID) order under the did prefix.
///
/// # Errors
/// Returns an error if serialization or the keyspace insert fails. Note this
/// writes directly to the partition, not through a batch.
pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> {
    let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev));
    let value = rmp_serde::to_vec(commit).into_diagnostic()?;
    db.resync_buffer.insert(key, value).into_diagnostic()?;
    debug!(
        "buffered commit seq {} for {did} to resync_buffer",
        commit.seq
    );
    Ok(())
}

/// True if at least one buffered commit exists for `did` in the resync buffer.
// NOTE(review): an Err item from the prefix iterator also counts as "some",
// so a storage error here reads as "has buffered commits" — confirm intended.
pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
    let prefix = keys::resync_buffer_prefix(did);
    db.resync_buffer.prefix(&prefix).next().is_some()
}

// emitting identity is ephemeral
// we dont replay these, consumers can just fetch identity themselves if they need it
/// Wrap an identity event for broadcast. Consumes one id from the global
/// event-id counter but is never persisted to the events keyspace.
pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent {
    let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
    let marshallable = MarshallableEvt {
        id: event_id,
        event_type: "identity".into(),
        record: None,
        identity: Some(evt),
        account: None,
    };
    BroadcastEvent::Ephemeral(marshallable)
}

/// Wrap an account event for broadcast. Same ephemeral semantics as
/// `make_identity_event`: an event id is allocated but nothing is stored.
pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent {
    let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
    let marshallable = MarshallableEvt {
        id: event_id,
        event_type: "account".into(),
        record: None,
        identity: None,
        account: Some(evt),
    };
    BroadcastEvent::Ephemeral(marshallable)
}

/// Queue the complete removal of a repo into `batch`: repo state, its queue
/// entry (pending or resync, depending on current status), any buffered
/// commits, all indexed records, and its per-collection counts.
///
/// Nothing is applied until the caller commits the batch.
// NOTE(review): the `'batch` lifetime parameter is never used in the
// signature body — clippy would flag it as needless.
pub fn delete_repo<'batch>(
    batch: &'batch mut OwnedWriteBatch,
    db: &Db,
    did: &jacquard::types::did::Did,
    repo_state: RepoState,
) -> Result<()> {
    debug!("deleting repo {did}");

    let repo_key = keys::repo_key(did);
    let pending_key = keys::pending_key(repo_state.index_id);

    // 1. delete from repos, pending, resync
    batch.remove(&db.repos, &repo_key);
    match repo_state.status {
        // Synced repos sit in neither queue.
        RepoStatus::Synced => {}
        RepoStatus::Backfilling => {
            batch.remove(&db.pending, &pending_key);
        }
        // Error / Deactivated / Takendown / Suspended live in resync.
        _ => {
            batch.remove(&db.resync, &repo_key);
        }
    }

    // 2. delete from resync buffer
    let resync_prefix = keys::resync_buffer_prefix(did);
    for guard in db.resync_buffer.prefix(&resync_prefix) {
        let k = guard.key().into_diagnostic()?;
        batch.remove(&db.resync_buffer, k);
    }

    // 3. delete from records
    let records_prefix = keys::record_prefix_did(did);
    for guard in db.records.prefix(&records_prefix) {
        let k = guard.key().into_diagnostic()?;
        batch.remove(&db.records, k);
    }

    // 4. reset collection counts
    // Hand-built prefix: b'r' SEP trimmed-did SEP — presumably mirrors the
    // record-count key layout in the keys module; verify they stay in sync.
    let mut count_prefix = Vec::new();
    count_prefix.push(b'r');
    count_prefix.push(keys::SEP);
    TrimmedDid::from(did).write_to_vec(&mut count_prefix);
    count_prefix.push(keys::SEP);

    for guard in db.counts.prefix(&count_prefix) {
        let k = guard.key().into_diagnostic()?;
        batch.remove(&db.counts, k);
    }

    Ok(())
}

/// Transition a repo to `new_status`, maintaining the pending and resync
/// queues in the same batch, and persist the updated `RepoState`.
///
/// Queue invariants encoded here:
/// - `Synced`: removed from pending (resync removal is handled on the
///   resync -> pending transition, see inline comment).
/// - `Backfilling`: re-enqueued in pending under a fresh random `index_id`
///   (randomized so backfill workers don't all contend on the same range —
///   TODO confirm that's the motivation for the random key).
/// - `Error`: moved from pending into resync with a retryable error state.
/// - `Deactivated`/`Takendown`/`Suspended`: recorded in resync as `Gone`.
///
/// Returns the updated state (also written to `db.repos` via the batch).
///
/// # Errors
/// Fails if serializing the resync state or repo state fails.
pub fn update_repo_status<'batch, 's>(
    batch: &'batch mut OwnedWriteBatch,
    db: &Db,
    did: &jacquard::types::did::Did,
    mut repo_state: RepoState<'s>,
    new_status: RepoStatus,
) -> Result<RepoState<'s>> {
    debug!("updating repo status for {did} to {new_status:?}");

    let repo_key = keys::repo_key(did);
    let pending_key = keys::pending_key(repo_state.index_id);

    // manage queues
    match &new_status {
        RepoStatus::Synced => {
            batch.remove(&db.pending, &pending_key);
            // we dont have to remove from resync here because it has to transition resync -> pending first
        }
        RepoStatus::Backfilling => {
            // if we are coming from an error state, remove from resync
            if !matches!(repo_state.status, RepoStatus::Synced) {
                batch.remove(&db.resync, &repo_key);
            }
            // remove the old entry
            batch.remove(&db.pending, &pending_key);
            // add as new entry
            repo_state.index_id = rng().next_u64();
            batch.insert(
                &db.pending,
                keys::pending_key(repo_state.index_id),
                &repo_key,
            );
        }
        RepoStatus::Error(_msg) => {
            batch.remove(&db.pending, &pending_key);
            // TODO: we need to make errors have kind instead of "message" in repo status
            // and then pass it to resync error kind
            let resync_state = crate::types::ResyncState::Error {
                kind: crate::types::ResyncErrorKind::Generic,
                retry_count: 0,
                // eligible for retry immediately; backoff grows elsewhere
                next_retry: chrono::Utc::now().timestamp(),
            };
            batch.insert(
                &db.resync,
                &repo_key,
                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
            );
        }
        RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
            // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
            // batch.remove(&db.pending, &pending_key);
            let resync_state = ResyncState::Gone {
                status: new_status.clone(),
            };
            batch.insert(
                &db.resync,
                &repo_key,
                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
            );
        }
    }

    repo_state.status = new_status;
    repo_state.last_updated_at = chrono::Utc::now().timestamp();

    batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?);

    Ok(repo_state)
}

/// Parse a #sync event's CAR `blocks`, optionally verify the commit signature
/// against `key`, and return the commit's data CID plus its rev string.
///
/// # Errors
/// Fails on malformed CAR data, a CAR missing its root block, bad commit
/// CBOR, or (when `key` is given) signature verification failure.
// NOTE(review): block_in_place panics on a current_thread tokio runtime —
// this assumes a multi-thread runtime; confirm at the call sites.
pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> {
    let parsed = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current()
            .block_on(parse_car_bytes(blocks))
            .into_diagnostic()
    })?;

    let root_bytes = parsed
        .blocks
        .get(&parsed.root)
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;

    if let Some(key) = key {
        repo_commit
            .verify(key)
            .map_err(|e| miette::miette!("signature verification failed: {e}"))?;
    }

    Ok((
        Cid::ipld(repo_commit.data).into_static(),
        repo_commit.rev.to_string(),
    ))
}

/// Summary returned by `apply_commit`.
pub struct ApplyCommitResults<'s> {
    /// Repo state updated with the new rev / data root (already queued for
    /// write in the batch).
    pub repo_state: RepoState<'s>,
    /// Net change in indexed record count (+creates, -deletes).
    pub records_delta: i64,
    /// Number of CAR blocks stored into the CAS for this commit.
    pub blocks_count: i64,
}

/// Apply one firehose commit for a repo: store its CAR blocks in the CAS,
/// verify the signature when a key is given, update the records index per op
/// (respecting `filter`), write per-op `StoredEvent`s, and accumulate
/// per-collection count deltas.
///
/// All writes go through `batch`; the caller commits. Ops whose collection
/// fails `filter.matches_collection` are skipped entirely.
///
/// # Errors
/// Fails on CAR parse errors, a missing root block, bad commit CBOR,
/// signature failure, unknown op action, invalid op path, or serialization
/// errors.
pub fn apply_commit<'batch, 'db, 'commit, 's>(
    batch: &'batch mut OwnedWriteBatch,
    db: &'db Db,
    mut repo_state: RepoState<'s>,
    commit: &'commit Commit<'commit>,
    signing_key: Option<&PublicKey>,
    filter: &FilterConfig,
) -> Result<ApplyCommitResults<'s>> {
    let did = &commit.repo;
    debug!("applying commit {} for {did}", &commit.commit);

    // 1. parse CAR blocks and store them in CAS
    let start = Instant::now();
    // Same block_in_place/block_on pattern as verify_sync_event — assumes a
    // multi-thread tokio runtime.
    let parsed = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current()
            .block_on(parse_car_bytes(commit.blocks.as_ref()))
            .into_diagnostic()
    })?;

    trace!("parsed car for {did} in {:?}", start.elapsed());

    let root_bytes = parsed
        .blocks
        .get(&parsed.root)
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;

    if let Some(key) = signing_key {
        repo_commit
            .verify(key)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!("signature verified for {did}");
    }

    repo_state.rev = Some((&commit.rev).into());
    repo_state.data = Some(repo_commit.data);
    repo_state.last_updated_at = chrono::Utc::now().timestamp();

    batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);

    // store all blocks in the CAS
    for (cid, bytes) in &parsed.blocks {
        batch.insert(&db.blocks, cid.to_bytes(), bytes.to_vec());
    }

    // 2. iterate ops and update records index
    let mut records_delta = 0;
    let mut collection_deltas: HashMap<&str, i64> = HashMap::new();

    for op in &commit.ops {
        let (collection, rkey) = parse_path(&op.path)?;

        if !filter.matches_collection(collection) {
            continue;
        }

        let rkey = DbRkey::new(rkey);
        let db_key = keys::record_key(did, collection, &rkey);

        // NOTE(review): the id is allocated before the cid-missing `continue`
        // below, so skipped creates leave gaps in the event-id sequence —
        // confirm consumers tolerate non-contiguous ids.
        let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);

        let action = DbAction::try_from(op.action.as_str())?;
        match action {
            DbAction::Create | DbAction::Update => {
                // A create/update without a cid can't be indexed; skip it.
                let Some(cid) = &op.cid else {
                    continue;
                };
                batch.insert(
                    &db.records,
                    db_key.clone(),
                    cid.to_ipld()
                        .into_diagnostic()
                        .wrap_err("expected valid cid from relay")?
                        .to_bytes(),
                );

                // accumulate counts
                if action == DbAction::Create {
                    records_delta += 1;
                    *collection_deltas.entry(collection).or_default() += 1;
                }
            }
            DbAction::Delete => {
                batch.remove(&db.records, db_key);

                // accumulate counts
                // NOTE(review): decremented even if the key was never indexed
                // (e.g. filtered before this filter existed) — counts could
                // drift negative; confirm acceptable.
                records_delta -= 1;
                *collection_deltas.entry(collection).or_default() -= 1;
            }
        }

        // Persist the per-op event for replay (live=true marks firehose
        // origin as opposed to backfill — TODO confirm).
        let evt = StoredEvent {
            live: true,
            did: TrimmedDid::from(did),
            rev: DbTid::from(&commit.rev),
            collection: CowStr::Borrowed(collection),
            rkey,
            action,
            // NOTE(review): expect() here vs wrap_err above for the same
            // conversion — a bad cid on a Delete op would panic instead of
            // erroring.
            cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")),
        };

        let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
        batch.insert(&db.events, keys::event_key(event_id), bytes);
    }

    // update counts
    let blocks_count = parsed.blocks.len() as i64;
    for (col, delta) in collection_deltas {
        db::update_record_count(batch, db, did, col, delta)?;
    }

    Ok(ApplyCommitResults {
        repo_state,
        records_delta,
        blocks_count,
    })
}

/// Split an op path `"collection/rkey"` into its two components.
///
/// Uses `splitn(2, '/')`, so an rkey containing further slashes is kept
/// whole. Errors if the path has no `/` (no rkey part).
pub fn parse_path(path: &str) -> Result<(&str, &str)> {
    let mut parts = path.splitn(2, '/');
    let collection = parts.next().wrap_err("missing collection")?;
    let rkey = parts.next().wrap_err("missing rkey")?;
    Ok((collection, rkey))
}