at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 98b08acb7ebd53e2648888efacebe8ce2851d2c4 227 lines 6.1 kB view raw
1use std::fmt::Display; 2 3use jacquard_common::types::cid::IpldCid; 4use jacquard_common::types::string::Did; 5use jacquard_common::{CowStr, IntoStatic, types::string::Handle}; 6use serde::{Deserialize, Serialize}; 7use serde_json::Value; 8use smol_str::{SmolStr, ToSmolStr}; 9 10use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid}; 11use crate::resolver::MiniDoc; 12 13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 14pub enum RepoStatus { 15 Backfilling, 16 Synced, 17 Error(SmolStr), 18 Deactivated, 19 Takendown, 20 Suspended, 21} 22 23impl Display for RepoStatus { 24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 25 match self { 26 RepoStatus::Backfilling => write!(f, "backfilling"), 27 RepoStatus::Synced => write!(f, "synced"), 28 RepoStatus::Error(e) => write!(f, "error({e})"), 29 RepoStatus::Deactivated => write!(f, "deactivated"), 30 RepoStatus::Takendown => write!(f, "takendown"), 31 RepoStatus::Suspended => write!(f, "suspended"), 32 } 33 } 34} 35 36#[derive(Debug, Clone, Serialize, Deserialize)] 37#[serde(bound(deserialize = "'i: 'de"))] 38pub struct RepoState<'i> { 39 pub status: RepoStatus, 40 pub rev: Option<DbTid>, 41 pub data: Option<IpldCid>, 42 pub last_updated_at: i64, // unix timestamp 43 #[serde(borrow)] 44 pub handle: Option<Handle<'i>>, 45 pub index_id: u64, 46 #[serde(default = "default_tracked")] 47 pub tracked: bool, 48 #[serde(default)] 49 pub signing_key: Option<DidKey<'i>>, 50 #[serde(default)] 51 pub pds: Option<CowStr<'i>>, 52} 53 54fn default_tracked() -> bool { 55 true 56} 57 58impl<'i> RepoState<'i> { 59 pub fn backfilling(index_id: u64) -> Self { 60 Self { 61 status: RepoStatus::Backfilling, 62 rev: None, 63 data: None, 64 last_updated_at: chrono::Utc::now().timestamp(), 65 66 index_id, 67 tracked: true, 68 handle: None, 69 pds: None, 70 signing_key: None, 71 } 72 } 73 74 /// backfilling, but not tracked yet 75 pub fn untracked(index_id: u64) -> Self { 76 Self { 77 tracked: false, 78 ..Self::backfilling(index_id) 79 } 80 } 81 82 pub fn update_from_doc(&mut self, doc: MiniDoc) { 83 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr())); 84 self.handle = doc.handle; 85 self.signing_key = doc.key.map(From::from); 86 } 87} 88 89impl<'i> IntoStatic for RepoState<'i> { 90 type Output = RepoState<'static>; 91 92 fn into_static(self) -> Self::Output { 93 RepoState { 94 status: self.status, 95 rev: self.rev, 96 data: self.data, 97 last_updated_at: self.last_updated_at, 98 index_id: self.index_id, 99 tracked: self.tracked, 100 handle: self.handle.map(IntoStatic::into_static), 101 pds: self.pds.map(IntoStatic::into_static), 102 signing_key: self.signing_key.map(IntoStatic::into_static), 103 } 104 } 105} 106 107#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 108pub enum ResyncErrorKind { 109 Ratelimited, 110 Transport, 111 Generic, 112} 113 114#[derive(Debug, Clone, Serialize, Deserialize)] 115pub enum ResyncState { 116 Error { 117 kind: ResyncErrorKind, 118 retry_count: u32, 119 next_retry: i64, // unix timestamp 120 }, 121 Gone { 122 status: RepoStatus, // deactivated, takendown, suspended 123 }, 124} 125 126impl ResyncState { 127 pub fn next_backoff(retry_count: u32) -> i64 { 128 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 129 let base = 60; 130 let cap = 3600; 131 let mult = 2u64.pow(retry_count.min(10)) as i64; 132 let delay = (base * mult).min(cap); 133 134 // add +/- 10% jitter 135 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 136 let delay = (delay as f64 + jitter) as i64; 137 138 chrono::Utc::now().timestamp() + delay 139 } 140} 141 142// from src/api/event.rs 143 144#[derive(Debug, Serialize, Clone)] 145pub struct MarshallableEvt<'i> { 146 pub id: u64, 147 #[serde(rename = "type")] 148 pub event_type: SmolStr, 149 #[serde(borrow)] 150 #[serde(skip_serializing_if = "Option::is_none")] 151 pub record: Option<RecordEvt<'i>>, 152 #[serde(borrow)] 153 #[serde(skip_serializing_if = "Option::is_none")] 154 pub identity: Option<IdentityEvt<'i>>, 155 #[serde(borrow)] 156 #[serde(skip_serializing_if = "Option::is_none")] 157 pub account: Option<AccountEvt<'i>>, 158} 159 160#[derive(Clone, Debug)] 161pub enum BroadcastEvent { 162 #[allow(dead_code)] 163 Persisted(u64), 164 Ephemeral(MarshallableEvt<'static>), 165} 166 167#[derive(Debug, Serialize, Clone)] 168pub struct RecordEvt<'i> { 169 pub live: bool, 170 #[serde(borrow)] 171 pub did: Did<'i>, 172 pub rev: CowStr<'i>, 173 pub collection: CowStr<'i>, 174 pub rkey: CowStr<'i>, 175 pub action: CowStr<'i>, 176 #[serde(skip_serializing_if = "Option::is_none")] 177 pub record: Option<Value>, 178 #[serde(skip_serializing_if = "Option::is_none")] 179 pub cid: Option<CowStr<'i>>, 180} 181 182#[derive(Debug, Serialize, Clone)] 183pub struct IdentityEvt<'i> { 184 #[serde(borrow)] 185 pub did: Did<'i>, 186 #[serde(skip_serializing_if = "Option::is_none")] 187 pub handle: Option<Handle<'i>>, 188} 189 190#[derive(Debug, Serialize, Clone)] 191pub struct AccountEvt<'i> { 192 #[serde(borrow)] 193 pub did: Did<'i>, 194 pub active: bool, 195 #[serde(skip_serializing_if = "Option::is_none")] 196 pub status: Option<CowStr<'i>>, 197} 198 199#[derive(Debug, Serialize, Deserialize, Clone)] 200#[serde(bound(deserialize = "'i: 'de"))] 201pub struct StoredEvent<'i> { 202 #[serde(default)] 203 pub live: bool, 204 #[serde(borrow)] 205 pub did: TrimmedDid<'i>, 206 pub rev: DbTid, 207 #[serde(borrow)] 208 pub collection: CowStr<'i>, 209 pub rkey: DbRkey, 210 pub action: DbAction, 211 #[serde(default)] 212 #[serde(skip_serializing_if = "Option::is_none")] 213 pub cid: Option<IpldCid>, 214} 215 216#[derive(Debug, PartialEq, Eq, Clone, Copy)] 217pub enum GaugeState { 218 Synced, 219 Pending, 220 Resync(Option<ResyncErrorKind>), 221} 222 223impl GaugeState { 224 pub fn is_resync(&self) -> bool { 225 matches!(self, GaugeState::Resync(_)) 226 } 227}