forked from
ptr.pet/hydrant
kind of like tap but different and in rust
1use std::fmt::Display;
2
3use jacquard::{CowStr, IntoStatic, types::string::Handle};
4use jacquard_common::types::string::Did;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use smol_str::SmolStr;
8
9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
10use jacquard::types::cid::IpldCid;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13pub enum RepoStatus {
14 Backfilling,
15 Synced,
16 Error(SmolStr),
17 Deactivated,
18 Takendown,
19 Suspended,
20}
21
22impl Display for RepoStatus {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 RepoStatus::Backfilling => write!(f, "backfilling"),
26 RepoStatus::Synced => write!(f, "synced"),
27 RepoStatus::Error(e) => write!(f, "error({e})"),
28 RepoStatus::Deactivated => write!(f, "deactivated"),
29 RepoStatus::Takendown => write!(f, "takendown"),
30 RepoStatus::Suspended => write!(f, "suspended"),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(bound(deserialize = "'i: 'de"))]
37pub struct RepoState<'i> {
38 pub status: RepoStatus,
39 pub rev: Option<DbTid>,
40 pub data: Option<IpldCid>,
41 pub last_seq: Option<i64>,
42 pub last_updated_at: i64, // unix timestamp
43 #[serde(borrow)]
44 pub handle: Option<Handle<'i>>,
45 pub index_id: u64,
46}
47
48impl<'i> RepoState<'i> {
49 pub fn backfilling(index_id: u64) -> Self {
50 Self {
51 status: RepoStatus::Backfilling,
52 rev: None,
53 data: None,
54 last_seq: None,
55 last_updated_at: chrono::Utc::now().timestamp(),
56 handle: None,
57 index_id,
58 }
59 }
60}
61
62impl<'i> IntoStatic for RepoState<'i> {
63 type Output = RepoState<'static>;
64
65 fn into_static(self) -> Self::Output {
66 RepoState {
67 status: self.status,
68 rev: self.rev,
69 data: self.data,
70 last_seq: self.last_seq,
71 last_updated_at: self.last_updated_at,
72 handle: self.handle.map(|s| s.into_static()),
73 index_id: self.index_id,
74 }
75 }
76}
77
78#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
79pub enum ResyncErrorKind {
80 Ratelimited,
81 Transport,
82 Generic,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub enum ResyncState {
87 Error {
88 kind: ResyncErrorKind,
89 retry_count: u32,
90 next_retry: i64, // unix timestamp
91 },
92 Gone {
93 status: RepoStatus, // deactivated, takendown, suspended
94 },
95}
96
97impl ResyncState {
98 pub fn next_backoff(retry_count: u32) -> i64 {
99 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
100 let base = 60;
101 let cap = 3600;
102 let mult = 2u64.pow(retry_count.min(10)) as i64;
103 let delay = (base * mult).min(cap);
104
105 // add +/- 10% jitter
106 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
107 let delay = (delay as f64 + jitter) as i64;
108
109 chrono::Utc::now().timestamp() + delay
110 }
111}
112
113// from src/api/event.rs
114
115#[derive(Debug, Serialize, Clone)]
116pub struct MarshallableEvt<'i> {
117 pub id: u64,
118 #[serde(rename = "type")]
119 pub event_type: SmolStr,
120 #[serde(borrow)]
121 #[serde(skip_serializing_if = "Option::is_none")]
122 pub record: Option<RecordEvt<'i>>,
123 #[serde(borrow)]
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub identity: Option<IdentityEvt<'i>>,
126 #[serde(borrow)]
127 #[serde(skip_serializing_if = "Option::is_none")]
128 pub account: Option<AccountEvt<'i>>,
129}
130
131#[derive(Clone, Debug)]
132pub enum BroadcastEvent {
133 #[allow(dead_code)]
134 Persisted(u64),
135 Ephemeral(MarshallableEvt<'static>),
136}
137
138#[derive(Debug, Serialize, Clone)]
139pub struct RecordEvt<'i> {
140 pub live: bool,
141 #[serde(borrow)]
142 pub did: Did<'i>,
143 pub rev: CowStr<'i>,
144 pub collection: CowStr<'i>,
145 pub rkey: CowStr<'i>,
146 pub action: CowStr<'i>,
147 #[serde(skip_serializing_if = "Option::is_none")]
148 pub record: Option<Value>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub cid: Option<CowStr<'i>>,
151}
152
153#[derive(Debug, Serialize, Clone)]
154pub struct IdentityEvt<'i> {
155 #[serde(borrow)]
156 pub did: Did<'i>,
157 #[serde(skip_serializing_if = "Option::is_none")]
158 pub handle: Option<CowStr<'i>>,
159}
160
161#[derive(Debug, Serialize, Clone)]
162pub struct AccountEvt<'i> {
163 #[serde(borrow)]
164 pub did: Did<'i>,
165 pub active: bool,
166 #[serde(skip_serializing_if = "Option::is_none")]
167 pub status: Option<CowStr<'i>>,
168}
169
170#[derive(Debug, Serialize, Deserialize, Clone)]
171#[serde(bound(deserialize = "'i: 'de"))]
172pub struct StoredEvent<'i> {
173 #[serde(default)]
174 pub live: bool,
175 #[serde(borrow)]
176 pub did: TrimmedDid<'i>,
177 pub rev: DbTid,
178 #[serde(borrow)]
179 pub collection: CowStr<'i>,
180 pub rkey: DbRkey,
181 pub action: DbAction,
182 #[serde(default)]
183 #[serde(skip_serializing_if = "Option::is_none")]
184 pub cid: Option<IpldCid>,
185}
186
187#[derive(Debug, PartialEq, Eq, Clone, Copy)]
188pub enum GaugeState {
189 Synced,
190 Pending,
191 Resync(Option<ResyncErrorKind>),
192}
193
194impl GaugeState {
195 pub fn is_resync(&self) -> bool {
196 matches!(self, GaugeState::Resync(_))
197 }
198}