this repo has no description
1use crate::sync::firehose::SequencedEvent;
2use cid::Cid;
3use serde::{Deserialize, Serialize};
4use std::str::FromStr;
5
/// Serializable frame header made of an `op` code and a `t` type tag.
///
/// NOTE(review): presumably this precedes a message body on the event stream, with `t`
/// naming the body type — confirm against the encoder that writes it.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrameHeader {
    /// Integer `op` field of the header.
    pub op: i64,
    /// String `t` field of the header.
    pub t: String,
}
11
/// Body of a commit frame.
///
/// Field names are serialized as written, except `too_big` and `prev_data`, which are
/// renamed to `tooBig` and `prevData`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CommitFrame {
    /// Sequence number of the event.
    pub seq: i64,
    /// `rebase` flag of the frame.
    pub rebase: bool,
    /// `tooBig` flag of the frame (serialized under that camelCase name).
    #[serde(rename = "tooBig")]
    pub too_big: bool,
    /// Repository identifier; `CommitFrameBuilder` fills this from the event's DID.
    pub repo: String,
    /// CID of the commit.
    pub commit: Cid,
    /// Revision string of the commit.
    pub rev: String,
    /// Optional `since` revision string.
    pub since: Option<String>,
    /// Raw block bytes, (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub blocks: Vec<u8>,
    /// Repository operations carried by this commit.
    pub ops: Vec<RepoOp>,
    /// CIDs of the blobs listed for this commit.
    pub blobs: Vec<Cid>,
    /// Timestamp string of the event.
    pub time: String,
    /// Optional `prevData` CID; left out of the serialized output when `None`.
    #[serde(rename = "prevData", skip_serializing_if = "Option::is_none")]
    pub prev_data: Option<Cid>,
}
30
/// String-typed form of a repository operation.
///
/// `CommitFrameBuilder::build` deserializes the stored ops JSON into this shape and then
/// parses the CID strings to produce [`RepoOp`] values.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct JsonRepoOp {
    /// Action name of the operation.
    action: String,
    /// Path the operation applies to.
    path: String,
    /// Optional CID in string form.
    cid: Option<String>,
    /// Optional previous CID in string form.
    prev: Option<String>,
}
38
/// A single repository operation as carried in [`CommitFrame::ops`].
#[derive(Debug, Serialize, Deserialize)]
pub struct RepoOp {
    /// Action name of the operation.
    pub action: String,
    /// Path the operation applies to.
    pub path: String,
    /// Optional CID associated with the operation; serialized even when `None`.
    pub cid: Option<Cid>,
    /// Optional previous CID; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prev: Option<Cid>,
}
47
/// Body of an identity frame.
#[derive(Debug, Serialize, Deserialize)]
pub struct IdentityFrame {
    /// DID the frame refers to.
    pub did: String,
    /// Optional handle; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub handle: Option<String>,
    /// Sequence number of the event.
    pub seq: i64,
    /// Timestamp string of the event.
    pub time: String,
}
56
/// Body of an account frame.
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountFrame {
    /// DID the frame refers to.
    pub did: String,
    /// `active` flag of the account.
    pub active: bool,
    /// Optional status string; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
    /// Sequence number of the event.
    pub seq: i64,
    /// Timestamp string of the event.
    pub time: String,
}
66
/// Body of a sync frame.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncFrame {
    /// DID the frame refers to.
    pub did: String,
    /// Revision string.
    pub rev: String,
    /// Raw block bytes, (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub blocks: Vec<u8>,
    /// Sequence number of the event.
    pub seq: i64,
    /// Timestamp string of the event.
    pub time: String,
}
76
/// Body of an info frame: a name plus an optional message.
#[derive(Debug, Serialize, Deserialize)]
pub struct InfoFrame {
    /// Name of the info notice.
    pub name: String,
    /// Optional message text; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
83
/// Header of an error frame; unlike [`FrameHeader`] it carries only `op` and no `t` tag.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorFrameHeader {
    /// Integer `op` field of the header.
    pub op: i64,
}
88
/// Body of an error frame: an error name plus an optional message.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorFrameBody {
    /// Error name.
    pub error: String,
    /// Optional message text; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
95
/// Failures that can occur while assembling a [`CommitFrame`].
///
/// Each variant carries a `String` that is appended verbatim to the `Display` message.
#[derive(Debug, Clone)]
pub enum CommitFrameError {
    /// A commit CID was missing or could not be parsed.
    InvalidCommitCid(String),
    /// A blob CID could not be parsed.
    InvalidBlobCid(String),
}

