at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest,backfill] implement signature verification

ptr.pet 41cb51ee 0de7dd05

verified
+234 -29
+16 -2
src/backfill/mod.rs
··· 27 27 rx: BackfillRx, 28 28 http: reqwest::Client, 29 29 semaphore: Arc<Semaphore>, 30 + verify_signatures: bool, 30 31 } 31 32 32 33 impl BackfillWorker { ··· 35 36 rx: BackfillRx, 36 37 timeout: Duration, 37 38 concurrency_limit: usize, 39 + verify_signatures: bool, 38 40 ) -> Self { 39 41 Self { 40 42 state, ··· 47 49 .build() 48 50 .expect("failed to build http client"), 49 51 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 52 + verify_signatures, 50 53 } 51 54 } 52 55 ··· 66 69 self.http.clone(), 67 70 did.clone(), 68 71 permit, 72 + self.verify_signatures, 69 73 ) 70 74 .inspect_err(move |e| { 71 75 error!("backfill process failed for {did}: {e}"); ··· 80 84 http: reqwest::Client, 81 85 did: Did<'static>, 82 86 _permit: tokio::sync::OwnedSemaphorePermit, 87 + verify_signatures: bool, 83 88 ) -> Result<()> { 84 89 let db = &state.db; 85 90 86 - match Self::process_did(&state, &http, &did).await { 91 + match Self::process_did(&state, &http, &did, verify_signatures).await { 87 92 Ok(previous_state) => { 88 93 let did_key = keys::repo_key(&did); 89 94 ··· 194 199 Ok(()) 195 200 } 196 201 197 - // returns previous repo state if successful 198 202 async fn process_did<'i>( 199 203 app_state: &Arc<AppState>, 200 204 http: &reqwest::Client, 201 205 did: &Did<'static>, 206 + verify_signatures: bool, 202 207 ) -> Result<RepoState<'static>> { 203 208 debug!("backfilling {}", did); 204 209 ··· 335 340 336 341 let commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 337 342 debug!("backfilling repo at revision {}", commit.rev); 343 + 344 + // 4.5. verify commit signature 345 + if verify_signatures { 346 + let pubkey = app_state.resolver.resolve_signing_key(did).await?; 347 + commit 348 + .verify(&pubkey) 349 + .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 350 + trace!("signature verified for {did}"); 351 + } 338 352 339 353 // 5. walk mst 340 354 let start = Instant::now();
+42
src/config.rs
··· 2 2 use smol_str::SmolStr; 3 3 use std::fmt; 4 4 use std::path::PathBuf; 5 + use std::str::FromStr; 5 6 use std::time::Duration; 6 7 use url::Url; 7 8 9 + #[derive(Debug, Clone, Copy)] 10 + pub enum SignatureVerification { 11 + Full, 12 + BackfillOnly, 13 + None, 14 + } 15 + 16 + impl FromStr for SignatureVerification { 17 + type Err = miette::Error; 18 + fn from_str(s: &str) -> Result<Self> { 19 + match s { 20 + "full" => Ok(Self::Full), 21 + "backfill-only" => Ok(Self::BackfillOnly), 22 + "none" => Ok(Self::None), 23 + _ => Err(miette::miette!("invalid signature verification level")), 24 + } 25 + } 26 + } 27 + 28 + impl fmt::Display for SignatureVerification { 29 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 + match self { 31 + Self::Full => write!(f, "full"), 32 + Self::BackfillOnly => write!(f, "backfill-only"), 33 + Self::None => write!(f, "none"), 34 + } 35 + } 36 + } 37 + 8 38 #[derive(Debug, Clone)] 9 39 pub struct Config { 10 40 pub database_path: PathBuf, ··· 20 50 pub disable_lz4_compression: bool, 21 51 pub debug_port: u16, 22 52 pub enable_debug: bool, 53 + pub verify_signatures: SignatureVerification, 54 + pub identity_cache_size: u64, 23 55 } 24 56 25 57 impl Config { ··· 63 95 let api_port = cfg!("API_PORT", 3000u16); 64 96 let enable_debug = cfg!("ENABLE_DEBUG", false); 65 97 let debug_port = cfg!("DEBUG_PORT", 3001u16); 98 + let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 99 + let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 100_000u64); 66 100 67 101 Ok(Self { 68 102 database_path, ··· 78 112 disable_lz4_compression, 79 113 debug_port, 80 114 enable_debug, 115 + verify_signatures, 116 + identity_cache_size, 81 117 }) 82 118 } 83 119 } ··· 89 125 writeln!(f, " relay host: {}", self.relay_host)?; 90 126 writeln!(f, " plc url: {}", self.plc_url)?; 91 127 writeln!(f, " full network indexing: {}", self.full_network)?; 128 + writeln!(f, " verify signatures: {}", self.verify_signatures)?; 92 129 writeln!( 93 130 f, 94 131 " backfill concurrency: {}", 95 132 self.backfill_concurrency_limit 133 + )?; 134 + writeln!( 135 + f, 136 + " identity cache size: {}", 137 + self.identity_cache_size 96 138 )?; 97 139 writeln!( 98 140 f,
+14 -2
src/ingest/firehose.rs
··· 1 - use crate::db::{self, keys, Db}; 1 + use crate::db::{self, Db, keys}; 2 2 use crate::ingest::BufferTx; 3 3 use crate::state::AppState; 4 4 use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; ··· 6 6 use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 7 7 use miette::Result; 8 8 use n0_future::StreamExt; 9 - use std::sync::atomic::Ordering; 10 9 use std::sync::Arc; 10 + use std::sync::atomic::Ordering; 11 11 use std::time::Duration; 12 12 use tracing::{error, info}; 13 13 use url::Url; ··· 17 17 buffer_tx: BufferTx, 18 18 relay_host: Url, 19 19 full_network: bool, 20 + verify_signatures: bool, 20 21 } 21 22 22 23 impl FirehoseIngestor { ··· 25 26 buffer_tx: BufferTx, 26 27 relay_host: Url, 27 28 full_network: bool, 29 + verify_signatures: bool, 28 30 ) -> Self { 29 31 Self { 30 32 state, 31 33 buffer_tx, 32 34 relay_host, 33 35 full_network, 36 + verify_signatures, 34 37 } 35 38 } 36 39 ··· 100 103 101 104 if !self.should_process(did).await.unwrap_or(false) { 102 105 return; 106 + } 107 + 108 + // pre-warm the key cache for commit events 109 + if self.verify_signatures && matches!(&msg, SubscribeReposMessage::Commit(_)) { 110 + let state = self.state.clone(); 111 + let did = did.clone(); 112 + tokio::spawn(async move { 113 + let _ = state.resolver.resolve_signing_key(&did).await; 114 + }); 103 115 } 104 116 105 117 if let Err(e) = self.buffer_tx.send(msg) {
+47 -6
src/ingest/worker.rs
··· 6 6 use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; 7 7 8 8 use fjall::OwnedWriteBatch; 9 + use futures::future::join_all; 9 10 use jacquard::cowstr::ToCowStr; 10 11 use jacquard::types::did::Did; 11 12 use jacquard_common::IntoStatic; 13 + use jacquard_common::types::crypto::PublicKey; 12 14 use miette::{IntoDiagnostic, Result}; 13 15 use smol_str::ToSmolStr; 14 - use std::collections::HashSet; 16 + use std::collections::{HashMap, HashSet}; 15 17 use std::sync::Arc; 16 18 use std::time::Duration; 17 19 use tokio::sync::mpsc; ··· 31 33 pub struct FirehoseWorker { 32 34 state: Arc<AppState>, 33 35 rx: mpsc::UnboundedReceiver<BufferedMessage>, 36 + verify_signatures: bool, 34 37 } 35 38 36 39 impl FirehoseWorker { 37 - pub fn new(state: Arc<AppState>, rx: mpsc::UnboundedReceiver<BufferedMessage>) -> Self { 38 - Self { state, rx } 40 + pub fn new( 41 + state: Arc<AppState>, 42 + rx: mpsc::UnboundedReceiver<BufferedMessage>, 43 + verify_signatures: bool, 44 + ) -> Self { 45 + Self { 46 + state, 47 + rx, 48 + verify_signatures, 49 + } 39 50 } 40 51 41 52 pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> { ··· 47 58 let mut batch = self.state.db.inner.batch(); 48 59 let mut deleted = HashSet::new(); 49 60 61 + // resolve signing keys for commits if verification is enabled 62 + let keys = if self.verify_signatures { 63 + let dids: HashSet<Did> = buf 64 + .iter() 65 + .filter_map(|msg| match msg { 66 + SubscribeReposMessage::Commit(c) => Some(c.repo.clone()), 67 + _ => None, 68 + }) 69 + .collect(); 70 + 71 + let futures = dids.into_iter().map(|did| async { 72 + match self.state.resolver.resolve_signing_key(&did).await { 73 + Ok(key) => Some((did, key)), 74 + Err(e) => { 75 + warn!("failed to resolve key for {did}: {e}"); 76 + None 77 + } 78 + } 79 + }); 80 + 81 + handle 82 + .block_on(join_all(futures)) 83 + .into_iter() 84 + .flatten() 85 + .collect() 86 + } else { 87 + HashMap::new() 88 + }; 89 + 50 90 for msg in buf.drain(..) { 51 91 let (did, seq) = match &msg { 52 92 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), ··· 64 104 continue; 65 105 } 66 106 67 - match Self::process_message(&self.state, &mut batch, &msg, did) { 107 + match Self::process_message(&self.state, &mut batch, &msg, did, &keys) { 68 108 Ok(ProcessResult::Ok) => {} 69 109 Ok(ProcessResult::Deleted) => { 70 110 deleted.insert(did.clone()); ··· 114 154 } 115 155 116 156 fn process_message( 117 - state: &AppState, 157 + state: &Arc<AppState>, 118 158 batch: &mut OwnedWriteBatch, 119 159 msg: &BufferedMessage, 120 160 did: &Did, 161 + keys: &HashMap<Did<'static>, PublicKey<'static>>, 121 162 ) -> Result<ProcessResult> { 122 163 let RepoCheckResult::Ok(repo_state) = Self::check_repo_state(batch, state, did)? else { 123 164 return Ok(ProcessResult::Ok); ··· 160 201 return Ok(ProcessResult::Ok); 161 202 } 162 203 163 - ops::apply_commit(batch, &state.db, repo_state, &commit)?(); 204 + ops::apply_commit(batch, &state.db, repo_state, &commit, keys.get(did))?(); 164 205 } 165 206 SubscribeReposMessage::Identity(identity) => { 166 207 debug!("processing buffered identity for {did}");
+29 -7
src/main.rs
··· 9 9 mod state; 10 10 mod types; 11 11 12 - use crate::config::Config; 12 + use crate::config::{Config, SignatureVerification}; 13 13 use crate::crawler::Crawler; 14 14 use crate::db::set_firehose_cursor; 15 15 use crate::ingest::firehose::FirehoseIngestor; 16 16 use crate::state::AppState; 17 17 use crate::{backfill::BackfillWorker, ingest::worker::FirehoseWorker}; 18 - use futures::{future::BoxFuture, FutureExt, TryFutureExt}; 18 + use futures::{FutureExt, TryFutureExt, future::BoxFuture}; 19 19 use miette::IntoDiagnostic; 20 20 use mimalloc::MiMalloc; 21 + use std::sync::Arc; 21 22 use std::sync::atomic::Ordering; 22 - use std::sync::Arc; 23 23 use tokio::{sync::mpsc, task::spawn_blocking}; 24 24 use tracing::{error, info}; 25 25 ··· 53 53 tokio::spawn({ 54 54 let state = state.clone(); 55 55 let timeout = cfg.repo_fetch_timeout; 56 - BackfillWorker::new(state, backfill_rx, timeout, cfg.backfill_concurrency_limit).run() 56 + BackfillWorker::new( 57 + state, 58 + backfill_rx, 59 + timeout, 60 + cfg.backfill_concurrency_limit, 61 + matches!( 62 + cfg.verify_signatures, 63 + SignatureVerification::Full | SignatureVerification::BackfillOnly 64 + ), 65 + ) 66 + .run() 57 67 }); 58 68 59 69 let firehose_worker = std::thread::spawn({ 60 70 let state = state.clone(); 61 71 let handle = tokio::runtime::Handle::current(); 62 - move || FirehoseWorker::new(state, buffer_rx).run(handle) 72 + move || { 73 + FirehoseWorker::new( 74 + state, 75 + buffer_rx, 76 + matches!(cfg.verify_signatures, SignatureVerification::Full), 77 + ) 78 + .run(handle) 79 + } 63 80 }); 64 81 65 82 if let Err(e) = spawn_blocking({ ··· 161 178 ); 162 179 } 163 180 164 - let ingestor = 165 - FirehoseIngestor::new(state.clone(), buffer_tx, cfg.relay_host, cfg.full_network); 181 + let ingestor = FirehoseIngestor::new( 182 + state.clone(), 183 + buffer_tx, 184 + cfg.relay_host, 185 + cfg.full_network, 186 + matches!(cfg.verify_signatures, SignatureVerification::Full), 187 + ); 166 188 167 189 let res = futures::future::try_join_all::<[BoxFuture<_>; _]>([ 168 190 Box::pin(
+18 -3
src/ops.rs
··· 1 1 use crate::db::types::TrimmedDid; 2 - use crate::db::{self, keys, ser_repo_state, Db}; 2 + use crate::db::{self, Db, keys, ser_repo_state}; 3 3 use crate::state::AppState; 4 4 use crate::types::{ 5 5 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 6 6 StoredEvent, 7 7 }; 8 8 use fjall::OwnedWriteBatch; 9 - use jacquard::api::com_atproto::sync::subscribe_repos::Commit; 9 + use jacquard::CowStr; 10 10 use jacquard::cowstr::ToCowStr; 11 11 use jacquard::types::cid::Cid; 12 - use jacquard::CowStr; 12 + use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 13 + use jacquard_common::types::crypto::PublicKey; 13 14 use jacquard_repo::car::reader::parse_car_bytes; 14 15 use miette::{Context, IntoDiagnostic, Result}; 15 16 use std::collections::HashMap; ··· 148 149 db: &'db Db, 149 150 mut repo_state: RepoState, 150 151 commit: &Commit<'_>, 152 + signing_key: Option<&PublicKey>, 151 153 ) -> Result<impl FnOnce() + use<'db>> { 152 154 let did = &commit.repo; 153 155 debug!("applying commit {} for {did}", &commit.commit); ··· 161 163 })?; 162 164 163 165 trace!("parsed car for {did} in {:?}", start.elapsed()); 166 + 167 + if let Some(key) = signing_key { 168 + let root_bytes = parsed 169 + .blocks 170 + .get(&parsed.root) 171 + .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 172 + 173 + let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 174 + repo_commit 175 + .verify(key) 176 + .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 177 + trace!("signature verified for {did}"); 178 + } 164 179 165 180 repo_state.rev = Some(commit.rev.clone()); 166 181 repo_state.data = Some(Cid::ipld(parsed.root));
+67 -8
src/resolver.rs
··· 1 1 use std::ops::Not; 2 + use std::sync::Arc; 3 + use std::time::Duration; 2 4 5 + use jacquard::IntoStatic; 3 6 use jacquard::types::string::Handle; 4 - use jacquard::IntoStatic; 7 + use jacquard_common::types::crypto::PublicKey; 5 8 use jacquard_common::types::ident::AtIdentifier; 6 9 use jacquard_common::types::string::Did; 7 - use jacquard_identity::resolver::{IdentityResolver, PlcSource, ResolverOptions}; 8 10 use jacquard_identity::JacquardResolver; 11 + use jacquard_identity::resolver::{IdentityResolver, PlcSource, ResolverOptions}; 9 12 use miette::{IntoDiagnostic, Result}; 13 + use scc::HashCache; 10 14 use url::Url; 11 15 16 + struct ResolverInner { 17 + jacquard: JacquardResolver, 18 + key_cache: HashCache<Did<'static>, PublicKey<'static>>, 19 + } 20 + 21 + #[derive(Clone)] 12 22 pub struct Resolver { 13 - inner: JacquardResolver, 23 + inner: Arc<ResolverInner>, 14 24 } 15 25 16 26 impl Resolver { 17 - pub fn new(plc_url: Url) -> Self { 27 + pub fn new(plc_url: Url, identity_cache_size: u64) -> Self { 18 28 let http = reqwest::Client::new(); 19 29 let mut opts = ResolverOptions::default(); 20 30 opts.plc_source = PlcSource::PlcDirectory { base: plc_url }; 31 + opts.request_timeout = Some(Duration::from_secs(3)); 21 32 22 - let inner = JacquardResolver::new(http, opts); 33 + // no jacquard cache - we manage our own 34 + let jacquard = JacquardResolver::new(http, opts); 23 35 24 - Self { inner } 36 + Self { 37 + inner: Arc::new(ResolverInner { 38 + jacquard, 39 + key_cache: HashCache::with_capacity( 40 + std::cmp::min(1000, (identity_cache_size / 100) as usize), 41 + identity_cache_size as usize, 42 + ), 43 + }), 44 + } 25 45 } 26 46 27 47 pub async fn resolve_did(&self, identifier: &AtIdentifier<'_>) -> Result<Did<'static>> { 28 48 match identifier { 29 49 AtIdentifier::Did(did) => Ok(did.clone().into_static()), 30 50 AtIdentifier::Handle(handle) => { 31 - let did = self.inner.resolve_handle(handle).await.into_diagnostic()?; 51 + let did = self 52 + .inner 53 + .jacquard 54 + .resolve_handle(handle) 55 + .await 56 + .into_diagnostic()?; 32 57 Ok(did.into_static()) 33 58 } 34 59 } 35 60 } 36 61 37 62 pub async fn resolve_identity_info(&self, did: &Did<'_>) -> Result<(Url, Option<Handle<'_>>)> { 38 - let doc_resp = self.inner.resolve_did_doc(did).await.into_diagnostic()?; 63 + let doc_resp = self 64 + .inner 65 + .jacquard 66 + .resolve_did_doc(did) 67 + .await 68 + .into_diagnostic()?; 39 69 let doc = doc_resp.parse().into_diagnostic()?; 40 70 41 71 let pds = doc ··· 46 76 let handle = handles.is_empty().not().then(|| handles.remove(0)); 47 77 48 78 Ok((pds, handle)) 79 + } 80 + 81 + pub async fn resolve_signing_key(&self, did: &Did<'_>) -> Result<PublicKey<'static>> { 82 + let did_static = did.clone().into_static(); 83 + 84 + if let Some(entry) = self.inner.key_cache.get_async(&did_static).await { 85 + return Ok(entry.get().clone()); 86 + } 87 + 88 + let doc_resp = self 89 + .inner 90 + .jacquard 91 + .resolve_did_doc(did) 92 + .await 93 + .into_diagnostic()?; 94 + let doc = doc_resp.parse().into_diagnostic()?; 95 + 96 + let key = doc 97 + .atproto_public_key() 98 + .into_diagnostic()? 99 + .ok_or_else(|| miette::miette!("no atproto signing key in DID doc for {did}"))?; 100 + 101 + let _ = self 102 + .inner 103 + .key_cache 104 + .put_async(did_static, key.clone()) 105 + .await; 106 + 107 + Ok(key) 49 108 } 50 109 }
+1 -1
src/state.rs
··· 25 25 config.cache_size, 26 26 config.disable_lz4_compression, 27 27 )?; 28 - let resolver = Resolver::new(config.plc_url.clone()); 28 + let resolver = Resolver::new(config.plc_url.clone(), config.identity_cache_size); 29 29 let (backfill_tx, backfill_rx) = mpsc::unbounded_channel(); 30 30 31 31 Ok((