aturi indexer with listRecords and countRecords endpoints

use updated version of tapped, we properly ack now?

ptr.pet 04408052 4b9431de

verified
+37 -28
+5 -5
Cargo.lock
··· 1641 1641 [[package]] 1642 1642 name = "tapped" 1643 1643 version = "0.2.0" 1644 - source = "git+https://tangled.sh/ptr.pet/tapped#982c843e276a3a2e93c3288d7dc4b9abaf9ba5c0" 1644 + source = "git+https://tangled.sh/ptr.pet/tapped#a49b32c917a97500245b0e54dc5cdb67a0f7aa70" 1645 1645 dependencies = [ 1646 1646 "base64", 1647 1647 "bytes", ··· 2355 2355 2356 2356 [[package]] 2357 2357 name = "zerocopy" 2358 - version = "0.8.33" 2358 + version = "0.8.34" 2359 2359 source = "registry+https://github.com/rust-lang/crates.io-index" 2360 - checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" 2360 + checksum = "71ddd76bcebeed25db614f82bf31a9f4222d3fbba300e6fb6c00afa26cbd4d9d" 2361 2361 dependencies = [ 2362 2362 "zerocopy-derive", 2363 2363 ] 2364 2364 2365 2365 [[package]] 2366 2366 name = "zerocopy-derive" 2367 - version = "0.8.33" 2367 + version = "0.8.34" 2368 2368 source = "registry+https://github.com/rust-lang/crates.io-index" 2369 - checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" 2369 + checksum = "d8187381b52e32220d50b255276aa16a084ec0a9017a0ca2152a1f55c539758d" 2370 2370 dependencies = [ 2371 2371 "proc-macro2", 2372 2372 "quote",
+32 -23
src/main.rs
··· 133 133 } 134 134 135 135 match client.channel().await { 136 - Ok(mut receiver) => { 136 + Ok((mut receiver, mut ack_sender)) => { 137 137 info!("connected to tap firehose"); 138 138 loop { 139 139 tokio::select! { 140 140 ev = receiver.recv() => { 141 141 match ev { 142 - Ok(event) => { 143 - let Event::Record(rec) = event.event else { 144 - continue; 145 - }; 146 - // we drop here and assume we wont crash in between here and fjall i suppose 142 + Ok((event, ack_id)) => { 143 + let mut handled = true; 147 144 148 - ops_count.fetch_add(1, Ordering::Relaxed); 145 + if let Event::Record(rec) = event { 146 + ops_count.fetch_add(1, Ordering::Relaxed); 149 147 150 - let ks = match db 151 - .keyspace(&rec.collection, KeyspaceCreateOptions::default) 152 - { 153 - Ok(ks) => ks, 154 - Err(err) => { 155 - error!( 156 - "failed to open keyspace for {}: {}", 157 - rec.collection, err 158 - ); 159 - continue; 148 + match db.keyspace(&rec.collection, KeyspaceCreateOptions::default) { 149 + Ok(ks) => { 150 + let counts = counts.clone(); 151 + handled = 152 + tokio::task::spawn_blocking(move || { 153 + if let Err(e) = handle_record(&counts, &ks, rec) { 154 + error!("error handling record: {}", e); 155 + false 156 + } else { 157 + true 158 + } 159 + }) 160 + .await 161 + .expect("couldnt join task"); 162 + } 163 + Err(err) => { 164 + error!( 165 + "failed to open keyspace for {}: {}", 166 + rec.collection, err 167 + ); 168 + } 160 169 } 161 - }; 170 + } 162 171 163 - let counts = counts.clone(); 164 - tokio::task::spawn_blocking(move || { 165 - if let Err(e) = handle_record(&counts, &ks, rec) { 166 - error!("error handling record: {}", e); 172 + if handled { 173 + if let Err(e) = ack_sender.ack(ack_id).await { 174 + warn!("failed to ack event: {}", e); 175 + break; 167 176 } 168 - }); 177 + } 169 178 } 170 179 Err(err) => { 171 180 warn!("tap channel closed: {err}");