this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{CommitFrame, Frame, FrameData};
4use bytes::Bytes;
5use cid::Cid;
6use jacquard_repo::car::write_car_bytes;
7use jacquard_repo::storage::BlockStore;
8use std::collections::{BTreeMap, HashMap};
9use std::str::FromStr;
10
11pub async fn format_event_for_sending(
12 state: &AppState,
13 event: SequencedEvent,
14) -> Result<Vec<u8>, anyhow::Error> {
15 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
16 let mut frame: CommitFrame = event.try_into()
17 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
18
19 let car_bytes = if !block_cids_str.is_empty() {
20 let cids: Vec<Cid> = block_cids_str
21 .iter()
22 .filter_map(|s| Cid::from_str(s).ok())
23 .collect();
24
25 let fetched = state.block_store.get_many(&cids).await?;
26
27 let mut blocks = std::collections::BTreeMap::new();
28 for (cid, data_opt) in cids.into_iter().zip(fetched.into_iter()) {
29 if let Some(data) = data_opt {
30 blocks.insert(cid, data);
31 }
32 }
33
34 let root = Cid::from_str(&frame.commit)?;
35 write_car_bytes(root, blocks).await?
36 } else {
37 Vec::new()
38 };
39 frame.blocks = car_bytes;
40
41 let frame = Frame {
42 op: 1,
43 data: FrameData::Commit(Box::new(frame)),
44 };
45
46 let mut bytes = Vec::new();
47 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
48 Ok(bytes)
49}
50
51pub async fn prefetch_blocks_for_events(
52 state: &AppState,
53 events: &[SequencedEvent],
54) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
55 let mut all_cids: Vec<Cid> = Vec::new();
56
57 for event in events {
58 if let Some(ref block_cids_str) = event.blocks_cids {
59 for s in block_cids_str {
60 if let Ok(cid) = Cid::from_str(s) {
61 all_cids.push(cid);
62 }
63 }
64 }
65 }
66
67 all_cids.sort();
68 all_cids.dedup();
69
70 if all_cids.is_empty() {
71 return Ok(HashMap::new());
72 }
73
74 let fetched = state.block_store.get_many(&all_cids).await?;
75
76 let mut blocks_map = HashMap::new();
77 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
78 if let Some(data) = data_opt {
79 blocks_map.insert(cid, data);
80 }
81 }
82
83 Ok(blocks_map)
84}
85
86pub async fn format_event_with_prefetched_blocks(
87 event: SequencedEvent,
88 prefetched: &HashMap<Cid, Bytes>,
89) -> Result<Vec<u8>, anyhow::Error> {
90 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
91 let mut frame: CommitFrame = event.try_into()
92 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
93
94 let car_bytes = if !block_cids_str.is_empty() {
95 let cids: Vec<Cid> = block_cids_str
96 .iter()
97 .filter_map(|s| Cid::from_str(s).ok())
98 .collect();
99
100 let mut blocks = BTreeMap::new();
101 for cid in cids {
102 if let Some(data) = prefetched.get(&cid) {
103 blocks.insert(cid, data.clone());
104 }
105 }
106
107 let root = Cid::from_str(&frame.commit)?;
108 write_car_bytes(root, blocks).await?
109 } else {
110 Vec::new()
111 };
112 frame.blocks = car_bytes;
113
114 let frame = Frame {
115 op: 1,
116 data: FrameData::Commit(Box::new(frame)),
117 };
118
119 let mut bytes = Vec::new();
120 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
121 Ok(bytes)
122}