this repo has no description
1use crate::sync::firehose::SequencedEvent; 2use cid::Cid; 3use serde::{Deserialize, Serialize}; 4use std::str::FromStr; 5 6#[derive(Debug, Serialize, Deserialize)] 7pub struct FrameHeader { 8 pub op: i64, 9 pub t: String, 10} 11 12#[derive(Debug, Serialize, Deserialize)] 13pub struct CommitFrame { 14 pub seq: i64, 15 pub rebase: bool, 16 #[serde(rename = "tooBig")] 17 pub too_big: bool, 18 pub repo: String, 19 pub commit: Cid, 20 pub rev: String, 21 pub since: Option<String>, 22 #[serde(with = "serde_bytes")] 23 pub blocks: Vec<u8>, 24 pub ops: Vec<RepoOp>, 25 pub blobs: Vec<Cid>, 26 pub time: String, 27 #[serde(rename = "prevData", skip_serializing_if = "Option::is_none")] 28 pub prev_data: Option<Cid>, 29} 30 31#[derive(Debug, Clone, Serialize, Deserialize)] 32struct JsonRepoOp { 33 action: String, 34 path: String, 35 cid: Option<String>, 36 prev: Option<String>, 37} 38 39#[derive(Debug, Serialize, Deserialize)] 40pub struct RepoOp { 41 pub action: String, 42 pub path: String, 43 pub cid: Option<Cid>, 44 #[serde(skip_serializing_if = "Option::is_none")] 45 pub prev: Option<Cid>, 46} 47 48#[derive(Debug, Serialize, Deserialize)] 49pub struct IdentityFrame { 50 pub did: String, 51 #[serde(skip_serializing_if = "Option::is_none")] 52 pub handle: Option<String>, 53 pub seq: i64, 54 pub time: String, 55} 56 57#[derive(Debug, Serialize, Deserialize)] 58pub struct AccountFrame { 59 pub did: String, 60 pub active: bool, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub status: Option<String>, 63 pub seq: i64, 64 pub time: String, 65} 66 67#[derive(Debug, Serialize, Deserialize)] 68pub struct SyncFrame { 69 pub did: String, 70 pub rev: String, 71 #[serde(with = "serde_bytes")] 72 pub blocks: Vec<u8>, 73 pub seq: i64, 74 pub time: String, 75} 76 77#[derive(Debug, Serialize, Deserialize)] 78pub struct InfoFrame { 79 pub name: String, 80 #[serde(skip_serializing_if = "Option::is_none")] 81 pub message: Option<String>, 82} 83 84#[derive(Debug, Serialize, Deserialize)] 85pub struct ErrorFrameHeader { 86 pub op: i64, 87} 88 89#[derive(Debug, Serialize, Deserialize)] 90pub struct ErrorFrameBody { 91 pub error: String, 92 #[serde(skip_serializing_if = "Option::is_none")] 93 pub message: Option<String>, 94} 95 96#[derive(Debug, Clone)] 97pub enum CommitFrameError { 98 InvalidCommitCid(String), 99 InvalidBlobCid(String), 100} 101 102impl std::fmt::Display for CommitFrameError { 103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 104 match self { 105 Self::InvalidCommitCid(s) => write!(f, "Invalid commit CID: {}", s), 106 Self::InvalidBlobCid(s) => write!(f, "Invalid blob CID: {}", s), 107 } 108 } 109} 110 111impl std::error::Error for CommitFrameError {} 112 113pub struct CommitFrameBuilder { 114 seq: i64, 115 did: String, 116 commit_cid: Cid, 117 prev_cid: Option<Cid>, 118 ops_json: serde_json::Value, 119 blob_cids: Vec<Cid>, 120 time: chrono::DateTime<chrono::Utc>, 121 rev: Option<String>, 122} 123 124impl CommitFrameBuilder { 125 pub fn new( 126 seq: i64, 127 did: String, 128 commit_cid_str: &str, 129 prev_cid_str: Option<&str>, 130 ops_json: serde_json::Value, 131 blob_strs: Vec<String>, 132 time: chrono::DateTime<chrono::Utc>, 133 rev: Option<String>, 134 ) -> Result<Self, CommitFrameError> { 135 let commit_cid = Cid::from_str(commit_cid_str) 136 .map_err(|_| CommitFrameError::InvalidCommitCid(commit_cid_str.to_string()))?; 137 let prev_cid = prev_cid_str 138 .map(|s| Cid::from_str(s)) 139 .transpose() 140 .map_err(|_| CommitFrameError::InvalidCommitCid(prev_cid_str.unwrap_or("").to_string()))?; 141 let blob_cids: Vec<Cid> = blob_strs 142 .iter() 143 .filter_map(|s| Cid::from_str(s).ok()) 144 .collect(); 145 Ok(Self { 146 seq, 147 did, 148 commit_cid, 149 prev_cid, 150 ops_json, 151 blob_cids, 152 time, 153 rev, 154 }) 155 } 156 157 pub fn build(self) -> CommitFrame { 158 let json_ops: Vec<JsonRepoOp> = 159 serde_json::from_value(self.ops_json).unwrap_or_else(|_| vec![]); 160 let ops: Vec<RepoOp> = json_ops 161 .into_iter() 162 .map(|op| RepoOp { 163 action: op.action, 164 path: op.path, 165 cid: op.cid.and_then(|s| Cid::from_str(&s).ok()), 166 prev: op.prev.and_then(|s| Cid::from_str(&s).ok()), 167 }) 168 .collect(); 169 let rev = self.rev.unwrap_or_else(placeholder_rev); 170 let since = self.prev_cid.as_ref().map(|_| rev.clone()); 171 CommitFrame { 172 seq: self.seq, 173 rebase: false, 174 too_big: false, 175 repo: self.did, 176 commit: self.commit_cid, 177 rev, 178 since, 179 blocks: Vec::new(), 180 ops, 181 blobs: self.blob_cids, 182 time: format_atproto_time(self.time), 183 prev_data: None, 184 } 185 } 186} 187 188fn placeholder_rev() -> String { 189 use jacquard::types::{integer::LimitedU32, string::Tid}; 190 Tid::now(LimitedU32::MIN).to_string() 191} 192 193fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 194 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 195} 196 197impl TryFrom<SequencedEvent> for CommitFrame { 198 type Error = CommitFrameError; 199 200 fn try_from(event: SequencedEvent) -> Result<Self, Self::Error> { 201 let commit_cid_str = event.commit_cid.ok_or_else(|| { 202 CommitFrameError::InvalidCommitCid("Missing commit_cid in event".to_string()) 203 })?; 204 let builder = CommitFrameBuilder::new( 205 event.seq, 206 event.did, 207 &commit_cid_str, 208 event.prev_cid.as_deref(), 209 event.ops.unwrap_or_default(), 210 event.blobs.unwrap_or_default(), 211 event.created_at, 212 event.rev, 213 )?; 214 Ok(builder.build()) 215 } 216}