this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{CommitFrame, Frame, FrameData}; 4use bytes::Bytes; 5use cid::Cid; 6use jacquard_repo::car::write_car_bytes; 7use jacquard_repo::storage::BlockStore; 8use std::collections::{BTreeMap, HashMap}; 9use std::str::FromStr; 10 11pub async fn format_event_for_sending( 12 state: &AppState, 13 event: SequencedEvent, 14) -> Result<Vec<u8>, anyhow::Error> { 15 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 16 let mut frame: CommitFrame = event.try_into() 17 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 18 19 let car_bytes = if !block_cids_str.is_empty() { 20 let cids: Vec<Cid> = block_cids_str 21 .iter() 22 .filter_map(|s| Cid::from_str(s).ok()) 23 .collect(); 24 25 let fetched = state.block_store.get_many(&cids).await?; 26 27 let mut blocks = std::collections::BTreeMap::new(); 28 for (cid, data_opt) in cids.into_iter().zip(fetched.into_iter()) { 29 if let Some(data) = data_opt { 30 blocks.insert(cid, data); 31 } 32 } 33 34 let root = Cid::from_str(&frame.commit)?; 35 write_car_bytes(root, blocks).await? 36 } else { 37 Vec::new() 38 }; 39 frame.blocks = car_bytes; 40 41 let frame = Frame { 42 op: 1, 43 data: FrameData::Commit(Box::new(frame)), 44 }; 45 46 let mut bytes = Vec::new(); 47 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 48 Ok(bytes) 49} 50 51pub async fn prefetch_blocks_for_events( 52 state: &AppState, 53 events: &[SequencedEvent], 54) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 55 let mut all_cids: Vec<Cid> = Vec::new(); 56 57 for event in events { 58 if let Some(ref block_cids_str) = event.blocks_cids { 59 for s in block_cids_str { 60 if let Ok(cid) = Cid::from_str(s) { 61 all_cids.push(cid); 62 } 63 } 64 } 65 } 66 67 all_cids.sort(); 68 all_cids.dedup(); 69 70 if all_cids.is_empty() { 71 return Ok(HashMap::new()); 72 } 73 74 let fetched = state.block_store.get_many(&all_cids).await?; 75 76 let mut blocks_map = HashMap::new(); 77 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 78 if let Some(data) = data_opt { 79 blocks_map.insert(cid, data); 80 } 81 } 82 83 Ok(blocks_map) 84} 85 86pub async fn format_event_with_prefetched_blocks( 87 event: SequencedEvent, 88 prefetched: &HashMap<Cid, Bytes>, 89) -> Result<Vec<u8>, anyhow::Error> { 90 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 91 let mut frame: CommitFrame = event.try_into() 92 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 93 94 let car_bytes = if !block_cids_str.is_empty() { 95 let cids: Vec<Cid> = block_cids_str 96 .iter() 97 .filter_map(|s| Cid::from_str(s).ok()) 98 .collect(); 99 100 let mut blocks = BTreeMap::new(); 101 for cid in cids { 102 if let Some(data) = prefetched.get(&cid) { 103 blocks.insert(cid, data.clone()); 104 } 105 } 106 107 let root = Cid::from_str(&frame.commit)?; 108 write_car_bytes(root, blocks).await? 109 } else { 110 Vec::new() 111 }; 112 frame.blocks = car_bytes; 113 114 let frame = Frame { 115 op: 1, 116 data: FrameData::Commit(Box::new(frame)), 117 }; 118 119 let mut bytes = Vec::new(); 120 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 121 Ok(bytes) 122}