this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 4use bytes::Bytes; 5use cid::Cid; 6use iroh_car::{CarHeader, CarWriter}; 7use jacquard_repo::commit::Commit; 8use jacquard_repo::storage::BlockStore; 9use std::collections::{BTreeMap, HashMap}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio::io::AsyncWriteExt; 13 14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 Commit::from_cbor(commit_bytes) 16 .ok() 17 .map(|c| c.rev().to_string()) 18} 19 20async fn write_car_blocks( 21 commit_cid: Cid, 22 commit_bytes: Option<Bytes>, 23 other_blocks: BTreeMap<Cid, Bytes>, 24) -> Result<Vec<u8>, anyhow::Error> { 25 let mut buffer = Cursor::new(Vec::new()); 26 let header = CarHeader::new_v1(vec![commit_cid]); 27 let mut writer = CarWriter::new(header, &mut buffer); 28 for (cid, data) in other_blocks { 29 if cid != commit_cid { 30 writer 31 .write(cid, data.as_ref()) 32 .await 33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 34 } 35 } 36 if let Some(data) = commit_bytes { 37 writer 38 .write(commit_cid, data.as_ref()) 39 .await 40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 41 } 42 writer 43 .finish() 44 .await 45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 46 buffer 47 .flush() 48 .await 49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 50 Ok(buffer.into_inner()) 51} 52 53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 55} 56 57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 58 let frame = IdentityFrame { 59 did: event.did.clone(), 60 handle: event.handle.clone(), 61 seq: event.seq, 62 time: format_atproto_time(event.created_at), 63 }; 64 let header = FrameHeader { 65 op: 1, 66 t: "#identity".to_string(), 67 }; 68 let mut bytes = Vec::new(); 69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 71 Ok(bytes) 72} 73 74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 75 let frame = AccountFrame { 76 did: event.did.clone(), 77 active: event.active.unwrap_or(true), 78 status: event.status.clone(), 79 seq: event.seq, 80 time: format_atproto_time(event.created_at), 81 }; 82 let header = FrameHeader { 83 op: 1, 84 t: "#account".to_string(), 85 }; 86 let mut bytes = Vec::new(); 87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 89 Ok(bytes) 90} 91 92async fn format_sync_event( 93 state: &AppState, 94 event: &SequencedEvent, 95) -> Result<Vec<u8>, anyhow::Error> { 96 let commit_cid_str = event 97 .commit_cid 98 .as_ref() 99 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 100 let commit_cid = Cid::from_str(commit_cid_str)?; 101 let commit_bytes = state 102 .block_store 103 .get(&commit_cid) 104 .await? 105 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 106 let rev = extract_rev_from_commit_bytes(&commit_bytes) 107 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 108 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 109 let frame = SyncFrame { 110 did: event.did.clone(), 111 rev, 112 blocks: car_bytes, 113 seq: event.seq, 114 time: format_atproto_time(event.created_at), 115 }; 116 let header = FrameHeader { 117 op: 1, 118 t: "#sync".to_string(), 119 }; 120 let mut bytes = Vec::new(); 121 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 122 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 123 Ok(bytes) 124} 125 126pub async fn format_event_for_sending( 127 state: &AppState, 128 event: SequencedEvent, 129) -> Result<Vec<u8>, anyhow::Error> { 130 match event.event_type.as_str() { 131 "identity" => return format_identity_event(&event), 132 "account" => return format_account_event(&event), 133 "sync" => return format_sync_event(state, &event).await, 134 _ => {} 135 } 136 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 137 let prev_cid_str = event.prev_cid.clone(); 138 let prev_data_cid_str = event.prev_data_cid.clone(); 139 let mut frame: CommitFrame = event 140 .try_into() 141 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 142 if let Some(ref pdc) = prev_data_cid_str 143 && let Ok(cid) = Cid::from_str(pdc) { 144 frame.prev_data = Some(cid); 145 } 146 let commit_cid = frame.commit; 147 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 148 let mut all_cids: Vec<Cid> = block_cids_str 149 .iter() 150 .filter_map(|s| Cid::from_str(s).ok()) 151 .filter(|c| Some(*c) != prev_cid) 152 .collect(); 153 if !all_cids.contains(&commit_cid) { 154 all_cids.push(commit_cid); 155 } 156 if let Some(ref pc) = prev_cid 157 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 158 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) { 159 frame.since = Some(rev); 160 } 161 let car_bytes = if !all_cids.is_empty() { 162 let fetched = state.block_store.get_many(&all_cids).await?; 163 let mut blocks = std::collections::BTreeMap::new(); 164 let mut commit_bytes: Option<Bytes> = None; 165 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 166 if let Some(data) = data_opt { 167 if *cid == commit_cid { 168 commit_bytes = Some(data.clone()); 169 if let Some(rev) = extract_rev_from_commit_bytes(data) { 170 frame.rev = rev; 171 } 172 } else { 173 blocks.insert(*cid, data.clone()); 174 } 175 } 176 } 177 write_car_blocks(commit_cid, commit_bytes, blocks).await? 178 } else { 179 Vec::new() 180 }; 181 frame.blocks = car_bytes; 182 let header = FrameHeader { 183 op: 1, 184 t: "#commit".to_string(), 185 }; 186 let mut bytes = Vec::new(); 187 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 188 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 189 Ok(bytes) 190} 191 192pub async fn prefetch_blocks_for_events( 193 state: &AppState, 194 events: &[SequencedEvent], 195) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 196 let mut all_cids: Vec<Cid> = Vec::new(); 197 for event in events { 198 if let Some(ref commit_cid_str) = event.commit_cid 199 && let Ok(cid) = Cid::from_str(commit_cid_str) { 200 all_cids.push(cid); 201 } 202 if let Some(ref prev_cid_str) = event.prev_cid 203 && let Ok(cid) = Cid::from_str(prev_cid_str) { 204 all_cids.push(cid); 205 } 206 if let Some(ref block_cids_str) = event.blocks_cids { 207 for s in block_cids_str { 208 if let Ok(cid) = Cid::from_str(s) { 209 all_cids.push(cid); 210 } 211 } 212 } 213 } 214 all_cids.sort(); 215 all_cids.dedup(); 216 if all_cids.is_empty() { 217 return Ok(HashMap::new()); 218 } 219 let fetched = state.block_store.get_many(&all_cids).await?; 220 let mut blocks_map = HashMap::new(); 221 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 222 if let Some(data) = data_opt { 223 blocks_map.insert(cid, data); 224 } 225 } 226 Ok(blocks_map) 227} 228 229fn format_sync_event_with_prefetched( 230 event: &SequencedEvent, 231 prefetched: &HashMap<Cid, Bytes>, 232) -> Result<Vec<u8>, anyhow::Error> { 233 let commit_cid_str = event 234 .commit_cid 235 .as_ref() 236 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 237 let commit_cid = Cid::from_str(commit_cid_str)?; 238 let commit_bytes = prefetched 239 .get(&commit_cid) 240 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 241 let rev = extract_rev_from_commit_bytes(commit_bytes) 242 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 243 let car_bytes = futures::executor::block_on(write_car_blocks( 244 commit_cid, 245 Some(commit_bytes.clone()), 246 BTreeMap::new(), 247 ))?; 248 let frame = SyncFrame { 249 did: event.did.clone(), 250 rev, 251 blocks: car_bytes, 252 seq: event.seq, 253 time: format_atproto_time(event.created_at), 254 }; 255 let header = FrameHeader { 256 op: 1, 257 t: "#sync".to_string(), 258 }; 259 let mut bytes = Vec::new(); 260 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 261 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 262 Ok(bytes) 263} 264 265pub async fn format_event_with_prefetched_blocks( 266 event: SequencedEvent, 267 prefetched: &HashMap<Cid, Bytes>, 268) -> Result<Vec<u8>, anyhow::Error> { 269 match event.event_type.as_str() { 270 "identity" => return format_identity_event(&event), 271 "account" => return format_account_event(&event), 272 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 273 _ => {} 274 } 275 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 276 let prev_cid_str = event.prev_cid.clone(); 277 let prev_data_cid_str = event.prev_data_cid.clone(); 278 let mut frame: CommitFrame = event 279 .try_into() 280 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 281 if let Some(ref pdc) = prev_data_cid_str 282 && let Ok(cid) = Cid::from_str(pdc) { 283 frame.prev_data = Some(cid); 284 } 285 let commit_cid = frame.commit; 286 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 287 let mut all_cids: Vec<Cid> = block_cids_str 288 .iter() 289 .filter_map(|s| Cid::from_str(s).ok()) 290 .filter(|c| Some(*c) != prev_cid) 291 .collect(); 292 if !all_cids.contains(&commit_cid) { 293 all_cids.push(commit_cid); 294 } 295 if let Some(commit_bytes) = prefetched.get(&commit_cid) 296 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) { 297 frame.rev = rev; 298 } 299 if let Some(ref pc) = prev_cid 300 && let Some(prev_bytes) = prefetched.get(pc) 301 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) { 302 frame.since = Some(rev); 303 } 304 let car_bytes = if !all_cids.is_empty() { 305 let mut blocks = BTreeMap::new(); 306 let mut commit_bytes_for_car: Option<Bytes> = None; 307 for cid in all_cids { 308 if let Some(data) = prefetched.get(&cid) { 309 if cid == commit_cid { 310 commit_bytes_for_car = Some(data.clone()); 311 } else { 312 blocks.insert(cid, data.clone()); 313 } 314 } 315 } 316 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 317 } else { 318 Vec::new() 319 }; 320 frame.blocks = car_bytes; 321 let header = FrameHeader { 322 op: 1, 323 t: "#commit".to_string(), 324 }; 325 let mut bytes = Vec::new(); 326 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 327 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 328 Ok(bytes) 329}