at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 4fe44f8a28620861fb772d40d55fbeb65716e5df 206 lines 5.4 kB view raw
1use std::fmt::Display; 2 3use jacquard::{CowStr, IntoStatic, types::string::Handle}; 4use jacquard_common::types::string::Did; 5use serde::{Deserialize, Serialize}; 6use serde_json::Value; 7use smol_str::SmolStr; 8 9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 10use jacquard::types::cid::IpldCid; 11 12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 13pub enum RepoStatus { 14 Backfilling, 15 Synced, 16 Error(SmolStr), 17 Deactivated, 18 Takendown, 19 Suspended, 20} 21 22impl Display for RepoStatus { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 match self { 25 RepoStatus::Backfilling => write!(f, "backfilling"), 26 RepoStatus::Synced => write!(f, "synced"), 27 RepoStatus::Error(e) => write!(f, "error({e})"), 28 RepoStatus::Deactivated => write!(f, "deactivated"), 29 RepoStatus::Takendown => write!(f, "takendown"), 30 RepoStatus::Suspended => write!(f, "suspended"), 31 } 32 } 33} 34 35#[derive(Debug, Clone, Serialize, Deserialize)] 36#[serde(bound(deserialize = "'i: 'de"))] 37pub struct RepoState<'i> { 38 pub status: RepoStatus, 39 pub rev: Option<DbTid>, 40 pub data: Option<IpldCid>, 41 pub last_seq: Option<i64>, 42 pub last_updated_at: i64, // unix timestamp 43 #[serde(borrow)] 44 pub handle: Option<Handle<'i>>, 45 pub index_id: u64, 46 #[serde(default = "default_tracked")] 47 pub tracked: bool, 48} 49 50fn default_tracked() -> bool { 51 true 52} 53 54impl<'i> RepoState<'i> { 55 pub fn backfilling(index_id: u64) -> Self { 56 Self { 57 status: RepoStatus::Backfilling, 58 rev: None, 59 data: None, 60 last_seq: None, 61 last_updated_at: chrono::Utc::now().timestamp(), 62 handle: None, 63 index_id, 64 tracked: true, 65 } 66 } 67} 68 69impl<'i> IntoStatic for RepoState<'i> { 70 type Output = RepoState<'static>; 71 72 fn into_static(self) -> Self::Output { 73 RepoState { 74 status: self.status, 75 rev: self.rev, 76 data: self.data, 77 last_seq: self.last_seq, 78 last_updated_at: self.last_updated_at, 79 handle: self.handle.map(|s| s.into_static()), 80 index_id: self.index_id, 81 tracked: self.tracked, 82 } 83 } 84} 85 86#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 87pub enum ResyncErrorKind { 88 Ratelimited, 89 Transport, 90 Generic, 91} 92 93#[derive(Debug, Clone, Serialize, Deserialize)] 94pub enum ResyncState { 95 Error { 96 kind: ResyncErrorKind, 97 retry_count: u32, 98 next_retry: i64, // unix timestamp 99 }, 100 Gone { 101 status: RepoStatus, // deactivated, takendown, suspended 102 }, 103} 104 105impl ResyncState { 106 pub fn next_backoff(retry_count: u32) -> i64 { 107 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 108 let base = 60; 109 let cap = 3600; 110 let mult = 2u64.pow(retry_count.min(10)) as i64; 111 let delay = (base * mult).min(cap); 112 113 // add +/- 10% jitter 114 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 115 let delay = (delay as f64 + jitter) as i64; 116 117 chrono::Utc::now().timestamp() + delay 118 } 119} 120 121// from src/api/event.rs 122 123#[derive(Debug, Serialize, Clone)] 124pub struct MarshallableEvt<'i> { 125 pub id: u64, 126 #[serde(rename = "type")] 127 pub event_type: SmolStr, 128 #[serde(borrow)] 129 #[serde(skip_serializing_if = "Option::is_none")] 130 pub record: Option<RecordEvt<'i>>, 131 #[serde(borrow)] 132 #[serde(skip_serializing_if = "Option::is_none")] 133 pub identity: Option<IdentityEvt<'i>>, 134 #[serde(borrow)] 135 #[serde(skip_serializing_if = "Option::is_none")] 136 pub account: Option<AccountEvt<'i>>, 137} 138 139#[derive(Clone, Debug)] 140pub enum BroadcastEvent { 141 #[allow(dead_code)] 142 Persisted(u64), 143 Ephemeral(MarshallableEvt<'static>), 144} 145 146#[derive(Debug, Serialize, Clone)] 147pub struct RecordEvt<'i> { 148 pub live: bool, 149 #[serde(borrow)] 150 pub did: Did<'i>, 151 pub rev: CowStr<'i>, 152 pub collection: CowStr<'i>, 153 pub rkey: CowStr<'i>, 154 pub action: CowStr<'i>, 155 #[serde(skip_serializing_if = "Option::is_none")] 156 pub record: Option<Value>, 157 #[serde(skip_serializing_if = "Option::is_none")] 158 pub cid: Option<CowStr<'i>>, 159} 160 161#[derive(Debug, Serialize, Clone)] 162pub struct IdentityEvt<'i> { 163 #[serde(borrow)] 164 pub did: Did<'i>, 165 #[serde(skip_serializing_if = "Option::is_none")] 166 pub handle: Option<CowStr<'i>>, 167} 168 169#[derive(Debug, Serialize, Clone)] 170pub struct AccountEvt<'i> { 171 #[serde(borrow)] 172 pub did: Did<'i>, 173 pub active: bool, 174 #[serde(skip_serializing_if = "Option::is_none")] 175 pub status: Option<CowStr<'i>>, 176} 177 178#[derive(Debug, Serialize, Deserialize, Clone)] 179#[serde(bound(deserialize = "'i: 'de"))] 180pub struct StoredEvent<'i> { 181 #[serde(default)] 182 pub live: bool, 183 #[serde(borrow)] 184 pub did: TrimmedDid<'i>, 185 pub rev: DbTid, 186 #[serde(borrow)] 187 pub collection: CowStr<'i>, 188 pub rkey: DbRkey, 189 pub action: DbAction, 190 #[serde(default)] 191 #[serde(skip_serializing_if = "Option::is_none")] 192 pub cid: Option<IpldCid>, 193} 194 195#[derive(Debug, PartialEq, Eq, Clone, Copy)] 196pub enum GaugeState { 197 Synced, 198 Pending, 199 Resync(Option<ResyncErrorKind>), 200} 201 202impl GaugeState { 203 pub fn is_resync(&self) -> bool { 204 matches!(self, GaugeState::Resync(_)) 205 } 206}