this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame};
4use bytes::Bytes;
5use cid::Cid;
6use iroh_car::{CarHeader, CarWriter};
7use jacquard_repo::commit::Commit;
8use jacquard_repo::storage::BlockStore;
9use std::collections::{BTreeMap, HashMap};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio::io::AsyncWriteExt;
13
14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
15 Commit::from_cbor(commit_bytes)
16 .ok()
17 .map(|c| c.rev().to_string())
18}
19
20async fn write_car_blocks(
21 commit_cid: Cid,
22 commit_bytes: Option<Bytes>,
23 other_blocks: BTreeMap<Cid, Bytes>,
24) -> Result<Vec<u8>, anyhow::Error> {
25 let mut buffer = Cursor::new(Vec::new());
26 let header = CarHeader::new_v1(vec![commit_cid]);
27 let mut writer = CarWriter::new(header, &mut buffer);
28 for (cid, data) in other_blocks {
29 if cid != commit_cid {
30 writer
31 .write(cid, data.as_ref())
32 .await
33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
34 }
35 }
36 if let Some(data) = commit_bytes {
37 writer
38 .write(commit_cid, data.as_ref())
39 .await
40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
41 }
42 writer
43 .finish()
44 .await
45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
46 buffer
47 .flush()
48 .await
49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
50 Ok(buffer.into_inner())
51}
52
53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
55}
56
57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
58 let frame = IdentityFrame {
59 did: event.did.clone(),
60 handle: event.handle.clone(),
61 seq: event.seq,
62 time: format_atproto_time(event.created_at),
63 };
64 let header = FrameHeader {
65 op: 1,
66 t: "#identity".to_string(),
67 };
68 let mut bytes = Vec::new();
69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
71 Ok(bytes)
72}
73
74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
75 let frame = AccountFrame {
76 did: event.did.clone(),
77 active: event.active.unwrap_or(true),
78 status: event.status.clone(),
79 seq: event.seq,
80 time: format_atproto_time(event.created_at),
81 };
82 let header = FrameHeader {
83 op: 1,
84 t: "#account".to_string(),
85 };
86 let mut bytes = Vec::new();
87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
89 Ok(bytes)
90}
91
92async fn format_sync_event(
93 state: &AppState,
94 event: &SequencedEvent,
95) -> Result<Vec<u8>, anyhow::Error> {
96 let commit_cid_str = event
97 .commit_cid
98 .as_ref()
99 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
100 let commit_cid = Cid::from_str(commit_cid_str)?;
101 let commit_bytes = state
102 .block_store
103 .get(&commit_cid)
104 .await?
105 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
106 let rev = extract_rev_from_commit_bytes(&commit_bytes)
107 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
108 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
109 let frame = SyncFrame {
110 did: event.did.clone(),
111 rev,
112 blocks: car_bytes,
113 seq: event.seq,
114 time: format_atproto_time(event.created_at),
115 };
116 let header = FrameHeader {
117 op: 1,
118 t: "#sync".to_string(),
119 };
120 let mut bytes = Vec::new();
121 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
122 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
123 Ok(bytes)
124}
125
126pub async fn format_event_for_sending(
127 state: &AppState,
128 event: SequencedEvent,
129) -> Result<Vec<u8>, anyhow::Error> {
130 match event.event_type.as_str() {
131 "identity" => return format_identity_event(&event),
132 "account" => return format_account_event(&event),
133 "sync" => return format_sync_event(state, &event).await,
134 _ => {}
135 }
136 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
137 let prev_cid_str = event.prev_cid.clone();
138 let prev_data_cid_str = event.prev_data_cid.clone();
139 let mut frame: CommitFrame = event
140 .try_into()
141 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
142 if let Some(ref pdc) = prev_data_cid_str
143 && let Ok(cid) = Cid::from_str(pdc) {
144 frame.prev_data = Some(cid);
145 }
146 let commit_cid = frame.commit;
147 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
148 let mut all_cids: Vec<Cid> = block_cids_str
149 .iter()
150 .filter_map(|s| Cid::from_str(s).ok())
151 .filter(|c| Some(*c) != prev_cid)
152 .collect();
153 if !all_cids.contains(&commit_cid) {
154 all_cids.push(commit_cid);
155 }
156 if let Some(ref pc) = prev_cid
157 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
158 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) {
159 frame.since = Some(rev);
160 }
161 let car_bytes = if !all_cids.is_empty() {
162 let fetched = state.block_store.get_many(&all_cids).await?;
163 let mut blocks = std::collections::BTreeMap::new();
164 let mut commit_bytes: Option<Bytes> = None;
165 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
166 if let Some(data) = data_opt {
167 if *cid == commit_cid {
168 commit_bytes = Some(data.clone());
169 if let Some(rev) = extract_rev_from_commit_bytes(data) {
170 frame.rev = rev;
171 }
172 } else {
173 blocks.insert(*cid, data.clone());
174 }
175 }
176 }
177 write_car_blocks(commit_cid, commit_bytes, blocks).await?
178 } else {
179 Vec::new()
180 };
181 frame.blocks = car_bytes;
182 let header = FrameHeader {
183 op: 1,
184 t: "#commit".to_string(),
185 };
186 let mut bytes = Vec::new();
187 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
188 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
189 Ok(bytes)
190}
191
192pub async fn prefetch_blocks_for_events(
193 state: &AppState,
194 events: &[SequencedEvent],
195) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
196 let mut all_cids: Vec<Cid> = Vec::new();
197 for event in events {
198 if let Some(ref commit_cid_str) = event.commit_cid
199 && let Ok(cid) = Cid::from_str(commit_cid_str) {
200 all_cids.push(cid);
201 }
202 if let Some(ref prev_cid_str) = event.prev_cid
203 && let Ok(cid) = Cid::from_str(prev_cid_str) {
204 all_cids.push(cid);
205 }
206 if let Some(ref block_cids_str) = event.blocks_cids {
207 for s in block_cids_str {
208 if let Ok(cid) = Cid::from_str(s) {
209 all_cids.push(cid);
210 }
211 }
212 }
213 }
214 all_cids.sort();
215 all_cids.dedup();
216 if all_cids.is_empty() {
217 return Ok(HashMap::new());
218 }
219 let fetched = state.block_store.get_many(&all_cids).await?;
220 let mut blocks_map = HashMap::new();
221 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
222 if let Some(data) = data_opt {
223 blocks_map.insert(cid, data);
224 }
225 }
226 Ok(blocks_map)
227}
228
229fn format_sync_event_with_prefetched(
230 event: &SequencedEvent,
231 prefetched: &HashMap<Cid, Bytes>,
232) -> Result<Vec<u8>, anyhow::Error> {
233 let commit_cid_str = event
234 .commit_cid
235 .as_ref()
236 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
237 let commit_cid = Cid::from_str(commit_cid_str)?;
238 let commit_bytes = prefetched
239 .get(&commit_cid)
240 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
241 let rev = extract_rev_from_commit_bytes(commit_bytes)
242 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
243 let car_bytes = futures::executor::block_on(write_car_blocks(
244 commit_cid,
245 Some(commit_bytes.clone()),
246 BTreeMap::new(),
247 ))?;
248 let frame = SyncFrame {
249 did: event.did.clone(),
250 rev,
251 blocks: car_bytes,
252 seq: event.seq,
253 time: format_atproto_time(event.created_at),
254 };
255 let header = FrameHeader {
256 op: 1,
257 t: "#sync".to_string(),
258 };
259 let mut bytes = Vec::new();
260 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
261 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
262 Ok(bytes)
263}
264
265pub async fn format_event_with_prefetched_blocks(
266 event: SequencedEvent,
267 prefetched: &HashMap<Cid, Bytes>,
268) -> Result<Vec<u8>, anyhow::Error> {
269 match event.event_type.as_str() {
270 "identity" => return format_identity_event(&event),
271 "account" => return format_account_event(&event),
272 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
273 _ => {}
274 }
275 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
276 let prev_cid_str = event.prev_cid.clone();
277 let prev_data_cid_str = event.prev_data_cid.clone();
278 let mut frame: CommitFrame = event
279 .try_into()
280 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
281 if let Some(ref pdc) = prev_data_cid_str
282 && let Ok(cid) = Cid::from_str(pdc) {
283 frame.prev_data = Some(cid);
284 }
285 let commit_cid = frame.commit;
286 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
287 let mut all_cids: Vec<Cid> = block_cids_str
288 .iter()
289 .filter_map(|s| Cid::from_str(s).ok())
290 .filter(|c| Some(*c) != prev_cid)
291 .collect();
292 if !all_cids.contains(&commit_cid) {
293 all_cids.push(commit_cid);
294 }
295 if let Some(commit_bytes) = prefetched.get(&commit_cid)
296 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) {
297 frame.rev = rev;
298 }
299 if let Some(ref pc) = prev_cid
300 && let Some(prev_bytes) = prefetched.get(pc)
301 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) {
302 frame.since = Some(rev);
303 }
304 let car_bytes = if !all_cids.is_empty() {
305 let mut blocks = BTreeMap::new();
306 let mut commit_bytes_for_car: Option<Bytes> = None;
307 for cid in all_cids {
308 if let Some(data) = prefetched.get(&cid) {
309 if cid == commit_cid {
310 commit_bytes_for_car = Some(data.clone());
311 } else {
312 blocks.insert(cid, data.clone());
313 }
314 }
315 }
316 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
317 } else {
318 Vec::new()
319 };
320 frame.blocks = car_bytes;
321 let header = FrameHeader {
322 op: 1,
323 t: "#commit".to_string(),
324 };
325 let mut bytes = Vec::new();
326 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
327 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
328 Ok(bytes)
329}