this repo has no description
1use crate::sync::firehose::SequencedEvent;
2use cid::Cid;
3use serde::{Deserialize, Serialize};
4use std::str::FromStr;
5
/// Header portion of a stream frame, carrying an operation code and a type tag.
///
/// NOTE(review): the meaning of the `op` values and the set of valid `t`
/// strings are not visible in this file; confirm them against the
/// event-stream protocol this module implements.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrameHeader {
    /// Numeric operation code, serialized under the key `op`.
    pub op: i64,
    /// Type tag, serialized under the key `t`.
    pub t: String,
}
11
12#[derive(Debug, Serialize, Deserialize)]
13pub struct CommitFrame {
14 pub seq: i64,
15 pub rebase: bool,
16 #[serde(rename = "tooBig")]
17 pub too_big: bool,
18 pub repo: String,
19 pub commit: Cid,
20 pub rev: String,
21 pub since: Option<String>,
22 #[serde(with = "serde_bytes")]
23 pub blocks: Vec<u8>,
24 pub ops: Vec<RepoOp>,
25 pub blobs: Vec<Cid>,
26 pub time: String,
27 #[serde(rename = "prevData", skip_serializing_if = "Option::is_none")]
28 pub prev_data: Option<Cid>,
29}
30
/// String-typed form of a repo operation, as deserialized from the JSON
/// value handed to `CommitFrameBuilder`.
///
/// `CommitFrameBuilder::build` turns each of these into a [`RepoOp`] by
/// parsing the optional CID strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct JsonRepoOp {
    /// Copied verbatim into `RepoOp::action`.
    action: String,
    /// Copied verbatim into `RepoOp::path`.
    path: String,
    /// CID in string form, if any; parsed into `RepoOp::cid`.
    cid: Option<String>,
    /// Previous CID in string form, if any; parsed into `RepoOp::prev`.
    prev: Option<String>,
}
38
/// A single repo operation as carried in [`CommitFrame::ops`].
#[derive(Debug, Serialize, Deserialize)]
pub struct RepoOp {
    /// Name of the operation.
    ///
    /// NOTE(review): the set of valid values is not visible in this file;
    /// confirm against the protocol definition.
    pub action: String,
    /// Path the operation applies to.
    pub path: String,
    /// CID associated with the operation. It has no skip attribute, so it is
    /// serialized even when `None`.
    pub cid: Option<Cid>,
    /// Previous CID; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prev: Option<Cid>,
}
47
/// Payload of an identity frame.
#[derive(Debug, Serialize, Deserialize)]
pub struct IdentityFrame {
    /// DID the frame refers to.
    pub did: String,
    /// Handle, if one is supplied; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub handle: Option<String>,
    /// Sequence number of the event.
    pub seq: i64,
    /// Timestamp in string form.
    ///
    /// NOTE(review): the expected format is not enforced here; presumably it
    /// matches the one produced by `format_atproto_time` — confirm at the
    /// construction site.
    pub time: String,
}
56
/// Payload of an account frame.
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountFrame {
    /// DID the frame refers to.
    pub did: String,
    /// Whether the account is active.
    pub active: bool,
    /// Optional status string; omitted from the serialized output when `None`.
    ///
    /// NOTE(review): the set of valid status values is not visible in this
    /// file; confirm against the protocol definition.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
    /// Sequence number of the event.
    pub seq: i64,
    /// Timestamp in string form.
    pub time: String,
}
66
/// Payload of a sync frame.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncFrame {
    /// DID the frame refers to.
    pub did: String,
    /// Revision string.
    pub rev: String,
    /// Raw block bytes, encoded through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub blocks: Vec<u8>,
    /// Sequence number of the event.
    pub seq: i64,
    /// Timestamp in string form.
    pub time: String,
}
76
/// Payload of an info frame: a name plus an optional free-form message.
#[derive(Debug, Serialize, Deserialize)]
pub struct InfoFrame {
    /// Name identifying the kind of information being reported.
    pub name: String,
    /// Optional message; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
83
/// Header of an error frame.
///
/// Unlike [`FrameHeader`], it carries only the operation code and no `t` tag.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorFrameHeader {
    /// Numeric operation code.
    ///
    /// NOTE(review): the value used for error frames is not visible in this
    /// file; confirm at the construction site.
    pub op: i64,
}
88
/// Body of an error frame: an error identifier plus an optional message.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorFrameBody {
    /// Error identifier string.
    pub error: String,
    /// Optional message; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
