at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at e558361eb571e7e019a4a7967bb4ae7e666f3f25 214 lines 5.6 kB view raw
1use std::fmt::Display; 2 3use jacquard_common::types::cid::IpldCid; 4use jacquard_common::types::string::Did; 5use jacquard_common::{CowStr, IntoStatic, types::string::Handle}; 6use serde::{Deserialize, Serialize}; 7use serde_json::Value; 8use smol_str::SmolStr; 9 10use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 11 12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 13pub enum RepoStatus { 14 Backfilling, 15 Synced, 16 Error(SmolStr), 17 Deactivated, 18 Takendown, 19 Suspended, 20} 21 22impl Display for RepoStatus { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 match self { 25 RepoStatus::Backfilling => write!(f, "backfilling"), 26 RepoStatus::Synced => write!(f, "synced"), 27 RepoStatus::Error(e) => write!(f, "error({e})"), 28 RepoStatus::Deactivated => write!(f, "deactivated"), 29 RepoStatus::Takendown => write!(f, "takendown"), 30 RepoStatus::Suspended => write!(f, "suspended"), 31 } 32 } 33} 34 35#[derive(Debug, Clone, Serialize, Deserialize)] 36#[serde(bound(deserialize = "'i: 'de"))] 37pub struct RepoState<'i> { 38 pub status: RepoStatus, 39 pub rev: Option<DbTid>, 40 pub data: Option<IpldCid>, 41 pub last_seq: Option<i64>, 42 pub last_updated_at: i64, // unix timestamp 43 #[serde(borrow)] 44 pub handle: Option<Handle<'i>>, 45 pub index_id: u64, 46 #[serde(default = "default_tracked")] 47 pub tracked: bool, 48} 49 50fn default_tracked() -> bool { 51 true 52} 53 54impl<'i> RepoState<'i> { 55 pub fn backfilling(index_id: u64) -> Self { 56 Self { 57 status: RepoStatus::Backfilling, 58 rev: None, 59 data: None, 60 last_seq: None, 61 last_updated_at: chrono::Utc::now().timestamp(), 62 handle: None, 63 index_id, 64 tracked: true, 65 } 66 } 67 68 /// backfilling, but not tracked yet 69 pub fn untracked(index_id: u64) -> Self { 70 Self { 71 tracked: false, 72 ..Self::backfilling(index_id) 73 } 74 } 75} 76 77impl<'i> IntoStatic for RepoState<'i> { 78 type Output = RepoState<'static>; 79 80 fn into_static(self) -> Self::Output { 81 RepoState { 82 status: self.status, 83 rev: self.rev, 84 data: self.data, 85 last_seq: self.last_seq, 86 last_updated_at: self.last_updated_at, 87 handle: self.handle.map(|s| s.into_static()), 88 index_id: self.index_id, 89 tracked: self.tracked, 90 } 91 } 92} 93 94#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 95pub enum ResyncErrorKind { 96 Ratelimited, 97 Transport, 98 Generic, 99} 100 101#[derive(Debug, Clone, Serialize, Deserialize)] 102pub enum ResyncState { 103 Error { 104 kind: ResyncErrorKind, 105 retry_count: u32, 106 next_retry: i64, // unix timestamp 107 }, 108 Gone { 109 status: RepoStatus, // deactivated, takendown, suspended 110 }, 111} 112 113impl ResyncState { 114 pub fn next_backoff(retry_count: u32) -> i64 { 115 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 116 let base = 60; 117 let cap = 3600; 118 let mult = 2u64.pow(retry_count.min(10)) as i64; 119 let delay = (base * mult).min(cap); 120 121 // add +/- 10% jitter 122 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 123 let delay = (delay as f64 + jitter) as i64; 124 125 chrono::Utc::now().timestamp() + delay 126 } 127} 128 129// from src/api/event.rs 130 131#[derive(Debug, Serialize, Clone)] 132pub struct MarshallableEvt<'i> { 133 pub id: u64, 134 #[serde(rename = "type")] 135 pub event_type: SmolStr, 136 #[serde(borrow)] 137 #[serde(skip_serializing_if = "Option::is_none")] 138 pub record: Option<RecordEvt<'i>>, 139 #[serde(borrow)] 140 #[serde(skip_serializing_if = "Option::is_none")] 141 pub identity: Option<IdentityEvt<'i>>, 142 #[serde(borrow)] 143 #[serde(skip_serializing_if = "Option::is_none")] 144 pub account: Option<AccountEvt<'i>>, 145} 146 147#[derive(Clone, Debug)] 148pub enum BroadcastEvent { 149 #[allow(dead_code)] 150 Persisted(u64), 151 Ephemeral(MarshallableEvt<'static>), 152} 153 154#[derive(Debug, Serialize, Clone)] 155pub struct RecordEvt<'i> { 156 pub live: bool, 157 #[serde(borrow)] 158 pub did: Did<'i>, 159 pub rev: CowStr<'i>, 160 pub collection: CowStr<'i>, 161 pub rkey: CowStr<'i>, 162 pub action: CowStr<'i>, 163 #[serde(skip_serializing_if = "Option::is_none")] 164 pub record: Option<Value>, 165 #[serde(skip_serializing_if = "Option::is_none")] 166 pub cid: Option<CowStr<'i>>, 167} 168 169#[derive(Debug, Serialize, Clone)] 170pub struct IdentityEvt<'i> { 171 #[serde(borrow)] 172 pub did: Did<'i>, 173 #[serde(skip_serializing_if = "Option::is_none")] 174 pub handle: Option<CowStr<'i>>, 175} 176 177#[derive(Debug, Serialize, Clone)] 178pub struct AccountEvt<'i> { 179 #[serde(borrow)] 180 pub did: Did<'i>, 181 pub active: bool, 182 #[serde(skip_serializing_if = "Option::is_none")] 183 pub status: Option<CowStr<'i>>, 184} 185 186#[derive(Debug, Serialize, Deserialize, Clone)] 187#[serde(bound(deserialize = "'i: 'de"))] 188pub struct StoredEvent<'i> { 189 #[serde(default)] 190 pub live: bool, 191 #[serde(borrow)] 192 pub did: TrimmedDid<'i>, 193 pub rev: DbTid, 194 #[serde(borrow)] 195 pub collection: CowStr<'i>, 196 pub rkey: DbRkey, 197 pub action: DbAction, 198 #[serde(default)] 199 #[serde(skip_serializing_if = "Option::is_none")] 200 pub cid: Option<IpldCid>, 201} 202 203#[derive(Debug, PartialEq, Eq, Clone, Copy)] 204pub enum GaugeState { 205 Synced, 206 Pending, 207 Resync(Option<ResyncErrorKind>), 208} 209 210impl GaugeState { 211 pub fn is_resync(&self) -> bool { 212 matches!(self, GaugeState::Resync(_)) 213 } 214}