this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{CommitFrame, Frame, FrameData};
4use cid::Cid;
5use jacquard_repo::car::write_car;
6use jacquard_repo::storage::BlockStore;
7use std::fs;
8use std::str::FromStr;
9use tokio::fs::File;
10use tokio::io::AsyncReadExt;
11use uuid::Uuid;
12
13pub async fn format_event_for_sending(
14 state: &AppState,
15 event: SequencedEvent,
16) -> Result<Vec<u8>, anyhow::Error> {
17 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
18 let mut frame: CommitFrame = event.into();
19
20 let mut car_bytes = Vec::new();
21 if !block_cids_str.is_empty() {
22 let temp_path = format!("/tmp/{}.car", Uuid::new_v4());
23 let mut blocks = std::collections::BTreeMap::new();
24
25 for cid_str in block_cids_str {
26 let cid = Cid::from_str(&cid_str)?;
27 let data = state
28 .block_store
29 .get(&cid)
30 .await?
31 .ok_or_else(|| anyhow::anyhow!("Block not found: {}", cid))?;
32 blocks.insert(cid, data);
33 }
34
35 let root = Cid::from_str(&frame.commit)?;
36 write_car(&temp_path, vec![root], blocks).await?;
37
38 let mut file = File::open(&temp_path).await?;
39 file.read_to_end(&mut car_bytes).await?;
40 fs::remove_file(&temp_path)?;
41 }
42 frame.blocks = car_bytes;
43
44 let frame = Frame {
45 op: 1,
46 data: FrameData::Commit(Box::new(frame)),
47 };
48
49 let mut bytes = Vec::new();
50 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
51 Ok(bytes)
52}