AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

use the correct CID for repo data, e.g. root_commit.data rather than parsed.root

ptr.pet 7d1c28db 19362e3d

verified
+158 -54
+23 -23
src/backfill/mod.rs
··· 9 9 use jacquard::types::did::Did; 10 10 use jacquard::{CowStr, IntoStatic, prelude::*}; 11 11 use jacquard_common::xrpc::XrpcError; 12 - use jacquard_repo::MemoryBlockStore; 13 12 use jacquard_repo::mst::Mst; 13 + use jacquard_repo::{BlockStore, MemoryBlockStore}; 14 14 use miette::{IntoDiagnostic, Result}; 15 15 use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 16 16 use std::collections::HashMap; ··· 305 305 emit_identity(&state.status); 306 306 307 307 trace!( 308 - "fetched {} bytes for {} in {:?}", 308 + "fetched {} bytes for {did} in {:?}", 309 309 car_bytes.body.len(), 310 - did, 311 310 start.elapsed() 312 311 ); 313 312 ··· 316 315 let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body) 317 316 .await 318 317 .into_diagnostic()?; 319 - trace!("parsed car for {} in {:?}", did, start.elapsed()); 318 + trace!("parsed car for {did} in {:?}", start.elapsed()); 320 319 321 320 let start = Instant::now(); 322 - let store = Arc::new(MemoryBlockStore::new()); 323 - for (_cid, bytes) in &parsed.blocks { 324 - jacquard_repo::BlockStore::put(store.as_ref(), bytes) 325 - .await 326 - .into_diagnostic()?; 327 - } 321 + let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 328 322 trace!( 329 - "stored {} blocks in memory for {} in {:?}", 330 - parsed.blocks.len(), 331 - did, 323 + "stored {} blocks in memory for {did} in {:?}", 324 + store.len(), 332 325 start.elapsed() 333 326 ); 334 327 335 328 // 4. parse root commit to get mst root 336 - let root_bytes = parsed 337 - .blocks 329 + let root_bytes = store 338 330 .get(&parsed.root) 331 + .await 332 + .into_diagnostic()? 
339 333 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 340 334 341 - let commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 342 - debug!("backfilling repo at revision {}", commit.rev); 335 + let root_commit = 336 + jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?; 337 + debug!( 338 + "backfilling repo at revision {}, root cid {}", 339 + root_commit.rev, root_commit.data 340 + ); 343 341 344 342 // 4.5. verify commit signature 345 343 if verify_signatures { 346 344 let pubkey = app_state.resolver.resolve_signing_key(did).await?; 347 - commit 345 + root_commit 348 346 .verify(&pubkey) 349 347 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 350 348 trace!("signature verified for {did}"); ··· 352 350 353 351 // 5. walk mst 354 352 let start = Instant::now(); 355 - let mst: Mst<MemoryBlockStore> = Mst::load(store, commit.data, None); 353 + let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None); 356 354 let leaves = mst.leaves().await.into_diagnostic()?; 357 355 trace!("walked mst for {} in {:?}", did, start.elapsed()); 358 356 ··· 361 359 let (_state, added_records, added_blocks, count) = { 362 360 let app_state = app_state.clone(); 363 361 let did = did.clone(); 364 - let rev = commit.rev; 365 - let storage = mst.storage().clone(); 362 + let rev = root_commit.rev; 366 363 367 364 tokio::task::spawn_blocking(move || { 368 365 let mut count = 0; ··· 370 367 let mut added_blocks = 0; 371 368 let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new(); 372 369 let mut batch = app_state.db.inner.batch(); 370 + let store = mst.storage(); 373 371 374 372 // pre-load existing record CIDs for this DID to detect duplicates/updates 375 373 let prefix = keys::record_prefix(&did); ··· 385 383 386 384 for (key, cid) in leaves { 387 385 let val_bytes = tokio::runtime::Handle::current() 388 - .block_on(jacquard_repo::BlockStore::get(storage.as_ref(), &cid)) 
386 + .block_on(store.get(&cid)) 389 387 .into_diagnostic()?; 390 388 391 389 if let Some(val) = val_bytes { ··· 398 396 let (action, is_new) = if let Some(existing_cid) = existing_cids.get(&path) 399 397 { 400 398 if existing_cid == cid.as_str() { 399 + debug!("skip {did}/{collection}/{rkey} ({cid})"); 401 400 continue; // skip unchanged record 402 401 } 403 402 ("update", false) 404 403 } else { 405 404 ("create", true) 406 405 }; 406 + debug!("{action} {did}/{collection}/{rkey} ({cid})"); 407 407 408 408 let db_key = keys::record_key(&did, &collection, rkey); 409 409 ··· 440 440 // 6. update status to synced 441 441 state.status = RepoStatus::Synced; 442 442 state.rev = Some(rev); 443 - state.data = Some(Cid::ipld(parsed.root)); 443 + state.data = Some(Cid::ipld(root_commit.data)); 444 444 state.last_updated_at = chrono::Utc::now().timestamp(); 445 445 446 446 batch.insert(
+104
src/bin/mst_dump.rs
··· 1 + use std::env; 2 + use std::sync::Arc; 3 + use std::time::Duration; 4 + 5 + use hydrant::resolver::Resolver; 6 + use jacquard::IntoStatic; // Corrected from jacquard_identity::IntoStatic 7 + use jacquard::api::com_atproto::sync::get_repo::GetRepo; 8 + use jacquard::prelude::XrpcExt; 9 + use jacquard::types::did::Did; 10 + use jacquard_common::types::ident::AtIdentifier; 11 + use jacquard_repo::MemoryBlockStore; 12 + use jacquard_repo::mst::Mst; 13 + use miette::{IntoDiagnostic, Result}; 14 + use tracing::{Level, info}; 15 + use tracing_subscriber::FmtSubscriber; // Restored 16 + use url::Url; // Restored 17 + 18 + #[tokio::main] 19 + async fn main() -> Result<()> { 20 + // Setup logging 21 + let subscriber = FmtSubscriber::builder() 22 + .with_max_level(Level::INFO) 23 + .finish(); 24 + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); 25 + 26 + // Parse args 27 + let args: Vec<String> = env::args().collect(); 28 + if args.len() != 2 { 29 + eprintln!("Usage: {} <handle|did>", args[0]); 30 + std::process::exit(1); 31 + } 32 + let identifier_str = &args[1]; 33 + 34 + // Init resolver 35 + let plc_url = Url::parse("https://plc.directory").into_diagnostic()?; 36 + let resolver = Resolver::new(plc_url, 100); 37 + 38 + // Resolve identity 39 + info!("Resolving {}...", identifier_str); 40 + let identifier = if identifier_str.starts_with("did:") { 41 + AtIdentifier::Did(Did::new(identifier_str).map_err(|e| miette::miette!("{}", e))?) 42 + } else { 43 + AtIdentifier::Handle(identifier_str.parse().into_diagnostic()?) 
44 + }; 45 + 46 + let did = match identifier { 47 + AtIdentifier::Did(d) => d.into_static(), 48 + AtIdentifier::Handle(h) => { 49 + let d = resolver.resolve_did(&AtIdentifier::Handle(h)).await?; 50 + d 51 + } 52 + }; 53 + info!("Resolved to DID: {}", did); 54 + 55 + let (pds_url, _) = resolver.resolve_identity_info(&did).await?; 56 + info!("PDS URL: {}", pds_url); 57 + 58 + // Fetch repo 59 + info!("Fetching repo..."); 60 + let http = reqwest::Client::builder() 61 + .timeout(Duration::from_secs(30)) 62 + .build() 63 + .into_diagnostic()?; 64 + 65 + let req = GetRepo::new().did(did.clone()).build(); 66 + let resp = http.xrpc(pds_url).send(&req).await.into_diagnostic()?; 67 + let car_bytes = resp.into_output().map_err(|e| miette::miette!("{}", e))?; // explicit map_err 68 + 69 + info!("Fetched {} bytes", car_bytes.body.len()); 70 + 71 + // Parse CAR 72 + let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body) 73 + .await 74 + .into_diagnostic()?; 75 + 76 + let store = Arc::new(MemoryBlockStore::new()); 77 + for (_cid, bytes) in &parsed.blocks { 78 + jacquard_repo::BlockStore::put(store.as_ref(), bytes) 79 + .await 80 + .into_diagnostic()?; 81 + } 82 + 83 + // Load MST 84 + let root_bytes = parsed 85 + .blocks 86 + .get(&parsed.root) 87 + .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 88 + 89 + let root_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 90 + info!("Repo rev: {}", root_commit.rev); 91 + 92 + let mst: Mst<MemoryBlockStore> = Mst::load(store.clone(), root_commit.data, None); 93 + let leaves = mst.leaves().await.into_diagnostic()?; 94 + let root = mst.root().await.into_diagnostic()?; 95 + 96 + info!("Found {} records", leaves.len()); 97 + 98 + println!("root -> {}", root); 99 + for (key, cid) in leaves { 100 + println!("{} -> {}", key, cid); 101 + } 102 + 103 + Ok(()) 104 + }
+4 -4
src/ingest/worker.rs
··· 262 262 return Ok(ProcessResult::Ok); 263 263 } 264 264 265 - if let (Some(prev_repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) 266 - && prev_repo != &prev_commit.0 265 + if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) 266 + && repo != &prev_commit.0 267 267 { 268 268 warn!( 269 - "gap detected for {}: prev {} != stored {}. triggering backfill", 270 - did, prev_repo, prev_commit.0 269 + "gap detected for {}: repo {} != commit prev {}. triggering backfill", 270 + did, repo, prev_commit.0 271 271 ); 272 272 273 273 let mut batch = state.db.inner.batch();
+10
src/lib.rs
//! Crate root for the hydrant AT Protocol indexer.
//!
//! Exposes the indexer's modules as a library so binaries (the `main`
//! binary and auxiliary tools such as `mst_dump`) can reach them through
//! `hydrant::...` paths instead of crate-private `mod` declarations.

pub mod api;
pub mod backfill;
pub mod config;
pub mod crawler;
pub mod db;
pub mod ingest;
pub mod ops;
pub mod resolver;
pub mod state;
pub mod types;
+9 -20
src/main.rs
··· 1 - mod api; 2 - mod backfill; 3 - mod config; 4 - mod crawler; 5 - mod db; 6 - mod ingest; 7 - mod ops; 8 - mod resolver; 9 - mod state; 10 - mod types; 11 - 12 - use crate::config::{Config, SignatureVerification}; 13 - use crate::crawler::Crawler; 14 - use crate::db::set_firehose_cursor; 15 - use crate::ingest::firehose::FirehoseIngestor; 16 - use crate::state::AppState; 17 - use crate::{backfill::BackfillWorker, ingest::worker::FirehoseWorker}; 1 + use hydrant::config::{Config, SignatureVerification}; 2 + use hydrant::crawler::Crawler; 3 + use hydrant::db::{self, set_firehose_cursor}; 4 + use hydrant::ingest::firehose::FirehoseIngestor; 5 + use hydrant::state::AppState; 6 + use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker}; 18 7 use futures::{FutureExt, TryFutureExt, future::BoxFuture}; 19 8 use miette::IntoDiagnostic; 20 9 use mimalloc::MiMalloc; ··· 81 70 82 71 if let Err(e) = spawn_blocking({ 83 72 let state = state.clone(); 84 - move || crate::backfill::manager::queue_pending_backfills(&state) 73 + move || hydrant::backfill::manager::queue_pending_backfills(&state) 85 74 }) 86 75 .await 87 76 .into_diagnostic()? ··· 92 81 93 82 if let Err(e) = spawn_blocking({ 94 83 let state = state.clone(); 95 - move || crate::backfill::manager::queue_gone_backfills(&state) 84 + move || hydrant::backfill::manager::queue_gone_backfills(&state) 96 85 }) 97 86 .await 98 87 .into_diagnostic()? ··· 103 92 104 93 std::thread::spawn({ 105 94 let state = state.clone(); 106 - move || crate::backfill::manager::retry_worker(state) 95 + move || hydrant::backfill::manager::retry_worker(state) 107 96 }); 108 97 109 98 tokio::spawn({
+8 -7
src/ops.rs
··· 190 190 191 191 trace!("parsed car for {did} in {:?}", start.elapsed()); 192 192 193 + let root_bytes = parsed 194 + .blocks 195 + .get(&parsed.root) 196 + .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 197 + 198 + let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 199 + 193 200 if let Some(key) = signing_key { 194 - let root_bytes = parsed 195 - .blocks 196 - .get(&parsed.root) 197 - .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 198 - 199 - let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 200 201 repo_commit 201 202 .verify(key) 202 203 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; ··· 204 205 } 205 206 206 207 repo_state.rev = Some(commit.rev.clone()); 207 - repo_state.data = Some(Cid::ipld(parsed.root)); 208 + repo_state.data = Some(Cid::ipld(repo_commit.data)); 208 209 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 209 210 210 211 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);