AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] update handlers for new lifecycle states and remove redundant module

ptr.pet 46787251 b055ce25

verified
+28 -42
-15
src/api/event.rs
··· 1 - use serde::{Deserialize, Serialize}; 2 - use smol_str::SmolStr; 3 - 4 - use crate::types::{IdentityEvt, RecordEvt}; 5 - 6 - #[derive(Debug, Serialize, Deserialize, Clone)] 7 - pub struct MarshallableEvt { 8 - pub id: u64, 9 - #[serde(rename = "type")] 10 - pub event_type: SmolStr, 11 - #[serde(skip_serializing_if = "Option::is_none")] 12 - pub record: Option<RecordEvt>, 13 - #[serde(skip_serializing_if = "Option::is_none")] 14 - pub identity: Option<IdentityEvt>, 15 - }
-1
src/api/mod.rs
··· 5 5 use tower_http::trace::TraceLayer; 6 6 7 7 mod debug; 8 - pub mod event; 9 8 pub mod repo; 10 9 pub mod stats; 11 10 mod stream;
+2 -2
src/api/repo.rs
··· 1 1 use crate::api::AppState; 2 - use crate::db::{keys, Db}; 2 + use crate::db::{keys, ser_repo_state, Db}; 3 3 use crate::types::{RepoState, RepoStatus}; 4 4 use axum::{extract::State, http::StatusCode, routing::post, Json, Router}; 5 5 use jacquard::{types::did::Did, IntoStatic}; ··· 36 36 { 37 37 let mut repo_state = RepoState::new(did.clone()); 38 38 repo_state.status = RepoStatus::Backfilling; 39 - let bytes = rmp_serde::to_vec(&repo_state) 39 + let bytes = ser_repo_state(&repo_state) 40 40 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 41 41 42 42 batch.insert(&db.repos, did_key, bytes);
+13 -4
src/api/stream.rs
··· 1 - use crate::api::event::MarshallableEvt; 2 1 use crate::api::AppState; 3 2 use crate::db::keys; 4 - use crate::types::{RecordEvt, StoredEvent}; 3 + use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 5 4 use axum::{ 6 5 extract::{ 7 6 ws::{Message, WebSocket, WebSocketUpgrade}, ··· 28 27 } 29 28 30 29 async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) { 31 - let (tx, mut rx) = mpsc::channel(100); 30 + let (tx, mut rx) = mpsc::channel(500); 32 31 33 32 std::thread::Builder::new() 34 33 .name(format!( ··· 135 134 136 135 // 2. wait for live events 137 136 match event_rx.blocking_recv() { 138 - Ok(_) => { 137 + Ok(BroadcastEvent::Persisted(_)) => { 139 138 // just wake up and run catch-up loop again 139 + } 140 + Ok(BroadcastEvent::Ephemeral(evt)) => { 141 + // send ephemeral event directly 142 + let json_str = match serde_json::to_string(&evt) { 143 + Ok(s) => s, 144 + Err(_) => continue, 145 + }; 146 + if tx.blocking_send(Message::Text(json_str.into())).is_err() { 147 + return; 148 + } 140 149 } 141 150 Err(broadcast::error::RecvError::Lagged(_)) => { 142 151 // continue to catch up
+13 -20
src/crawler/mod.rs
··· 1 - use crate::db::{keys, Db}; 1 + use crate::db::{keys, ser_repo_state, Db}; 2 2 use crate::state::AppState; 3 3 use crate::types::{RepoState, RepoStatus}; 4 4 use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput}; ··· 57 57 let output: ListReposOutput = match res_result { 58 58 Ok(res) => res.into_output().into_diagnostic()?, 59 59 Err(e) => { 60 - error!("crawler failed to list repos: {}. retrying in 30s...", e); 60 + error!("crawler failed to list repos: {e}. retrying in 30s..."); 61 61 tokio::time::sleep(Duration::from_secs(30)).await; 62 62 continue; 63 63 } ··· 66 66 if output.repos.is_empty() { 67 67 info!("crawler finished enumeration (or empty page). sleeping for 1 hour."); 68 68 tokio::time::sleep(Duration::from_secs(3600)).await; 69 - // we might want to reset cursor to start over? tap seems to loop. 70 - // for now, just wait. 71 69 continue; 72 70 } 73 71 ··· 83 81 84 82 // check if known 85 83 if !Db::contains_key(db.repos.clone(), did_key).await? 
{ 86 - debug!("crawler found new repo: {}", did_str); 84 + debug!("crawler found new repo: {did_str}"); 87 85 88 86 // create state (backfilling) 89 87 let mut state = RepoState::new(repo.did.to_owned()); 90 88 state.status = RepoStatus::Backfilling; 91 - let bytes = rmp_serde::to_vec(&state).into_diagnostic()?; 92 89 93 - batch.insert(&db.repos, did_key, bytes); 90 + batch.insert(&db.repos, did_key, ser_repo_state(&state)?); 94 91 batch.insert(&db.pending, did_key, Vec::new()); 95 92 to_queue.push(did_str); 96 93 } ··· 99 96 // update counts if we found new repos 100 97 if !to_queue.is_empty() { 101 98 let count = to_queue.len() as i64; 102 - tokio::spawn({ 103 - let state = self.state.clone(); 104 - async move { 105 - let _ = state 106 - .db 107 - .increment_count(keys::count_keyspace_key("repos"), count) 108 - .await; 109 - let _ = state 110 - .db 111 - .increment_count(keys::count_keyspace_key("pending"), count) 112 - .await; 113 - } 114 - }); 99 + let repos_fut = self 100 + .state 101 + .db 102 + .increment_count(keys::count_keyspace_key("repos"), count); 103 + let pending_fut = self 104 + .state 105 + .db 106 + .increment_count(keys::count_keyspace_key("pending"), count); 107 + tokio::spawn(futures::future::join_all([repos_fut, pending_fut])); 115 108 } 116 109 117 110 // 4. update cursor