impl std::fmt::Display for CommitFrameError {
    /// Writes a fixed prefix naming the kind of CID, followed by the carried string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CommitFrameError::InvalidCommitCid(detail) => {
                f.write_fmt(format_args!("Invalid commit CID: {}", detail))
            }
            CommitFrameError::InvalidBlobCid(detail) => {
                f.write_fmt(format_args!("Invalid blob CID: {}", detail))
            }
        }
    }
}

// No underlying source error is kept, so the default trait methods are sufficient.
impl std::error::Error for CommitFrameError {}
112
/// Holds the already-parsed inputs from which a [`CommitFrame`] is assembled.
///
/// Created by `CommitFrameBuilder::new` and consumed by `CommitFrameBuilder::build`.
pub struct CommitFrameBuilder {
    /// Sequence number copied into the frame.
    seq: i64,
    /// DID of the repository; becomes the frame's `repo` field.
    did: String,
    /// Parsed commit CID.
    commit_cid: Cid,
    /// Parsed previous-commit CID, if one was supplied.
    prev_cid: Option<Cid>,
    /// Operations as raw JSON; decoded only when the frame is built.
    ops_json: serde_json::Value,
    /// Parsed blob CIDs.
    blob_cids: Vec<Cid>,
    /// Event timestamp in UTC.
    time: chrono::DateTime<chrono::Utc>,
    /// Revision string, if the event carried one.
    rev: Option<String>,
}
123
124impl CommitFrameBuilder {
125 pub fn new(
126 seq: i64,
127 did: String,
128 commit_cid_str: &str,
129 prev_cid_str: Option<&str>,
130 ops_json: serde_json::Value,
131 blob_strs: Vec<String>,
132 time: chrono::DateTime<chrono::Utc>,
133 rev: Option<String>,
134 ) -> Result<Self, CommitFrameError> {
135 let commit_cid = Cid::from_str(commit_cid_str)
136 .map_err(|_| CommitFrameError::InvalidCommitCid(commit_cid_str.to_string()))?;
137 let prev_cid = prev_cid_str
138 .map(|s| Cid::from_str(s))
139 .transpose()
140 .map_err(|_| CommitFrameError::InvalidCommitCid(prev_cid_str.unwrap_or("").to_string()))?;
141 let blob_cids: Vec<Cid> = blob_strs
142 .iter()
143 .filter_map(|s| Cid::from_str(s).ok())
144 .collect();
145 Ok(Self {
146 seq,
147 did,
148 commit_cid,
149 prev_cid,
150 ops_json,
151 blob_cids,
152 time,
153 rev,
154 })
155 }
156
157 pub fn build(self) -> CommitFrame {
158 let json_ops: Vec<JsonRepoOp> =
159 serde_json::from_value(self.ops_json).unwrap_or_else(|_| vec![]);
160 let ops: Vec<RepoOp> = json_ops
161 .into_iter()
162 .map(|op| RepoOp {
163 action: op.action,
164 path: op.path,
165 cid: op.cid.and_then(|s| Cid::from_str(&s).ok()),
166 prev: op.prev.and_then(|s| Cid::from_str(&s).ok()),
167 })
168 .collect();
169 let rev = self.rev.unwrap_or_else(placeholder_rev);
170 let since = self.prev_cid.as_ref().map(|_| rev.clone());
171 CommitFrame {
172 seq: self.seq,
173 rebase: false,
174 too_big: false,
175 repo: self.did,
176 commit: self.commit_cid,
177 rev,
178 since,
179 blocks: Vec::new(),
180 ops,
181 blobs: self.blob_cids,
182 time: format_atproto_time(self.time),
183 prev_data: None,
184 }
185 }
186}
187
188fn placeholder_rev() -> String {
189 use jacquard::types::{integer::LimitedU32, string::Tid};
190 Tid::now(LimitedU32::MIN).to_string()
191}
192
193fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
194 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
195}
196
197impl TryFrom<SequencedEvent> for CommitFrame {
198 type Error = CommitFrameError;
199
200 fn try_from(event: SequencedEvent) -> Result<Self, Self::Error> {
201 let commit_cid_str = event.commit_cid.ok_or_else(|| {
202 CommitFrameError::InvalidCommitCid("Missing commit_cid in event".to_string())
203 })?;
204 let builder = CommitFrameBuilder::new(
205 event.seq,
206 event.did,
207 &commit_cid_str,
208 event.prev_cid.as_deref(),
209 event.ops.unwrap_or_default(),
210 event.blobs.unwrap_or_default(),
211 event.created_at,
212 event.rev,
213 )?;
214 Ok(builder.build())
215 }
216}