at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 3eb43b6191a55f71f8327cf6d64290a168d60cff 180 lines 4.8 kB view raw
1use std::fmt::Display; 2 3use jacquard::{CowStr, IntoStatic}; 4use jacquard_common::types::string::Did; 5use serde::{Deserialize, Serialize}; 6use serde_json::Value; 7use smol_str::SmolStr; 8 9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 10use jacquard::types::cid::IpldCid; 11 12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 13pub enum RepoStatus { 14 Backfilling, 15 Synced, 16 Error(SmolStr), 17 Deactivated, 18 Takendown, 19 Suspended, 20} 21 22impl Display for RepoStatus { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 match self { 25 RepoStatus::Backfilling => write!(f, "backfilling"), 26 RepoStatus::Synced => write!(f, "synced"), 27 RepoStatus::Error(e) => write!(f, "error({e})"), 28 RepoStatus::Deactivated => write!(f, "deactivated"), 29 RepoStatus::Takendown => write!(f, "takendown"), 30 RepoStatus::Suspended => write!(f, "suspended"), 31 } 32 } 33} 34 35#[derive(Debug, Clone, Serialize, Deserialize)] 36#[serde(bound(deserialize = "'i: 'de"))] 37pub struct RepoState<'i> { 38 #[serde(borrow)] 39 pub did: TrimmedDid<'i>, 40 pub status: RepoStatus, 41 pub rev: Option<DbTid>, 42 pub data: Option<IpldCid>, 43 pub last_seq: Option<i64>, 44 pub last_updated_at: i64, // unix timestamp 45 pub handle: Option<SmolStr>, 46} 47 48impl<'i> RepoState<'i> { 49 pub fn backfilling(did: &'i Did<'i>) -> Self { 50 Self { 51 did: TrimmedDid::from(did), 52 status: RepoStatus::Backfilling, 53 rev: None, 54 data: None, 55 last_seq: None, 56 last_updated_at: chrono::Utc::now().timestamp(), 57 handle: None, 58 } 59 } 60} 61 62impl<'i> IntoStatic for RepoState<'i> { 63 type Output = RepoState<'static>; 64 65 fn into_static(self) -> Self::Output { 66 RepoState { 67 did: self.did.into_static(), 68 status: self.status, 69 rev: self.rev, 70 data: self.data, 71 last_seq: self.last_seq, 72 last_updated_at: self.last_updated_at, 73 handle: self.handle, 74 } 75 } 76} 77 78// from src/backfill/resync_state.rs 79 80#[derive(Debug, Clone, Serialize, Deserialize)] 81pub enum ResyncState { 82 Error { 83 message: SmolStr, 84 retry_count: u32, 85 next_retry: i64, // unix timestamp 86 }, 87 Gone { 88 status: RepoStatus, // deactivated, takendown, suspended 89 }, 90} 91 92impl ResyncState { 93 pub fn next_backoff(retry_count: u32) -> i64 { 94 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 95 let base = 60; 96 let cap = 3600; 97 let mult = 2u64.pow(retry_count.min(10)) as i64; 98 let delay = (base * mult).min(cap); 99 100 // add +/- 10% jitter 101 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 102 let delay = (delay as f64 + jitter) as i64; 103 104 chrono::Utc::now().timestamp() + delay 105 } 106} 107 108// from src/api/event.rs 109 110#[derive(Debug, Serialize, Clone)] 111pub struct MarshallableEvt<'i> { 112 pub id: u64, 113 #[serde(rename = "type")] 114 pub event_type: SmolStr, 115 #[serde(borrow)] 116 #[serde(skip_serializing_if = "Option::is_none")] 117 pub record: Option<RecordEvt<'i>>, 118 #[serde(borrow)] 119 #[serde(skip_serializing_if = "Option::is_none")] 120 pub identity: Option<IdentityEvt<'i>>, 121 #[serde(borrow)] 122 #[serde(skip_serializing_if = "Option::is_none")] 123 pub account: Option<AccountEvt<'i>>, 124} 125 126#[derive(Clone, Debug)] 127pub enum BroadcastEvent { 128 #[allow(dead_code)] 129 Persisted(u64), 130 Ephemeral(MarshallableEvt<'static>), 131} 132 133#[derive(Debug, Serialize, Clone)] 134pub struct RecordEvt<'i> { 135 pub live: bool, 136 #[serde(borrow)] 137 pub did: Did<'i>, 138 pub rev: CowStr<'i>, 139 pub collection: CowStr<'i>, 140 pub rkey: CowStr<'i>, 141 pub action: CowStr<'i>, 142 #[serde(skip_serializing_if = "Option::is_none")] 143 pub record: Option<Value>, 144 #[serde(skip_serializing_if = "Option::is_none")] 145 pub cid: Option<CowStr<'i>>, 146} 147 148#[derive(Debug, Serialize, Clone)] 149pub struct IdentityEvt<'i> { 150 #[serde(borrow)] 151 pub did: Did<'i>, 152 #[serde(skip_serializing_if = "Option::is_none")] 153 pub handle: Option<CowStr<'i>>, 154} 155 156#[derive(Debug, Serialize, Clone)] 157pub struct AccountEvt<'i> { 158 #[serde(borrow)] 159 pub did: Did<'i>, 160 pub active: bool, 161 #[serde(skip_serializing_if = "Option::is_none")] 162 pub status: Option<CowStr<'i>>, 163} 164 165#[derive(Debug, Serialize, Deserialize, Clone)] 166#[serde(bound(deserialize = "'i: 'de"))] 167pub struct StoredEvent<'i> { 168 #[serde(default)] 169 pub live: bool, 170 #[serde(borrow)] 171 pub did: TrimmedDid<'i>, 172 pub rev: DbTid, 173 #[serde(borrow)] 174 pub collection: CowStr<'i>, 175 pub rkey: DbRkey, 176 pub action: DbAction, 177 #[serde(default)] 178 #[serde(skip_serializing_if = "Option::is_none")] 179 pub cid: Option<IpldCid>, 180}