AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] don't broadcast duplicate account / identity events

ptr.pet 117317c4 64b2c0ea

verified
+133 -64
+1 -1
src/db/types.rs
··· 360 360 } 361 361 362 362 /// did:key:z... → raw multicodec public key bytes 363 - #[derive(Debug, Clone, Serialize, Deserialize, jacquard_derive::IntoStatic)] 363 + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, jacquard_derive::IntoStatic)] 364 364 pub struct DidKey<'b>( 365 365 #[serde(borrow)] 366 366 #[serde(with = "serde_bytes")]
+102 -33
src/ingest/worker.rs
··· 52 52 enum RepoProcessResult<'s, 'c> { 53 53 Deleted, 54 54 Syncing(Option<&'c Commit<'c>>), 55 - Ok(RepoState<'s>), 55 + Ok { 56 + state: RepoState<'s>, 57 + old_status: RepoStatus, 58 + }, 56 59 } 57 60 58 61 pub struct FirehoseWorker { ··· 191 194 Ok(repo_state) => { 192 195 let repo_state = repo_state.into_static(); 193 196 194 - match Self::drain_resync_buffer(&mut ctx, &did, repo_state) { 197 + let old_status = repo_state.status.clone(); 198 + match Self::drain_resync_buffer( 199 + &mut ctx, &did, repo_state, old_status, 200 + ) { 195 201 Ok(res) => match res { 196 - RepoProcessResult::Ok(s) => { 202 + RepoProcessResult::Ok { state: s, .. } => { 197 203 // TODO: there might be a race condition here where we get a new commit 198 204 // while the resync buffer is being drained, we should handle that probably 199 205 // but also it should still be fine since we'll sync eventually anyway ··· 233 239 }; 234 240 235 241 match Self::process_message(&mut ctx, &msg, did) { 236 - Ok(RepoProcessResult::Ok(_)) => {} 242 + Ok(RepoProcessResult::Ok { .. 
}) => {} 237 243 Ok(RepoProcessResult::Deleted) => {} 238 244 Ok(RepoProcessResult::Syncing(Some(commit))) => { 239 245 if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) { ··· 302 308 did: &Did, 303 309 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 304 310 let check_repo_res = Self::check_repo_state(ctx, did, msg)?; 305 - let mut repo_state = match check_repo_res { 311 + let (mut repo_state, old_status) = match check_repo_res { 306 312 RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => { 307 313 return Ok(check_repo_res); 308 314 } 309 - RepoProcessResult::Ok(s) => s, 315 + RepoProcessResult::Ok { state, old_status } => (state, old_status), 310 316 }; 311 317 312 318 match msg { 313 319 SubscribeReposMessage::Commit(commit) => { 314 320 trace!(did = %did, "processing buffered commit"); 315 321 316 - return Self::process_commit(ctx, did, repo_state, commit); 322 + let res = Self::process_commit(ctx, did, repo_state, commit)?; 323 + return match res { 324 + RepoProcessResult::Ok { state, .. 
} => { 325 + Ok(RepoProcessResult::Ok { state, old_status }) 326 + } 327 + other => Ok(other), 328 + }; 317 329 } 318 330 SubscribeReposMessage::Sync(sync) => { 319 331 debug!(did = %did, "processing buffered sync"); ··· 328 340 if let Some(current_data) = &repo_state.data { 329 341 if current_data == &root.to_ipld().expect("valid cid") { 330 342 debug!(did = %did, "skipping noop sync"); 331 - return Ok(RepoProcessResult::Ok(repo_state)); 343 + return Ok(RepoProcessResult::Ok { 344 + state: repo_state, 345 + old_status, 346 + }); 332 347 } 333 348 } 334 349 335 350 if let Some(current_rev) = &repo_state.rev { 336 351 if rev.as_str() <= current_rev.to_tid().as_str() { 337 352 debug!(did = %did, "skipping replayed sync"); 338 - return Ok(RepoProcessResult::Ok(repo_state)); 353 + return Ok(RepoProcessResult::Ok { 354 + state: repo_state, 355 + old_status, 356 + }); 339 357 } 340 358 } 341 359 ··· 353 371 .db 354 372 .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 355 373 ctx.state.notify_backfill(); 356 - return Ok(RepoProcessResult::Ok(repo_state)); 374 + return Ok(RepoProcessResult::Ok { 375 + state: repo_state, 376 + old_status, 377 + }); 357 378 } 358 379 Err(e) => { 359 380 error!(did = %did, err = %e, "failed to process sync event"); ··· 363 384 SubscribeReposMessage::Identity(identity) => { 364 385 debug!(did = %did, "processing buffered identity"); 365 386 366 - if identity.handle.is_none() { 387 + let changed = if identity.handle.is_none() { 367 388 // we invalidate only if no handle is sent since its like a 368 389 // "invalidate your caches" message then basically 369 390 ctx.state.resolver.invalidate_sync(did); 370 391 let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 371 - repo_state.update_from_doc(doc); 372 - } 392 + repo_state.update_from_doc(doc) 393 + } else { 394 + let old_handle = repo_state.handle.clone(); 395 + repo_state.handle = identity.handle.clone().or(repo_state.handle); 396 + repo_state.handle != 
old_handle 397 + }; 373 398 374 - let handle = identity.handle.as_ref().map(|h| h.clone()); 375 - repo_state.handle = handle.or(repo_state.handle); 376 399 ctx.batch.batch_mut().insert( 377 400 &ctx.state.db.repos, 378 401 keys::repo_key(did), 379 402 crate::db::ser_repo_state(&repo_state)?, 380 403 ); 381 404 382 - let evt = IdentityEvt { 383 - did: did.clone().into_static(), 384 - handle: repo_state.handle.clone(), 385 - }; 386 - ctx.broadcast_events 387 - .push(ops::make_identity_event(&ctx.state.db, evt)); 405 + if changed { 406 + let evt = IdentityEvt { 407 + did: did.clone().into_static(), 408 + handle: repo_state.handle.clone(), 409 + }; 410 + ctx.broadcast_events 411 + .push(ops::make_identity_event(&ctx.state.db, evt)); 412 + } 388 413 } 389 414 SubscribeReposMessage::Account(account) => { 390 415 debug!(did = %did, "processing buffered account"); 416 + let was_inactive = matches!( 417 + old_status, 418 + RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 419 + ); 420 + let is_inactive = !account.active; 391 421 let evt = AccountEvt { 392 422 did: did.clone().into_static(), 393 423 active: account.active, ··· 440 470 441 471 if repo_state.status == target_status { 442 472 debug!(did = %did, ?target_status, "account status unchanged"); 443 - return Ok(RepoProcessResult::Ok(repo_state)); 473 + return Ok(RepoProcessResult::Ok { 474 + state: repo_state, 475 + old_status, 476 + }); 444 477 } 445 478 446 479 repo_state = ops::update_repo_status( ··· 461 494 // 1. we handle changing repo status to Synced before this (in check repo state) 462 495 // 2. 
initiating backfilling is also handled there 463 496 } 464 - ctx.broadcast_events 465 - .push(ops::make_account_event(&ctx.state.db, evt)); 497 + 498 + if was_inactive != is_inactive || repo_state.status != old_status { 499 + ctx.broadcast_events 500 + .push(ops::make_account_event(&ctx.state.db, evt)); 501 + } 466 502 } 467 503 _ => { 468 504 warn!(did = %did, "unknown message type in buffer"); 469 505 } 470 506 } 471 507 472 - Ok(RepoProcessResult::Ok(repo_state)) 508 + Ok(RepoProcessResult::Ok { 509 + state: repo_state, 510 + old_status, 511 + }) 473 512 } 474 513 475 514 fn process_commit<'c, 'ns, 's: 'ns>( ··· 486 525 state_rev = %repo_state.rev.as_ref().map(|r| r.to_tid()).expect("we checked in if"), 487 526 "skipping replayed event" 488 527 ); 489 - return Ok(RepoProcessResult::Ok(repo_state)); 528 + let old_status = repo_state.status.clone(); 529 + return Ok(RepoProcessResult::Ok { 530 + state: repo_state, 531 + old_status, 532 + }); 490 533 } 491 534 492 535 if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) ··· 542 585 - 1, 543 586 )); 544 587 545 - Ok(RepoProcessResult::Ok(repo_state)) 588 + let old_status = repo_state.status.clone(); 589 + Ok(RepoProcessResult::Ok { 590 + state: repo_state, 591 + old_status, 592 + }) 546 593 } 547 594 548 595 // checks the current state of the repo in the database ··· 608 655 return Ok(RepoProcessResult::Syncing(None)); 609 656 }; 610 657 let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 658 + let old_status = repo_state.status.clone(); 611 659 612 660 if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling { 613 661 trace!(did = %did, "ignoring active status as it is explicitly untracked"); ··· 620 668 RepoStatus::Synced => { 621 669 // lazy drain: if there are buffered commits, drain them now 622 670 if ops::has_buffered_commits(&ctx.state.db, did) { 623 - Self::drain_resync_buffer(ctx, did, repo_state) 671 + Self::drain_resync_buffer(ctx, did, 
repo_state, old_status) 624 672 } else { 625 - Ok(RepoProcessResult::Ok(repo_state)) 673 + Ok(RepoProcessResult::Ok { 674 + state: repo_state, 675 + old_status, 676 + }) 626 677 } 627 678 } 628 679 RepoStatus::Backfilling | RepoStatus::Error(_) => { ··· 638 689 // UNLESS it is an account status event that keeps it deactivated 639 690 if let SubscribeReposMessage::Account(acc) = msg { 640 691 if !acc.active { 641 - return Ok(RepoProcessResult::Ok(repo_state)); 692 + return Ok(RepoProcessResult::Ok { 693 + state: repo_state, 694 + old_status, 695 + }); 642 696 } 697 + } else { 698 + // buffer commits and drop everything else until we get an active=true message 699 + return match msg { 700 + SubscribeReposMessage::Commit(commit) => { 701 + Ok(RepoProcessResult::Syncing(Some(commit))) 702 + } 703 + _ => Ok(RepoProcessResult::Syncing(None)), 704 + }; 643 705 } 644 706 repo_state = ops::update_repo_status( 645 707 ctx.batch.batch_mut(), ··· 652 714 &crate::types::GaugeState::Resync(None), 653 715 &crate::types::GaugeState::Synced, 654 716 ); 655 - Ok(RepoProcessResult::Ok(repo_state)) 717 + Ok(RepoProcessResult::Ok { 718 + state: repo_state, 719 + old_status, 720 + }) 656 721 } 657 722 } 658 723 } ··· 661 726 ctx: &mut WorkerContext, 662 727 did: &Did, 663 728 mut repo_state: RepoState<'s>, 729 + old_status: RepoStatus, 664 730 ) -> Result<RepoProcessResult<'s, 'static>, IngestError> { 665 731 let prefix = keys::resync_buffer_prefix(did); 666 732 ··· 681 747 } 682 748 }; 683 749 match res { 684 - RepoProcessResult::Ok(rs) => { 750 + RepoProcessResult::Ok { state: rs, .. } => { 685 751 ctx.batch 686 752 .batch_mut() 687 753 .remove(&ctx.state.db.resync_buffer, key); ··· 699 765 } 700 766 } 701 767 702 - Ok(RepoProcessResult::Ok(repo_state)) 768 + Ok(RepoProcessResult::Ok { 769 + state: repo_state, 770 + old_status, 771 + }) 703 772 } 704 773 705 774 // refreshes the handle, pds url and signing key of a did
+14 -13
src/types.rs
··· 41 41 pub data: Option<IpldCid>, 42 42 /// this is when we *ingested* any last updates 43 43 pub last_updated_at: i64, // unix timestamp 44 - #[serde(borrow)] 45 - pub handle: Option<Handle<'i>>, 46 - pub index_id: u64, 47 - #[serde(default = "default_tracked")] 44 + /// whether we are ingesting events for this repo 48 45 pub tracked: bool, 49 - #[serde(default)] 46 + /// index id in pending keyspace 47 + pub index_id: u64, 48 + #[serde(borrow)] 50 49 pub signing_key: Option<DidKey<'i>>, 51 - #[serde(default)] 50 + #[serde(borrow)] 52 51 pub pds: Option<CowStr<'i>>, 53 - } 54 - 55 - fn default_tracked() -> bool { 56 - true 52 + #[serde(borrow)] 53 + pub handle: Option<Handle<'i>>, 57 54 } 58 55 59 56 impl<'i> RepoState<'i> { ··· 63 60 rev: None, 64 61 data: None, 65 62 last_updated_at: chrono::Utc::now().timestamp(), 66 - 67 63 index_id, 68 64 tracked: true, 69 65 handle: None, ··· 80 76 } 81 77 } 82 78 83 - pub fn update_from_doc(&mut self, doc: MiniDoc) { 79 + pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool { 80 + let new_signing_key = doc.key.map(From::from); 81 + let changed = self.pds.as_deref() != Some(doc.pds.as_str()) 82 + || self.handle != doc.handle 83 + || self.signing_key != new_signing_key; 84 84 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr())); 85 85 self.handle = doc.handle; 86 - self.signing_key = doc.key.map(From::from); 86 + self.signing_key = new_signing_key; 87 + changed 87 88 } 88 89 } 89 90
+16 -17
tests/authenticated_stream_test.nu
··· 5 5 let url = $"http://localhost:($port)" 6 6 let ws_url = $"ws://localhost:($port)/stream" 7 7 let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX) 8 - 8 + 9 9 # 1. authenticate 10 10 print $"authenticating with ($pds_url)..." 11 11 let session = authenticate $pds_url $did $password ··· 18 18 let instance = (with-env { HYDRANT_RELAY_HOSTS: $relays } { 19 19 start-hydrant $binary $db_path $port 20 20 }) 21 - 21 + 22 22 mut test_passed = false 23 23 24 24 if (wait-for-api $url) { 25 25 # 3. start listener (live stream) 26 26 let output_file = $"($db_path)/stream_output.txt" 27 27 print $"starting stream listener -> ($output_file)" 28 - # use websocat to capture output. 28 + # use websocat to capture output. 29 29 let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int) 30 30 print $"listener pid: ($stream_pid)" 31 - 31 + 32 32 # 4. add repo to hydrant (backfill trigger) 33 33 print $"adding repo ($did) to tracking..." 34 34 try { ··· 36 36 } catch { 37 37 print "warning: failed to add repo (might already be tracked), continuing..." 38 38 } 39 - 39 + 40 40 sleep 5sec 41 41 42 42 # 5. perform actions ··· 56 56 57 57 print "--- action: update ---" 58 58 let update_data = ($record_data | update text $"updated text ($timestamp)") 59 - 59 + 60 60 try { 61 61 http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.putRecord" { 62 62 repo: $did, ··· 88 88 89 89 print "--- action: activate ---" 90 90 activate-account $pds_url $jwt 91 - 91 + 92 92 # 6. verify 93 93 sleep 3sec 94 94 print "stopping listener..." 
95 95 try { kill -9 $stream_pid } 96 - 96 + 97 97 if ($output_file | path exists) { 98 98 let content = (open $output_file | str trim) 99 99 if ($content | is-empty) { ··· 124 124 { |e| $e.type == "record" and $e.record.action == "update" }, 125 125 { |e| $e.type == "record" and $e.record.action == "delete" }, 126 126 { |e| $e.type == "account" and $e.account.active == false }, 127 - { |e| $e.type == "account" and $e.account.active == true }, 128 - { |e| $e.type == "identity" and $e.identity.did == $did } 127 + { |e| $e.type == "account" and $e.account.active == true } 129 128 ] 130 129 131 130 if ($relevant_events | length) != ($checks | length) { ··· 144 143 break 145 144 } 146 145 } 147 - 146 + 148 147 if not $failed { 149 148 print "test success!" 150 149 $test_passed = true ··· 164 163 # cleanup 165 164 print "cleaning up..." 166 165 try { kill -9 $instance.pid } 167 - 166 + 168 167 $test_passed 169 168 } 170 169 ··· 172 171 let env_vars = load-env-file 173 172 let did = ($env_vars | get --optional TEST_REPO) 174 173 let password = ($env_vars | get --optional TEST_PASSWORD) 175 - 174 + 176 175 if ($did | is-empty) or ($password | is-empty) { 177 176 print "error: TEST_REPO and TEST_PASSWORD must be set in .env" 178 177 exit 1 179 178 } 180 179 181 180 let pds_url = resolve-pds $did 182 - 181 + 183 182 # ensure build 184 183 build-hydrant | ignore 185 - 184 + 186 185 print "=== running single-relay test ===" 187 186 let relay1 = "wss://relay.fire.hose.cam" 188 187 let success1 = run-auth-test $did $password $pds_url $relay1 3005 189 - 188 + 190 189 print "" 191 190 print "=== running multi-relay test ===" 192 191 let relay_multi = "wss://relay.fire.hose.cam,wss://relay3.fr.hose.cam,wss://relay1.us-west.bsky.network,wss://relay1.us-east.bsky.network" 193 192 let success2 = run-auth-test $did $password $pds_url $relay_multi 3015 194 - 193 + 195 194 if $success1 and $success2 { 196 195 print "" 197 196 print "ALL AUTHENTICATED STREAM TESTS PASSED"