use axum::{ extract::{Query, State}, routing::get, Json, Router, }; use fjall::{Database, Keyspace, KeyspaceCreateOptions}; use serde::{Deserialize, Serialize}; use std::{ pin, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, time::Duration, }; use tapped::{Event, RecordAction, RecordEvent, TapClient}; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[derive(Clone)] struct AppState { db: Database, counts: Keyspace, } #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let db = Database::builder("aturlist_fjall").open()?; // open keyspaces let counts = db.keyspace("counts", || KeyspaceCreateOptions::default())?; let state = AppState { db: db.clone(), counts: counts.clone(), }; let ops_count = Arc::new(AtomicU64::new(0)); // start tap consumers let num_consumers = std::env::var("TAP_CONCURRENCY") .ok() .and_then(|s| s.parse().ok()) .unwrap_or(20); let closed = CancellationToken::new(); for i in 0..num_consumers { let db_clone = state.db.clone(); let counts_clone = state.counts.clone(); let ops_count_clone = ops_count.clone(); let closed = closed.child_token(); tokio::spawn(async move { info!("starting consumer #{}", i); run_tap_consumer(db_clone, counts_clone, ops_count_clone, closed).await; }); } // start stats reporter let ops_count_stats = ops_count.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); let mut last_count = 0; // The first tick completes immediately interval.tick().await; loop { interval.tick().await; let current_count = ops_count_stats.load(Ordering::Relaxed); let delta = current_count - last_count; let ops_sec = delta as f64 / 60.0; info!( "stats: total_ops={} delta_ops={} ops_sec={:.2}", current_count, delta, ops_sec ); last_count = current_count; } }); let app = Router::new() .route("/xrpc/systems.gaze.aturlist.listRecords", get(list_records)) .route( "/xrpc/systems.gaze.aturlist.countRecords", get(count_records), ) .with_state(state); let listener = tokio::net::TcpListener::bind("0.0.0.0:7155").await?; info!("listening on {}", listener.local_addr()?); let mut _sigterm = tokio::signal::unix::signal(SignalKind::terminate())?; let mut _sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; let sigterm = pin::pin!(_sigterm.recv()); let sigint = pin::pin!(_sigint.recv()); let terminating = futures::future::select(sigterm, sigint); tokio::select! { res = axum::serve(listener, app) => res?, _ = terminating => { info!("shutting down!"); closed.cancel(); } } info!("waiting 10 seconds for cleanup..."); tokio::time::sleep(Duration::from_secs(10)).await; info!("byebye! (_ _*)Zzz"); Ok(()) } async fn run_tap_consumer( db: Database, counts: Keyspace, ops_count: Arc, closed: CancellationToken, ) { let tap_url = "http://localhost:2480"; 'outer: loop { info!("connecting to tap at {}", tap_url); match TapClient::new(tap_url) { Ok(client) => { if let Err(e) = client.health().await { warn!("tap health check failed: {}", e); tokio::time::sleep(std::time::Duration::from_secs(5)).await; continue; } match client.channel().await { Ok((mut receiver, mut ack_sender)) => { info!("connected to tap firehose"); loop { tokio::select! { ev = receiver.recv() => { match ev { Ok((event, ack_id)) => { let mut handled = true; if let Event::Record(rec) = event { match db.keyspace(&rec.collection, KeyspaceCreateOptions::default) { Ok(ks) => { let counts = counts.clone(); let ops_count = ops_count.clone(); handled = tokio::task::spawn_blocking(move || { if let Err(e) = handle_record(&counts, &ks, rec) { error!("error handling record: {}", e); false } else { ops_count.fetch_add(1, Ordering::Relaxed); true } }) .await .expect("couldnt join task"); } Err(err) => { error!( "failed to open keyspace for {}: {}", rec.collection, err ); } } } if handled { if let Err(e) = ack_sender.ack(ack_id).await { warn!("failed to ack event: {}", e); break; } } } Err(err) => { warn!("tap channel closed: {err}"); break; } } } _ = closed.cancelled() => break 'outer, } } } Err(e) => { warn!("failed to subscribe to channel: {}", e); } } } Err(e) => { warn!("failed to create tap client: {}", e); } } tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } fn strip_did_prefix(did: &str) -> &str { did.strip_prefix("did:").unwrap_or(did) } fn handle_record(counts: &Keyspace, records: &Keyspace, rec: RecordEvent) -> anyhow::Result<()> { // index everything, no filter. // key: strip_did(did)|rkey let key = make_key(strip_did_prefix(&rec.did), &rec.rkey); // logic to maintain counts: // create: insert and increment. // update: insert (overwrite). no count change. // delete: remove and decrement. match rec.action { RecordAction::Create => { // info!("creating {} {} {}", rec.did, rec.collection, rec.rkey); records.insert(&key, &[])?; increment_count(counts, strip_did_prefix(&rec.did), &rec.collection)?; } RecordAction::Update => { // info!("updating {} {} {}", rec.did, rec.collection, rec.rkey); // records.insert(&key, &[])?; } RecordAction::Delete => { // info!("deleting {} {} {}", rec.did, rec.collection, rec.rkey); records.remove(&key)?; decrement_count(counts, strip_did_prefix(&rec.did), &rec.collection)?; } _ => {} } Ok(()) } fn make_key(repo_stripped: &str, rkey: &str) -> Vec { format!("{}|{}", repo_stripped, rkey).into_bytes() } fn make_count_key(repo_stripped: &str, collection: &str) -> Vec { format!("{}|{}", repo_stripped, collection).into_bytes() } fn increment_count(counts: &Keyspace, repo_stripped: &str, collection: &str) -> anyhow::Result<()> { let key = make_count_key(repo_stripped, collection); let mut current = 0u64; if let Some(val) = counts.get(&key)? { if val.len() == 8 { current = u64::from_le_bytes(val[..].try_into().unwrap()); } } current += 1; counts.insert(&key, current.to_le_bytes())?; Ok(()) } fn decrement_count(counts: &Keyspace, repo_stripped: &str, collection: &str) -> anyhow::Result<()> { let key = make_count_key(repo_stripped, collection); let mut current = 0u64; if let Some(val) = counts.get(&key)? { if val.len() == 8 { current = u64::from_le_bytes(val[..].try_into().unwrap()); } } if current > 0 { current -= 1; counts.insert(&key, current.to_le_bytes())?; } Ok(()) } // handlers #[derive(Deserialize)] struct ListRecordsParams { repo: String, collection: String, cursor: Option, reverse: Option, limit: Option, } #[derive(Serialize)] struct ListRecordsResponse { aturis: Vec, // count field is usually empty in listRecords but we can leave it 0 count: usize, #[serde(skip_serializing_if = "Option::is_none")] cursor: Option, } async fn list_records( State(state): State, Query(params): Query, ) -> Json { let records = match state .db .keyspace(¶ms.collection, || KeyspaceCreateOptions::default()) { Ok(p) => p, Err(_) => { return Json(ListRecordsResponse { aturis: Vec::new(), count: 0, cursor: None, }); } }; let repo_stripped = strip_did_prefix(¶ms.repo); let prefix_str = format!("{}|", repo_stripped); let prefix = prefix_str.as_bytes(); // default to descending (newest first) -> reverse=false means descending. // reverse=true means ascending. let ascending = params.reverse.unwrap_or(false); let limit = params.limit.unwrap_or(50).min(500); let mut aturis = Vec::new(); let mut last_rkey = None; let start_bound = if ascending { if let Some(c) = ¶ms.cursor { let mut k = make_key(repo_stripped, c); k.push(0); // start after cursor k } else { prefix.to_vec() } } else { // descending prefix.to_vec() }; let end_bound = if ascending { let mut p = prefix.to_vec(); p.push(0xFF); p } else { // descending if let Some(c) = ¶ms.cursor { make_key(repo_stripped, c) } else { let mut p = prefix.to_vec(); p.push(0xFF); p } }; let range = records.range(start_bound..end_bound); let mut process_key = |k: &[u8]| { let k_str = String::from_utf8_lossy(k); let parts: Vec<&str> = k_str.split('|').collect(); // key format: repo_stripped|rkey if parts.len() == 2 { let rkey = parts[1]; aturis.push(format!( "at://{}/{}/{}", params.repo, params.collection, rkey )); last_rkey = Some(rkey.to_string()); } }; if ascending { for item in range.take(limit) { if let Ok(k) = item.key() { process_key(&k); } } } else { for item in range.rev().take(limit) { if let Ok(k) = item.key() { process_key(&k); } } } let count = aturis.len(); Json(ListRecordsResponse { aturis, count, cursor: last_rkey, }) } #[derive(Deserialize)] struct CountRecordsParams { repo: String, collection: String, } #[derive(Serialize)] struct CountRecordsResponse { repo: String, collection: String, count: u64, } async fn count_records( State(state): State, Query(params): Query, ) -> Json { let repo_stripped = strip_did_prefix(¶ms.repo); let key = make_count_key(repo_stripped, ¶ms.collection); let mut count = 0u64; if let Ok(Some(val)) = state.counts.get(&key) { if val.len() == 8 { count = u64::from_le_bytes(val[..].try_into().unwrap()); } } Json(CountRecordsResponse { repo: params.repo, collection: params.collection, count, }) }