AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::fmt::Display;
2
3use jacquard::{CowStr, IntoStatic};
4use jacquard_common::types::string::Did;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use smol_str::SmolStr;
8
9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
10use jacquard::types::cid::IpldCid;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13pub enum RepoStatus {
14 Backfilling,
15 Synced,
16 Error(SmolStr),
17 Deactivated,
18 Takendown,
19 Suspended,
20}
21
22impl Display for RepoStatus {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 RepoStatus::Backfilling => write!(f, "backfilling"),
26 RepoStatus::Synced => write!(f, "synced"),
27 RepoStatus::Error(e) => write!(f, "error({e})"),
28 RepoStatus::Deactivated => write!(f, "deactivated"),
29 RepoStatus::Takendown => write!(f, "takendown"),
30 RepoStatus::Suspended => write!(f, "suspended"),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(bound(deserialize = "'i: 'de"))]
37pub struct RepoState<'i> {
38 #[serde(borrow)]
39 pub did: TrimmedDid<'i>,
40 pub status: RepoStatus,
41 pub rev: Option<DbTid>,
42 pub data: Option<IpldCid>,
43 pub last_seq: Option<i64>,
44 pub last_updated_at: i64, // unix timestamp
45 pub handle: Option<SmolStr>,
46}
47
48impl<'i> RepoState<'i> {
49 pub fn backfilling(did: &'i Did<'i>) -> Self {
50 Self {
51 did: TrimmedDid::from(did),
52 status: RepoStatus::Backfilling,
53 rev: None,
54 data: None,
55 last_seq: None,
56 last_updated_at: chrono::Utc::now().timestamp(),
57 handle: None,
58 }
59 }
60}
61
62impl<'i> IntoStatic for RepoState<'i> {
63 type Output = RepoState<'static>;
64
65 fn into_static(self) -> Self::Output {
66 RepoState {
67 did: self.did.into_static(),
68 status: self.status,
69 rev: self.rev,
70 data: self.data,
71 last_seq: self.last_seq,
72 last_updated_at: self.last_updated_at,
73 handle: self.handle,
74 }
75 }
76}
77
78// from src/backfill/resync_state.rs
79
80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
81pub enum ResyncErrorKind {
82 Ratelimited,
83 Transport,
84 Generic,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum ResyncState {
89 Error {
90 kind: ResyncErrorKind,
91 retry_count: u32,
92 next_retry: i64, // unix timestamp
93 },
94 Gone {
95 status: RepoStatus, // deactivated, takendown, suspended
96 },
97}
98
99impl ResyncState {
100 pub fn next_backoff(retry_count: u32) -> i64 {
101 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
102 let base = 60;
103 let cap = 3600;
104 let mult = 2u64.pow(retry_count.min(10)) as i64;
105 let delay = (base * mult).min(cap);
106
107 // add +/- 10% jitter
108 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
109 let delay = (delay as f64 + jitter) as i64;
110
111 chrono::Utc::now().timestamp() + delay
112 }
113}
114
115// from src/api/event.rs
116
117#[derive(Debug, Serialize, Clone)]
118pub struct MarshallableEvt<'i> {
119 pub id: u64,
120 #[serde(rename = "type")]
121 pub event_type: SmolStr,
122 #[serde(borrow)]
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub record: Option<RecordEvt<'i>>,
125 #[serde(borrow)]
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub identity: Option<IdentityEvt<'i>>,
128 #[serde(borrow)]
129 #[serde(skip_serializing_if = "Option::is_none")]
130 pub account: Option<AccountEvt<'i>>,
131}
132
133#[derive(Clone, Debug)]
134pub enum BroadcastEvent {
135 #[allow(dead_code)]
136 Persisted(u64),
137 Ephemeral(MarshallableEvt<'static>),
138}
139
140#[derive(Debug, Serialize, Clone)]
141pub struct RecordEvt<'i> {
142 pub live: bool,
143 #[serde(borrow)]
144 pub did: Did<'i>,
145 pub rev: CowStr<'i>,
146 pub collection: CowStr<'i>,
147 pub rkey: CowStr<'i>,
148 pub action: CowStr<'i>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub record: Option<Value>,
151 #[serde(skip_serializing_if = "Option::is_none")]
152 pub cid: Option<CowStr<'i>>,
153}
154
155#[derive(Debug, Serialize, Clone)]
156pub struct IdentityEvt<'i> {
157 #[serde(borrow)]
158 pub did: Did<'i>,
159 #[serde(skip_serializing_if = "Option::is_none")]
160 pub handle: Option<CowStr<'i>>,
161}
162
163#[derive(Debug, Serialize, Clone)]
164pub struct AccountEvt<'i> {
165 #[serde(borrow)]
166 pub did: Did<'i>,
167 pub active: bool,
168 #[serde(skip_serializing_if = "Option::is_none")]
169 pub status: Option<CowStr<'i>>,
170}
171
172#[derive(Debug, Serialize, Deserialize, Clone)]
173#[serde(bound(deserialize = "'i: 'de"))]
174pub struct StoredEvent<'i> {
175 #[serde(default)]
176 pub live: bool,
177 #[serde(borrow)]
178 pub did: TrimmedDid<'i>,
179 pub rev: DbTid,
180 #[serde(borrow)]
181 pub collection: CowStr<'i>,
182 pub rkey: DbRkey,
183 pub action: DbAction,
184 #[serde(default)]
185 #[serde(skip_serializing_if = "Option::is_none")]
186 pub cid: Option<IpldCid>,
187}