AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 02165cb84b2914aaedb64e22cb80ef641edd02ce 132 lines 3.2 kB view raw
1use jacquard_common::types::string::Did; 2use serde::{Deserialize, Serialize}; 3use serde_json::Value; 4use smol_str::SmolStr; 5 6// from src/state.rs 7 8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 9pub enum RepoStatus { 10 New, 11 Backfilling, 12 Synced, 13 Error(SmolStr), 14 Deactivated, 15 Takendown, 16 Suspended, 17} 18 19#[derive(Debug, Clone, Serialize, Deserialize)] 20pub struct RepoState { 21 pub did: SmolStr, 22 pub status: RepoStatus, 23 pub rev: SmolStr, 24 pub data: SmolStr, 25 pub last_seq: Option<i64>, 26 pub last_updated_at: i64, // unix timestamp 27 pub handle: Option<SmolStr>, 28} 29 30impl RepoState { 31 pub fn new(did: Did) -> Self { 32 Self { 33 did: did.as_str().into(), 34 status: RepoStatus::New, 35 rev: "".into(), 36 data: "".into(), 37 last_seq: None, 38 last_updated_at: chrono::Utc::now().timestamp(), 39 handle: None, 40 } 41 } 42} 43 44// from src/backfill/resync_state.rs 45 46#[derive(Debug, Clone, Serialize, Deserialize)] 47pub enum ResyncState { 48 Error { 49 message: SmolStr, 50 retry_count: u32, 51 next_retry: i64, // unix timestamp 52 }, 53 Gone { 54 status: RepoStatus, // deactivated, takendown, suspended 55 }, 56} 57 58impl ResyncState { 59 pub fn next_backoff(retry_count: u32) -> i64 { 60 // exponential backoff: 1m, 2m, 4m, 8m... 
up to 1h 61 let base = 60; 62 let cap = 3600; 63 let mult = 2u64.pow(retry_count.min(10)) as i64; 64 let delay = (base * mult).min(cap); 65 chrono::Utc::now().timestamp() + delay 66 } 67} 68 69// from src/api/event.rs 70 71#[derive(Debug, Serialize, Deserialize, Clone)] 72pub struct MarshallableEvt { 73 pub id: u64, 74 #[serde(rename = "type")] 75 pub event_type: SmolStr, 76 #[serde(skip_serializing_if = "Option::is_none")] 77 pub record: Option<RecordEvt>, 78 #[serde(skip_serializing_if = "Option::is_none")] 79 pub identity: Option<IdentityEvt>, 80 #[serde(skip_serializing_if = "Option::is_none")] 81 pub account: Option<AccountEvt>, 82} 83 84#[derive(Clone, Debug)] 85pub enum BroadcastEvent { 86 Persisted(u64), 87 Ephemeral(MarshallableEvt), 88} 89 90#[derive(Debug, Serialize, Deserialize, Clone)] 91pub struct RecordEvt { 92 pub live: bool, 93 pub did: SmolStr, 94 pub rev: SmolStr, 95 pub collection: SmolStr, 96 pub rkey: SmolStr, 97 pub action: SmolStr, 98 #[serde(skip_serializing_if = "Option::is_none")] 99 pub record: Option<Value>, 100 #[serde(skip_serializing_if = "Option::is_none")] 101 pub cid: Option<SmolStr>, 102} 103 104#[derive(Debug, Serialize, Deserialize, Clone)] 105pub struct IdentityEvt { 106 pub did: SmolStr, 107 #[serde(skip_serializing_if = "Option::is_none")] 108 pub handle: Option<SmolStr>, 109} 110 111#[derive(Debug, Serialize, Deserialize, Clone)] 112pub struct AccountEvt { 113 pub did: SmolStr, 114 pub active: bool, 115 #[serde(skip_serializing_if = "Option::is_none")] 116 pub status: Option<SmolStr>, 117} 118 119#[derive(Debug, Serialize, Deserialize, Clone)] 120pub enum StoredEvent { 121 Record { 122 live: bool, 123 did: SmolStr, 124 rev: SmolStr, 125 collection: SmolStr, 126 rkey: SmolStr, 127 action: SmolStr, 128 cid: Option<SmolStr>, 129 }, 130 Identity(IdentityEvt), 131 Account(AccountEvt), 132}