at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::fmt::Display;
2
3use jacquard::{CowStr, IntoStatic, types::string::Handle};
4use jacquard_common::types::string::Did;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use smol_str::SmolStr;
8
9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
10use jacquard::types::cid::IpldCid;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13pub enum RepoStatus {
14 Backfilling,
15 Synced,
16 Error(SmolStr),
17 Deactivated,
18 Takendown,
19 Suspended,
20}
21
22impl Display for RepoStatus {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 RepoStatus::Backfilling => write!(f, "backfilling"),
26 RepoStatus::Synced => write!(f, "synced"),
27 RepoStatus::Error(e) => write!(f, "error({e})"),
28 RepoStatus::Deactivated => write!(f, "deactivated"),
29 RepoStatus::Takendown => write!(f, "takendown"),
30 RepoStatus::Suspended => write!(f, "suspended"),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(bound(deserialize = "'i: 'de"))]
37pub struct RepoState<'i> {
38 pub status: RepoStatus,
39 pub rev: Option<DbTid>,
40 pub data: Option<IpldCid>,
41 pub last_seq: Option<i64>,
42 pub last_updated_at: i64, // unix timestamp
43 #[serde(borrow)]
44 pub handle: Option<Handle<'i>>,
45 pub index_id: u64,
46 #[serde(default = "default_tracked")]
47 pub tracked: bool,
48}
49
50fn default_tracked() -> bool {
51 true
52}
53
54impl<'i> RepoState<'i> {
55 pub fn backfilling(index_id: u64) -> Self {
56 Self {
57 status: RepoStatus::Backfilling,
58 rev: None,
59 data: None,
60 last_seq: None,
61 last_updated_at: chrono::Utc::now().timestamp(),
62 handle: None,
63 index_id,
64 tracked: true,
65 }
66 }
67}
68
69impl<'i> IntoStatic for RepoState<'i> {
70 type Output = RepoState<'static>;
71
72 fn into_static(self) -> Self::Output {
73 RepoState {
74 status: self.status,
75 rev: self.rev,
76 data: self.data,
77 last_seq: self.last_seq,
78 last_updated_at: self.last_updated_at,
79 handle: self.handle.map(|s| s.into_static()),
80 index_id: self.index_id,
81 tracked: self.tracked,
82 }
83 }
84}
85
86#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
87pub enum ResyncErrorKind {
88 Ratelimited,
89 Transport,
90 Generic,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub enum ResyncState {
95 Error {
96 kind: ResyncErrorKind,
97 retry_count: u32,
98 next_retry: i64, // unix timestamp
99 },
100 Gone {
101 status: RepoStatus, // deactivated, takendown, suspended
102 },
103}
104
105impl ResyncState {
106 pub fn next_backoff(retry_count: u32) -> i64 {
107 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
108 let base = 60;
109 let cap = 3600;
110 let mult = 2u64.pow(retry_count.min(10)) as i64;
111 let delay = (base * mult).min(cap);
112
113 // add +/- 10% jitter
114 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
115 let delay = (delay as f64 + jitter) as i64;
116
117 chrono::Utc::now().timestamp() + delay
118 }
119}
120
121// from src/api/event.rs
122
123#[derive(Debug, Serialize, Clone)]
124pub struct MarshallableEvt<'i> {
125 pub id: u64,
126 #[serde(rename = "type")]
127 pub event_type: SmolStr,
128 #[serde(borrow)]
129 #[serde(skip_serializing_if = "Option::is_none")]
130 pub record: Option<RecordEvt<'i>>,
131 #[serde(borrow)]
132 #[serde(skip_serializing_if = "Option::is_none")]
133 pub identity: Option<IdentityEvt<'i>>,
134 #[serde(borrow)]
135 #[serde(skip_serializing_if = "Option::is_none")]
136 pub account: Option<AccountEvt<'i>>,
137}
138
139#[derive(Clone, Debug)]
140pub enum BroadcastEvent {
141 #[allow(dead_code)]
142 Persisted(u64),
143 Ephemeral(MarshallableEvt<'static>),
144}
145
146#[derive(Debug, Serialize, Clone)]
147pub struct RecordEvt<'i> {
148 pub live: bool,
149 #[serde(borrow)]
150 pub did: Did<'i>,
151 pub rev: CowStr<'i>,
152 pub collection: CowStr<'i>,
153 pub rkey: CowStr<'i>,
154 pub action: CowStr<'i>,
155 #[serde(skip_serializing_if = "Option::is_none")]
156 pub record: Option<Value>,
157 #[serde(skip_serializing_if = "Option::is_none")]
158 pub cid: Option<CowStr<'i>>,
159}
160
161#[derive(Debug, Serialize, Clone)]
162pub struct IdentityEvt<'i> {
163 #[serde(borrow)]
164 pub did: Did<'i>,
165 #[serde(skip_serializing_if = "Option::is_none")]
166 pub handle: Option<CowStr<'i>>,
167}
168
169#[derive(Debug, Serialize, Clone)]
170pub struct AccountEvt<'i> {
171 #[serde(borrow)]
172 pub did: Did<'i>,
173 pub active: bool,
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub status: Option<CowStr<'i>>,
176}
177
178#[derive(Debug, Serialize, Deserialize, Clone)]
179#[serde(bound(deserialize = "'i: 'de"))]
180pub struct StoredEvent<'i> {
181 #[serde(default)]
182 pub live: bool,
183 #[serde(borrow)]
184 pub did: TrimmedDid<'i>,
185 pub rev: DbTid,
186 #[serde(borrow)]
187 pub collection: CowStr<'i>,
188 pub rkey: DbRkey,
189 pub action: DbAction,
190 #[serde(default)]
191 #[serde(skip_serializing_if = "Option::is_none")]
192 pub cid: Option<IpldCid>,
193}
194
195#[derive(Debug, PartialEq, Eq, Clone, Copy)]
196pub enum GaugeState {
197 Synced,
198 Pending,
199 Resync(Option<ResyncErrorKind>),
200}
201
202impl GaugeState {
203 pub fn is_resync(&self) -> bool {
204 matches!(self, GaugeState::Resync(_))
205 }
206}