aturi indexer with listRecords and countRecords endpoints

decouple receiving from processing

ptr.pet 4bc72704 4dd69495

verified
+36 -8
+36 -8
src/main.rs
··· 8 8 use tapped::{TapClient, Event, RecordAction, RecordEvent}; 9 9 use fjall::{Database, Keyspace, KeyspaceCreateOptions}; 10 10 use std::sync::{Arc, atomic::{AtomicU64, Ordering}}; 11 - 11 + use std::collections::HashMap; 12 12 13 13 #[global_allocator] 14 14 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; ··· 74 74 75 75 async fn run_tap_consumer(db: Database, counts: Keyspace, ops_count: Arc<AtomicU64>) { 76 76 let tap_url = "http://localhost:2480"; 77 + let mut keyspaces: HashMap<String, Keyspace> = HashMap::new(); 77 78 78 79 loop { 79 80 info!("connecting to tap at {}", tap_url); ··· 90 91 info!("connected to tap firehose"); 91 92 while let Ok(event) = receiver.recv().await { 92 93 ops_count.fetch_add(1, Ordering::Relaxed); 93 - if let Event::Record(rec) = &*event { 94 - if let Err(e) = handle_record(&db, &counts, rec) { 95 - error!("error handling record: {}", e); 94 + 95 + // check if it's a record and get the keyspace, without moving event yet 96 + let ks = if let Event::Record(rec) = &*event { 97 + if let Some(k) = keyspaces.get(&rec.collection) { 98 + Some(k.clone()) 99 + } else { 100 + match db.keyspace(&rec.collection, KeyspaceCreateOptions::default) { 101 + Ok(k) => { 102 + keyspaces.insert(rec.collection.clone(), k.clone()); 103 + Some(k) 104 + } 105 + Err(e) => { 106 + error!("failed to open keyspace for {}: {}", rec.collection, e); 107 + None 108 + } 109 + } 96 110 } 111 + } else { 112 + None 113 + }; 114 + 115 + if let Some(ks) = ks { 116 + let counts = counts.clone(); 117 + tokio::task::spawn_blocking(move || { 118 + if let Event::Record(rec) = &*event { 119 + if let Err(e) = handle_record(&counts, &ks, rec) { 120 + error!("error handling record: {}", e); 121 + } 122 + } 123 + }); 97 124 } 98 125 } 99 126 warn!("tap channel closed"); ··· 115 142 did.strip_prefix("did:").unwrap_or(did) 116 143 } 117 144 118 - fn handle_record(db: &Database, counts: &Keyspace, rec: &RecordEvent) -> anyhow::Result<()> { 119 - // partition by collection 120 - let records = db.keyspace(&rec.collection, KeyspaceCreateOptions::default)?; 121 - 145 + fn handle_record( 146 + counts: &Keyspace, 147 + records: &Keyspace, 148 + rec: &RecordEvent, 149 + ) -> anyhow::Result<()> { 122 150 // index everything, no filter. 123 151 // key: strip_did(did)|rkey 124 152 let key = make_key(strip_did_prefix(&rec.did), &rec.rkey);