kind of like tap but different and in rust
at main 198 lines 5.2 kB view raw
1use std::fmt::Display; 2 3use jacquard::{CowStr, IntoStatic, types::string::Handle}; 4use jacquard_common::types::string::Did; 5use serde::{Deserialize, Serialize}; 6use serde_json::Value; 7use smol_str::SmolStr; 8 9use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 10use jacquard::types::cid::IpldCid; 11 12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 13pub enum RepoStatus { 14 Backfilling, 15 Synced, 16 Error(SmolStr), 17 Deactivated, 18 Takendown, 19 Suspended, 20} 21 22impl Display for RepoStatus { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 match self { 25 RepoStatus::Backfilling => write!(f, "backfilling"), 26 RepoStatus::Synced => write!(f, "synced"), 27 RepoStatus::Error(e) => write!(f, "error({e})"), 28 RepoStatus::Deactivated => write!(f, "deactivated"), 29 RepoStatus::Takendown => write!(f, "takendown"), 30 RepoStatus::Suspended => write!(f, "suspended"), 31 } 32 } 33} 34 35#[derive(Debug, Clone, Serialize, Deserialize)] 36#[serde(bound(deserialize = "'i: 'de"))] 37pub struct RepoState<'i> { 38 pub status: RepoStatus, 39 pub rev: Option<DbTid>, 40 pub data: Option<IpldCid>, 41 pub last_seq: Option<i64>, 42 pub last_updated_at: i64, // unix timestamp 43 #[serde(borrow)] 44 pub handle: Option<Handle<'i>>, 45 pub index_id: u64, 46} 47 48impl<'i> RepoState<'i> { 49 pub fn backfilling(index_id: u64) -> Self { 50 Self { 51 status: RepoStatus::Backfilling, 52 rev: None, 53 data: None, 54 last_seq: None, 55 last_updated_at: chrono::Utc::now().timestamp(), 56 handle: None, 57 index_id, 58 } 59 } 60} 61 62impl<'i> IntoStatic for RepoState<'i> { 63 type Output = RepoState<'static>; 64 65 fn into_static(self) -> Self::Output { 66 RepoState { 67 status: self.status, 68 rev: self.rev, 69 data: self.data, 70 last_seq: self.last_seq, 71 last_updated_at: self.last_updated_at, 72 handle: self.handle.map(|s| s.into_static()), 73 index_id: self.index_id, 74 } 75 } 76} 77 78#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 79pub enum ResyncErrorKind { 80 Ratelimited, 81 Transport, 82 Generic, 83} 84 85#[derive(Debug, Clone, Serialize, Deserialize)] 86pub enum ResyncState { 87 Error { 88 kind: ResyncErrorKind, 89 retry_count: u32, 90 next_retry: i64, // unix timestamp 91 }, 92 Gone { 93 status: RepoStatus, // deactivated, takendown, suspended 94 }, 95} 96 97impl ResyncState { 98 pub fn next_backoff(retry_count: u32) -> i64 { 99 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 100 let base = 60; 101 let cap = 3600; 102 let mult = 2u64.pow(retry_count.min(10)) as i64; 103 let delay = (base * mult).min(cap); 104 105 // add +/- 10% jitter 106 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 107 let delay = (delay as f64 + jitter) as i64; 108 109 chrono::Utc::now().timestamp() + delay 110 } 111} 112 113// from src/api/event.rs 114 115#[derive(Debug, Serialize, Clone)] 116pub struct MarshallableEvt<'i> { 117 pub id: u64, 118 #[serde(rename = "type")] 119 pub event_type: SmolStr, 120 #[serde(borrow)] 121 #[serde(skip_serializing_if = "Option::is_none")] 122 pub record: Option<RecordEvt<'i>>, 123 #[serde(borrow)] 124 #[serde(skip_serializing_if = "Option::is_none")] 125 pub identity: Option<IdentityEvt<'i>>, 126 #[serde(borrow)] 127 #[serde(skip_serializing_if = "Option::is_none")] 128 pub account: Option<AccountEvt<'i>>, 129} 130 131#[derive(Clone, Debug)] 132pub enum BroadcastEvent { 133 #[allow(dead_code)] 134 Persisted(u64), 135 Ephemeral(MarshallableEvt<'static>), 136} 137 138#[derive(Debug, Serialize, Clone)] 139pub struct RecordEvt<'i> { 140 pub live: bool, 141 #[serde(borrow)] 142 pub did: Did<'i>, 143 pub rev: CowStr<'i>, 144 pub collection: CowStr<'i>, 145 pub rkey: CowStr<'i>, 146 pub action: CowStr<'i>, 147 #[serde(skip_serializing_if = "Option::is_none")] 148 pub record: Option<Value>, 149 #[serde(skip_serializing_if = "Option::is_none")] 150 pub cid: Option<CowStr<'i>>, 151} 152 153#[derive(Debug, Serialize, Clone)] 154pub struct IdentityEvt<'i> { 155 #[serde(borrow)] 156 pub did: Did<'i>, 157 #[serde(skip_serializing_if = "Option::is_none")] 158 pub handle: Option<CowStr<'i>>, 159} 160 161#[derive(Debug, Serialize, Clone)] 162pub struct AccountEvt<'i> { 163 #[serde(borrow)] 164 pub did: Did<'i>, 165 pub active: bool, 166 #[serde(skip_serializing_if = "Option::is_none")] 167 pub status: Option<CowStr<'i>>, 168} 169 170#[derive(Debug, Serialize, Deserialize, Clone)] 171#[serde(bound(deserialize = "'i: 'de"))] 172pub struct StoredEvent<'i> { 173 #[serde(default)] 174 pub live: bool, 175 #[serde(borrow)] 176 pub did: TrimmedDid<'i>, 177 pub rev: DbTid, 178 #[serde(borrow)] 179 pub collection: CowStr<'i>, 180 pub rkey: DbRkey, 181 pub action: DbAction, 182 #[serde(default)] 183 #[serde(skip_serializing_if = "Option::is_none")] 184 pub cid: Option<IpldCid>, 185} 186 187#[derive(Debug, PartialEq, Eq, Clone, Copy)] 188pub enum GaugeState { 189 Synced, 190 Pending, 191 Resync(Option<ResyncErrorKind>), 192} 193 194impl GaugeState { 195 pub fn is_resync(&self) -> bool { 196 matches!(self, GaugeState::Resync(_)) 197 } 198}