at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 25e6d3c7400a40dfbadcae56258603ec3353c476 187 lines 5.0 kB view raw
1use std::fmt::Display; 2 3use jacquard::{CowStr, IntoStatic}; 4use jacquard_common::types::string::Did; 5use serde::{Deserialize, Serialize}; 6use serde_json::Value; 7use smol_str::SmolStr; 8 9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 10use jacquard::types::cid::IpldCid; 11 12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 13pub enum RepoStatus { 14 Backfilling, 15 Synced, 16 Error(SmolStr), 17 Deactivated, 18 Takendown, 19 Suspended, 20} 21 22impl Display for RepoStatus { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 match self { 25 RepoStatus::Backfilling => write!(f, "backfilling"), 26 RepoStatus::Synced => write!(f, "synced"), 27 RepoStatus::Error(e) => write!(f, "error({e})"), 28 RepoStatus::Deactivated => write!(f, "deactivated"), 29 RepoStatus::Takendown => write!(f, "takendown"), 30 RepoStatus::Suspended => write!(f, "suspended"), 31 } 32 } 33} 34 35#[derive(Debug, Clone, Serialize, Deserialize)] 36#[serde(bound(deserialize = "'i: 'de"))] 37pub struct RepoState<'i> { 38 #[serde(borrow)] 39 pub did: TrimmedDid<'i>, 40 pub status: RepoStatus, 41 pub rev: Option<DbTid>, 42 pub data: Option<IpldCid>, 43 pub last_seq: Option<i64>, 44 pub last_updated_at: i64, // unix timestamp 45 pub handle: Option<SmolStr>, 46} 47 48impl<'i> RepoState<'i> { 49 pub fn backfilling(did: &'i Did<'i>) -> Self { 50 Self { 51 did: TrimmedDid::from(did), 52 status: RepoStatus::Backfilling, 53 rev: None, 54 data: None, 55 last_seq: None, 56 last_updated_at: chrono::Utc::now().timestamp(), 57 handle: None, 58 } 59 } 60} 61 62impl<'i> IntoStatic for RepoState<'i> { 63 type Output = RepoState<'static>; 64 65 fn into_static(self) -> Self::Output { 66 RepoState { 67 did: self.did.into_static(), 68 status: self.status, 69 rev: self.rev, 70 data: self.data, 71 last_seq: self.last_seq, 72 last_updated_at: self.last_updated_at, 73 handle: self.handle, 74 } 75 } 76} 77 78// from src/backfill/resync_state.rs 79 80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 81pub enum ResyncErrorKind { 82 Ratelimited, 83 Transport, 84 Generic, 85} 86 87#[derive(Debug, Clone, Serialize, Deserialize)] 88pub enum ResyncState { 89 Error { 90 kind: ResyncErrorKind, 91 retry_count: u32, 92 next_retry: i64, // unix timestamp 93 }, 94 Gone { 95 status: RepoStatus, // deactivated, takendown, suspended 96 }, 97} 98 99impl ResyncState { 100 pub fn next_backoff(retry_count: u32) -> i64 { 101 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 102 let base = 60; 103 let cap = 3600; 104 let mult = 2u64.pow(retry_count.min(10)) as i64; 105 let delay = (base * mult).min(cap); 106 107 // add +/- 10% jitter 108 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 109 let delay = (delay as f64 + jitter) as i64; 110 111 chrono::Utc::now().timestamp() + delay 112 } 113} 114 115// from src/api/event.rs 116 117#[derive(Debug, Serialize, Clone)] 118pub struct MarshallableEvt<'i> { 119 pub id: u64, 120 #[serde(rename = "type")] 121 pub event_type: SmolStr, 122 #[serde(borrow)] 123 #[serde(skip_serializing_if = "Option::is_none")] 124 pub record: Option<RecordEvt<'i>>, 125 #[serde(borrow)] 126 #[serde(skip_serializing_if = "Option::is_none")] 127 pub identity: Option<IdentityEvt<'i>>, 128 #[serde(borrow)] 129 #[serde(skip_serializing_if = "Option::is_none")] 130 pub account: Option<AccountEvt<'i>>, 131} 132 133#[derive(Clone, Debug)] 134pub enum BroadcastEvent { 135 #[allow(dead_code)] 136 Persisted(u64), 137 Ephemeral(MarshallableEvt<'static>), 138} 139 140#[derive(Debug, Serialize, Clone)] 141pub struct RecordEvt<'i> { 142 pub live: bool, 143 #[serde(borrow)] 144 pub did: Did<'i>, 145 pub rev: CowStr<'i>, 146 pub collection: CowStr<'i>, 147 pub rkey: CowStr<'i>, 148 pub action: CowStr<'i>, 149 #[serde(skip_serializing_if = "Option::is_none")] 150 pub record: Option<Value>, 151 #[serde(skip_serializing_if = "Option::is_none")] 152 pub cid: Option<CowStr<'i>>, 153} 154 155#[derive(Debug, Serialize, Clone)] 156pub struct IdentityEvt<'i> { 157 #[serde(borrow)] 158 pub did: Did<'i>, 159 #[serde(skip_serializing_if = "Option::is_none")] 160 pub handle: Option<CowStr<'i>>, 161} 162 163#[derive(Debug, Serialize, Clone)] 164pub struct AccountEvt<'i> { 165 #[serde(borrow)] 166 pub did: Did<'i>, 167 pub active: bool, 168 #[serde(skip_serializing_if = "Option::is_none")] 169 pub status: Option<CowStr<'i>>, 170} 171 172#[derive(Debug, Serialize, Deserialize, Clone)] 173#[serde(bound(deserialize = "'i: 'de"))] 174pub struct StoredEvent<'i> { 175 #[serde(default)] 176 pub live: bool, 177 #[serde(borrow)] 178 pub did: TrimmedDid<'i>, 179 pub rev: DbTid, 180 #[serde(borrow)] 181 pub collection: CowStr<'i>, 182 pub rkey: DbRkey, 183 pub action: DbAction, 184 #[serde(default)] 185 #[serde(skip_serializing_if = "Option::is_none")] 186 pub cid: Option<IpldCid>, 187}