at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::fmt::Display;
2
3use jacquard::{CowStr, IntoStatic};
4use jacquard_common::types::string::Did;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use smol_str::SmolStr;
8
9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
10use jacquard::types::cid::IpldCid;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13pub enum RepoStatus {
14 Backfilling,
15 Synced,
16 Error(SmolStr),
17 Deactivated,
18 Takendown,
19 Suspended,
20}
21
22impl Display for RepoStatus {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 RepoStatus::Backfilling => write!(f, "backfilling"),
26 RepoStatus::Synced => write!(f, "synced"),
27 RepoStatus::Error(e) => write!(f, "error({e})"),
28 RepoStatus::Deactivated => write!(f, "deactivated"),
29 RepoStatus::Takendown => write!(f, "takendown"),
30 RepoStatus::Suspended => write!(f, "suspended"),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(bound(deserialize = "'i: 'de"))]
37pub struct RepoState<'i> {
38 #[serde(borrow)]
39 pub did: TrimmedDid<'i>,
40 pub status: RepoStatus,
41 pub rev: Option<DbTid>,
42 pub data: Option<IpldCid>,
43 pub last_seq: Option<i64>,
44 pub last_updated_at: i64, // unix timestamp
45 pub handle: Option<SmolStr>,
46}
47
48impl<'i> RepoState<'i> {
49 pub fn backfilling(did: &'i Did<'i>) -> Self {
50 Self {
51 did: TrimmedDid::from(did),
52 status: RepoStatus::Backfilling,
53 rev: None,
54 data: None,
55 last_seq: None,
56 last_updated_at: chrono::Utc::now().timestamp(),
57 handle: None,
58 }
59 }
60}
61
62impl<'i> IntoStatic for RepoState<'i> {
63 type Output = RepoState<'static>;
64
65 fn into_static(self) -> Self::Output {
66 RepoState {
67 did: self.did.into_static(),
68 status: self.status,
69 rev: self.rev,
70 data: self.data,
71 last_seq: self.last_seq,
72 last_updated_at: self.last_updated_at,
73 handle: self.handle,
74 }
75 }
76}
77
78// from src/backfill/resync_state.rs
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub enum ResyncState {
82 Error {
83 message: SmolStr,
84 retry_count: u32,
85 next_retry: i64, // unix timestamp
86 },
87 Gone {
88 status: RepoStatus, // deactivated, takendown, suspended
89 },
90}
91
92impl ResyncState {
93 pub fn next_backoff(retry_count: u32) -> i64 {
94 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
95 let base = 60;
96 let cap = 3600;
97 let mult = 2u64.pow(retry_count.min(10)) as i64;
98 let delay = (base * mult).min(cap);
99
100 // add +/- 10% jitter
101 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
102 let delay = (delay as f64 + jitter) as i64;
103
104 chrono::Utc::now().timestamp() + delay
105 }
106}
107
108// from src/api/event.rs
109
110#[derive(Debug, Serialize, Clone)]
111pub struct MarshallableEvt<'i> {
112 pub id: u64,
113 #[serde(rename = "type")]
114 pub event_type: SmolStr,
115 #[serde(borrow)]
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub record: Option<RecordEvt<'i>>,
118 #[serde(borrow)]
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub identity: Option<IdentityEvt<'i>>,
121 #[serde(borrow)]
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub account: Option<AccountEvt<'i>>,
124}
125
126#[derive(Clone, Debug)]
127pub enum BroadcastEvent {
128 #[allow(dead_code)]
129 Persisted(u64),
130 Ephemeral(MarshallableEvt<'static>),
131}
132
133#[derive(Debug, Serialize, Clone)]
134pub struct RecordEvt<'i> {
135 pub live: bool,
136 #[serde(borrow)]
137 pub did: Did<'i>,
138 pub rev: CowStr<'i>,
139 pub collection: CowStr<'i>,
140 pub rkey: CowStr<'i>,
141 pub action: CowStr<'i>,
142 #[serde(skip_serializing_if = "Option::is_none")]
143 pub record: Option<Value>,
144 #[serde(skip_serializing_if = "Option::is_none")]
145 pub cid: Option<CowStr<'i>>,
146}
147
148#[derive(Debug, Serialize, Clone)]
149pub struct IdentityEvt<'i> {
150 #[serde(borrow)]
151 pub did: Did<'i>,
152 #[serde(skip_serializing_if = "Option::is_none")]
153 pub handle: Option<CowStr<'i>>,
154}
155
156#[derive(Debug, Serialize, Clone)]
157pub struct AccountEvt<'i> {
158 #[serde(borrow)]
159 pub did: Did<'i>,
160 pub active: bool,
161 #[serde(skip_serializing_if = "Option::is_none")]
162 pub status: Option<CowStr<'i>>,
163}
164
165#[derive(Debug, Serialize, Deserialize, Clone)]
166#[serde(bound(deserialize = "'i: 'de"))]
167pub struct StoredEvent<'i> {
168 #[serde(default)]
169 pub live: bool,
170 #[serde(borrow)]
171 pub did: TrimmedDid<'i>,
172 pub rev: DbTid,
173 #[serde(borrow)]
174 pub collection: CowStr<'i>,
175 pub rkey: DbRkey,
176 pub action: DbAction,
177 #[serde(default)]
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub cid: Option<IpldCid>,
180}