at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::fmt::Display;
2
3use jacquard_common::types::cid::IpldCid;
4use jacquard_common::types::string::Did;
5use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use smol_str::SmolStr;
9
10use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13pub enum RepoStatus {
14 Backfilling,
15 Synced,
16 Error(SmolStr),
17 Deactivated,
18 Takendown,
19 Suspended,
20}
21
22impl Display for RepoStatus {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 RepoStatus::Backfilling => write!(f, "backfilling"),
26 RepoStatus::Synced => write!(f, "synced"),
27 RepoStatus::Error(e) => write!(f, "error({e})"),
28 RepoStatus::Deactivated => write!(f, "deactivated"),
29 RepoStatus::Takendown => write!(f, "takendown"),
30 RepoStatus::Suspended => write!(f, "suspended"),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(bound(deserialize = "'i: 'de"))]
37pub struct RepoState<'i> {
38 pub status: RepoStatus,
39 pub rev: Option<DbTid>,
40 pub data: Option<IpldCid>,
41 pub last_seq: Option<i64>,
42 pub last_updated_at: i64, // unix timestamp
43 #[serde(borrow)]
44 pub handle: Option<Handle<'i>>,
45 pub index_id: u64,
46 #[serde(default = "default_tracked")]
47 pub tracked: bool,
48}
49
/// serde default for `RepoState::tracked`: serialized states that lack the
/// field deserialize as tracked.
fn default_tracked() -> bool {
    const TRACKED_WHEN_MISSING: bool = true;
    TRACKED_WHEN_MISSING
}
53
54impl<'i> RepoState<'i> {
55 pub fn backfilling(index_id: u64) -> Self {
56 Self {
57 status: RepoStatus::Backfilling,
58 rev: None,
59 data: None,
60 last_seq: None,
61 last_updated_at: chrono::Utc::now().timestamp(),
62 handle: None,
63 index_id,
64 tracked: true,
65 }
66 }
67
68 /// backfilling, but not tracked yet
69 pub fn untracked(index_id: u64) -> Self {
70 Self {
71 tracked: false,
72 ..Self::backfilling(index_id)
73 }
74 }
75}
76
77impl<'i> IntoStatic for RepoState<'i> {
78 type Output = RepoState<'static>;
79
80 fn into_static(self) -> Self::Output {
81 RepoState {
82 status: self.status,
83 rev: self.rev,
84 data: self.data,
85 last_seq: self.last_seq,
86 last_updated_at: self.last_updated_at,
87 handle: self.handle.map(|s| s.into_static()),
88 index_id: self.index_id,
89 tracked: self.tracked,
90 }
91 }
92}
93
94#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
95pub enum ResyncErrorKind {
96 Ratelimited,
97 Transport,
98 Generic,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub enum ResyncState {
103 Error {
104 kind: ResyncErrorKind,
105 retry_count: u32,
106 next_retry: i64, // unix timestamp
107 },
108 Gone {
109 status: RepoStatus, // deactivated, takendown, suspended
110 },
111}
112
113impl ResyncState {
114 pub fn next_backoff(retry_count: u32) -> i64 {
115 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
116 let base = 60;
117 let cap = 3600;
118 let mult = 2u64.pow(retry_count.min(10)) as i64;
119 let delay = (base * mult).min(cap);
120
121 // add +/- 10% jitter
122 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
123 let delay = (delay as f64 + jitter) as i64;
124
125 chrono::Utc::now().timestamp() + delay
126 }
127}
128
129// from src/api/event.rs
130
131#[derive(Debug, Serialize, Clone)]
132pub struct MarshallableEvt<'i> {
133 pub id: u64,
134 #[serde(rename = "type")]
135 pub event_type: SmolStr,
136 #[serde(borrow)]
137 #[serde(skip_serializing_if = "Option::is_none")]
138 pub record: Option<RecordEvt<'i>>,
139 #[serde(borrow)]
140 #[serde(skip_serializing_if = "Option::is_none")]
141 pub identity: Option<IdentityEvt<'i>>,
142 #[serde(borrow)]
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub account: Option<AccountEvt<'i>>,
145}
146
147#[derive(Clone, Debug)]
148pub enum BroadcastEvent {
149 #[allow(dead_code)]
150 Persisted(u64),
151 Ephemeral(MarshallableEvt<'static>),
152}
153
154#[derive(Debug, Serialize, Clone)]
155pub struct RecordEvt<'i> {
156 pub live: bool,
157 #[serde(borrow)]
158 pub did: Did<'i>,
159 pub rev: CowStr<'i>,
160 pub collection: CowStr<'i>,
161 pub rkey: CowStr<'i>,
162 pub action: CowStr<'i>,
163 #[serde(skip_serializing_if = "Option::is_none")]
164 pub record: Option<Value>,
165 #[serde(skip_serializing_if = "Option::is_none")]
166 pub cid: Option<CowStr<'i>>,
167}
168
169#[derive(Debug, Serialize, Clone)]
170pub struct IdentityEvt<'i> {
171 #[serde(borrow)]
172 pub did: Did<'i>,
173 #[serde(skip_serializing_if = "Option::is_none")]
174 pub handle: Option<CowStr<'i>>,
175}
176
177#[derive(Debug, Serialize, Clone)]
178pub struct AccountEvt<'i> {
179 #[serde(borrow)]
180 pub did: Did<'i>,
181 pub active: bool,
182 #[serde(skip_serializing_if = "Option::is_none")]
183 pub status: Option<CowStr<'i>>,
184}
185
186#[derive(Debug, Serialize, Deserialize, Clone)]
187#[serde(bound(deserialize = "'i: 'de"))]
188pub struct StoredEvent<'i> {
189 #[serde(default)]
190 pub live: bool,
191 #[serde(borrow)]
192 pub did: TrimmedDid<'i>,
193 pub rev: DbTid,
194 #[serde(borrow)]
195 pub collection: CowStr<'i>,
196 pub rkey: DbRkey,
197 pub action: DbAction,
198 #[serde(default)]
199 #[serde(skip_serializing_if = "Option::is_none")]
200 pub cid: Option<IpldCid>,
201}
202
203#[derive(Debug, PartialEq, Eq, Clone, Copy)]
204pub enum GaugeState {
205 Synced,
206 Pending,
207 Resync(Option<ResyncErrorKind>),
208}
209
210impl GaugeState {
211 pub fn is_resync(&self) -> bool {
212 matches!(self, GaugeState::Resync(_))
213 }
214}