An AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
at-protocol
atproto
indexer
rust
fjall
1use std::fmt::Display;
2
3use jacquard_common::types::cid::IpldCid;
4use jacquard_common::types::string::Did;
5use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use smol_str::{SmolStr, ToSmolStr};
9
10use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid};
11use crate::resolver::MiniDoc;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14pub enum RepoStatus {
15 Backfilling,
16 Synced,
17 Error(SmolStr),
18 Deactivated,
19 Takendown,
20 Suspended,
21}
22
23impl Display for RepoStatus {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 RepoStatus::Backfilling => write!(f, "backfilling"),
27 RepoStatus::Synced => write!(f, "synced"),
28 RepoStatus::Error(e) => write!(f, "error({e})"),
29 RepoStatus::Deactivated => write!(f, "deactivated"),
30 RepoStatus::Takendown => write!(f, "takendown"),
31 RepoStatus::Suspended => write!(f, "suspended"),
32 }
33 }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(bound(deserialize = "'i: 'de"))]
38pub struct RepoState<'i> {
39 pub status: RepoStatus,
40 pub rev: Option<DbTid>,
41 pub data: Option<IpldCid>,
42 pub last_updated_at: i64, // unix timestamp
43 #[serde(borrow)]
44 pub handle: Option<Handle<'i>>,
45 pub index_id: u64,
46 #[serde(default = "default_tracked")]
47 pub tracked: bool,
48 #[serde(default)]
49 pub signing_key: Option<DidKey<'i>>,
50 #[serde(default)]
51 pub pds: Option<CowStr<'i>>,
52}
53
/// Serde default for `RepoState::tracked`: records that lack the field are
/// treated as tracked.
fn default_tracked() -> bool {
    true
}
57
58impl<'i> RepoState<'i> {
59 pub fn backfilling(index_id: u64) -> Self {
60 Self {
61 status: RepoStatus::Backfilling,
62 rev: None,
63 data: None,
64 last_updated_at: chrono::Utc::now().timestamp(),
65
66 index_id,
67 tracked: true,
68 handle: None,
69 pds: None,
70 signing_key: None,
71 }
72 }
73
74 /// backfilling, but not tracked yet
75 pub fn untracked(index_id: u64) -> Self {
76 Self {
77 tracked: false,
78 ..Self::backfilling(index_id)
79 }
80 }
81
82 pub fn update_from_doc(&mut self, doc: MiniDoc) {
83 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
84 self.handle = doc.handle;
85 self.signing_key = doc.key.map(From::from);
86 }
87}
88
89impl<'i> IntoStatic for RepoState<'i> {
90 type Output = RepoState<'static>;
91
92 fn into_static(self) -> Self::Output {
93 RepoState {
94 status: self.status,
95 rev: self.rev,
96 data: self.data,
97 last_updated_at: self.last_updated_at,
98 index_id: self.index_id,
99 tracked: self.tracked,
100 handle: self.handle.map(IntoStatic::into_static),
101 pds: self.pds.map(IntoStatic::into_static),
102 signing_key: self.signing_key.map(IntoStatic::into_static),
103 }
104 }
105}
106
107#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
108pub enum ResyncErrorKind {
109 Ratelimited,
110 Transport,
111 Generic,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub enum ResyncState {
116 Error {
117 kind: ResyncErrorKind,
118 retry_count: u32,
119 next_retry: i64, // unix timestamp
120 },
121 Gone {
122 status: RepoStatus, // deactivated, takendown, suspended
123 },
124}
125
126impl ResyncState {
127 pub fn next_backoff(retry_count: u32) -> i64 {
128 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
129 let base = 60;
130 let cap = 3600;
131 let mult = 2u64.pow(retry_count.min(10)) as i64;
132 let delay = (base * mult).min(cap);
133
134 // add +/- 10% jitter
135 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
136 let delay = (delay as f64 + jitter) as i64;
137
138 chrono::Utc::now().timestamp() + delay
139 }
140}
141
142// from src/api/event.rs
143
144#[derive(Debug, Serialize, Clone)]
145pub struct MarshallableEvt<'i> {
146 pub id: u64,
147 #[serde(rename = "type")]
148 pub event_type: SmolStr,
149 #[serde(borrow)]
150 #[serde(skip_serializing_if = "Option::is_none")]
151 pub record: Option<RecordEvt<'i>>,
152 #[serde(borrow)]
153 #[serde(skip_serializing_if = "Option::is_none")]
154 pub identity: Option<IdentityEvt<'i>>,
155 #[serde(borrow)]
156 #[serde(skip_serializing_if = "Option::is_none")]
157 pub account: Option<AccountEvt<'i>>,
158}
159
160#[derive(Clone, Debug)]
161pub enum BroadcastEvent {
162 #[allow(dead_code)]
163 Persisted(u64),
164 Ephemeral(MarshallableEvt<'static>),
165}
166
167#[derive(Debug, Serialize, Clone)]
168pub struct RecordEvt<'i> {
169 pub live: bool,
170 #[serde(borrow)]
171 pub did: Did<'i>,
172 pub rev: CowStr<'i>,
173 pub collection: CowStr<'i>,
174 pub rkey: CowStr<'i>,
175 pub action: CowStr<'i>,
176 #[serde(skip_serializing_if = "Option::is_none")]
177 pub record: Option<Value>,
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub cid: Option<CowStr<'i>>,
180}
181
182#[derive(Debug, Serialize, Clone)]
183pub struct IdentityEvt<'i> {
184 #[serde(borrow)]
185 pub did: Did<'i>,
186 #[serde(skip_serializing_if = "Option::is_none")]
187 pub handle: Option<Handle<'i>>,
188}
189
190#[derive(Debug, Serialize, Clone)]
191pub struct AccountEvt<'i> {
192 #[serde(borrow)]
193 pub did: Did<'i>,
194 pub active: bool,
195 #[serde(skip_serializing_if = "Option::is_none")]
196 pub status: Option<CowStr<'i>>,
197}
198
199#[derive(Debug, Serialize, Deserialize, Clone)]
200#[serde(bound(deserialize = "'i: 'de"))]
201pub struct StoredEvent<'i> {
202 #[serde(default)]
203 pub live: bool,
204 #[serde(borrow)]
205 pub did: TrimmedDid<'i>,
206 pub rev: DbTid,
207 #[serde(borrow)]
208 pub collection: CowStr<'i>,
209 pub rkey: DbRkey,
210 pub action: DbAction,
211 #[serde(default)]
212 #[serde(skip_serializing_if = "Option::is_none")]
213 pub cid: Option<IpldCid>,
214}
215
216#[derive(Debug, PartialEq, Eq, Clone, Copy)]
217pub enum GaugeState {
218 Synced,
219 Pending,
220 Resync(Option<ResyncErrorKind>),
221}
222
223impl GaugeState {
224 pub fn is_resync(&self) -> bool {
225 matches!(self, GaugeState::Resync(_))
226 }
227}