95
/// Loosely typed inputs from which a [`CommitFrame`] is assembled.
///
/// CIDs arrive as strings and operations as a JSON value; `build` parses
/// them into their typed forms.
pub struct CommitFrameBuilder {
    /// Copied to `CommitFrame::seq`.
    pub seq: i64,
    /// Copied to `CommitFrame::repo`.
    pub did: String,
    /// Commit CID in string form; `build` fails if it does not parse.
    pub commit_cid_str: String,
    /// Only its presence is inspected: when `Some`, `CommitFrame::since` is
    /// populated. The string itself is never parsed.
    pub prev_cid_str: Option<String>,
    /// JSON expected to deserialize into a list of operations; any other
    /// shape results in an empty operation list.
    pub ops_json: serde_json::Value,
    /// Blob CIDs in string form; entries that fail to parse are dropped.
    pub blobs: Vec<String>,
    /// Event time, formatted into `CommitFrame::time`.
    pub time: chrono::DateTime<chrono::Utc>,
    /// Revision to use; when `None`, `build` substitutes a generated one.
    pub rev: Option<String>,
}
106
107impl CommitFrameBuilder {
108 pub fn build(self) -> Result<CommitFrame, &'static str> {
109 let commit_cid = Cid::from_str(&self.commit_cid_str).map_err(|_| "Invalid commit CID")?;
110 let json_ops: Vec<JsonRepoOp> =
111 serde_json::from_value(self.ops_json).unwrap_or_else(|_| vec![]);
112 let ops: Vec<RepoOp> = json_ops
113 .into_iter()
114 .map(|op| RepoOp {
115 action: op.action,
116 path: op.path,
117 cid: op.cid.and_then(|s| Cid::from_str(&s).ok()),
118 prev: op.prev.and_then(|s| Cid::from_str(&s).ok()),
119 })
120 .collect();
121 let blobs: Vec<Cid> = self
122 .blobs
123 .iter()
124 .filter_map(|s| Cid::from_str(s).ok())
125 .collect();
126 let rev = self.rev.unwrap_or_else(placeholder_rev);
127 let since = self.prev_cid_str.as_ref().map(|_| rev.clone());
128 Ok(CommitFrame {
129 seq: self.seq,
130 rebase: false,
131 too_big: false,
132 repo: self.did,
133 commit: commit_cid,
134 rev,
135 since,
136 blocks: Vec::new(),
137 ops,
138 blobs,
139 time: format_atproto_time(self.time),
140 prev_data: None,
141 })
142 }
143}
144
145fn placeholder_rev() -> String {
146 use jacquard::types::{integer::LimitedU32, string::Tid};
147 Tid::now(LimitedU32::MIN).to_string()
148}
149
150fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
151 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
152}
153
154impl TryFrom<SequencedEvent> for CommitFrame {
155 type Error = &'static str;
156
157 fn try_from(event: SequencedEvent) -> Result<Self, Self::Error> {
158 let builder = CommitFrameBuilder {
159 seq: event.seq,
160 did: event.did,
161 commit_cid_str: event.commit_cid.ok_or("Missing commit_cid in event")?,
162 prev_cid_str: event.prev_cid,
163 ops_json: event.ops.unwrap_or_default(),
164 blobs: event.blobs.unwrap_or_default(),
165 time: event.created_at,
166 rev: event.rev,
167 };
168 builder.build()
169 }
170}