this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 4use bytes::Bytes; 5use cid::Cid; 6use iroh_car::{CarHeader, CarWriter}; 7use jacquard_repo::commit::Commit; 8use jacquard_repo::storage::BlockStore; 9use std::collections::{BTreeMap, HashMap}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio::io::AsyncWriteExt; 13 14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 Commit::from_cbor(commit_bytes) 16 .ok() 17 .map(|c| c.rev().to_string()) 18} 19 20async fn write_car_blocks( 21 commit_cid: Cid, 22 commit_bytes: Option<Bytes>, 23 other_blocks: BTreeMap<Cid, Bytes>, 24) -> Result<Vec<u8>, anyhow::Error> { 25 let mut buffer = Cursor::new(Vec::new()); 26 let header = CarHeader::new_v1(vec![commit_cid]); 27 let mut writer = CarWriter::new(header, &mut buffer); 28 for (cid, data) in other_blocks { 29 if cid != commit_cid { 30 writer 31 .write(cid, data.as_ref()) 32 .await 33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 34 } 35 } 36 if let Some(data) = commit_bytes { 37 writer 38 .write(commit_cid, data.as_ref()) 39 .await 40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 41 } 42 writer 43 .finish() 44 .await 45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 46 buffer 47 .flush() 48 .await 49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 50 Ok(buffer.into_inner()) 51} 52 53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 55} 56 57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 58 let frame = IdentityFrame { 59 did: event.did.clone(), 60 handle: event.handle.clone(), 61 seq: event.seq, 62 time: format_atproto_time(event.created_at), 63 }; 64 let header = FrameHeader { 65 op: 1, 66 t: "#identity".to_string(), 67 }; 68 let mut bytes = Vec::new(); 69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 71 Ok(bytes) 72} 73 74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 75 let frame = AccountFrame { 76 did: event.did.clone(), 77 active: event.active.unwrap_or(true), 78 status: event.status.clone(), 79 seq: event.seq, 80 time: format_atproto_time(event.created_at), 81 }; 82 let header = FrameHeader { 83 op: 1, 84 t: "#account".to_string(), 85 }; 86 let mut bytes = Vec::new(); 87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 89 Ok(bytes) 90} 91 92async fn format_sync_event( 93 state: &AppState, 94 event: &SequencedEvent, 95) -> Result<Vec<u8>, anyhow::Error> { 96 let commit_cid_str = event 97 .commit_cid 98 .as_ref() 99 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 100 let commit_cid = Cid::from_str(commit_cid_str)?; 101 let commit_bytes = state 102 .block_store 103 .get(&commit_cid) 104 .await? 105 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 106 let rev = extract_rev_from_commit_bytes(&commit_bytes) 107 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 108 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 109 let frame = SyncFrame { 110 did: event.did.clone(), 111 rev, 112 blocks: car_bytes, 113 seq: event.seq, 114 time: format_atproto_time(event.created_at), 115 }; 116 let header = FrameHeader { 117 op: 1, 118 t: "#sync".to_string(), 119 }; 120 let mut bytes = Vec::new(); 121 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 122 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 123 Ok(bytes) 124} 125 126pub async fn format_event_for_sending( 127 state: &AppState, 128 event: SequencedEvent, 129) -> Result<Vec<u8>, anyhow::Error> { 130 match event.event_type.as_str() { 131 "identity" => return format_identity_event(&event), 132 "account" => return format_account_event(&event), 133 "sync" => return format_sync_event(state, &event).await, 134 _ => {} 135 } 136 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 137 let prev_cid_str = event.prev_cid.clone(); 138 let prev_data_cid_str = event.prev_data_cid.clone(); 139 let mut frame: CommitFrame = event 140 .try_into() 141 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 142 if let Some(ref pdc) = prev_data_cid_str 143 && let Ok(cid) = Cid::from_str(pdc) 144 { 145 frame.prev_data = Some(cid); 146 } 147 let commit_cid = frame.commit; 148 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 149 let mut all_cids: Vec<Cid> = block_cids_str 150 .iter() 151 .filter_map(|s| Cid::from_str(s).ok()) 152 .filter(|c| Some(*c) != prev_cid) 153 .collect(); 154 if !all_cids.contains(&commit_cid) { 155 all_cids.push(commit_cid); 156 } 157 if let Some(ref pc) = prev_cid 158 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 159 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 160 { 161 frame.since = Some(rev); 162 } 163 let car_bytes = if !all_cids.is_empty() { 164 let fetched = state.block_store.get_many(&all_cids).await?; 165 let mut blocks = std::collections::BTreeMap::new(); 166 let mut commit_bytes: Option<Bytes> = None; 167 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 168 if let Some(data) = data_opt { 169 if *cid == commit_cid { 170 commit_bytes = Some(data.clone()); 171 if let Some(rev) = extract_rev_from_commit_bytes(data) { 172 frame.rev = rev; 173 } 174 } else { 175 blocks.insert(*cid, data.clone()); 176 } 177 } 178 } 179 write_car_blocks(commit_cid, commit_bytes, blocks).await? 180 } else { 181 Vec::new() 182 }; 183 frame.blocks = car_bytes; 184 let header = FrameHeader { 185 op: 1, 186 t: "#commit".to_string(), 187 }; 188 let mut bytes = Vec::new(); 189 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 190 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 191 Ok(bytes) 192} 193 194pub async fn prefetch_blocks_for_events( 195 state: &AppState, 196 events: &[SequencedEvent], 197) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 198 let mut all_cids: Vec<Cid> = Vec::new(); 199 for event in events { 200 if let Some(ref commit_cid_str) = event.commit_cid 201 && let Ok(cid) = Cid::from_str(commit_cid_str) 202 { 203 all_cids.push(cid); 204 } 205 if let Some(ref prev_cid_str) = event.prev_cid 206 && let Ok(cid) = Cid::from_str(prev_cid_str) 207 { 208 all_cids.push(cid); 209 } 210 if let Some(ref block_cids_str) = event.blocks_cids { 211 for s in block_cids_str { 212 if let Ok(cid) = Cid::from_str(s) { 213 all_cids.push(cid); 214 } 215 } 216 } 217 } 218 all_cids.sort(); 219 all_cids.dedup(); 220 if all_cids.is_empty() { 221 return Ok(HashMap::new()); 222 } 223 let fetched = state.block_store.get_many(&all_cids).await?; 224 let mut blocks_map = HashMap::new(); 225 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 226 if let Some(data) = data_opt { 227 blocks_map.insert(cid, data); 228 } 229 } 230 Ok(blocks_map) 231} 232 233fn format_sync_event_with_prefetched( 234 event: &SequencedEvent, 235 prefetched: &HashMap<Cid, Bytes>, 236) -> Result<Vec<u8>, anyhow::Error> { 237 let commit_cid_str = event 238 .commit_cid 239 .as_ref() 240 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 241 let commit_cid = Cid::from_str(commit_cid_str)?; 242 let commit_bytes = prefetched 243 .get(&commit_cid) 244 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 245 let rev = extract_rev_from_commit_bytes(commit_bytes) 246 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 247 let car_bytes = futures::executor::block_on(write_car_blocks( 248 commit_cid, 249 Some(commit_bytes.clone()), 250 BTreeMap::new(), 251 ))?; 252 let frame = SyncFrame { 253 did: event.did.clone(), 254 rev, 255 blocks: car_bytes, 256 seq: event.seq, 257 time: format_atproto_time(event.created_at), 258 }; 259 let header = FrameHeader { 260 op: 1, 261 t: "#sync".to_string(), 262 }; 263 let mut bytes = Vec::new(); 264 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 265 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 266 Ok(bytes) 267} 268 269pub async fn format_event_with_prefetched_blocks( 270 event: SequencedEvent, 271 prefetched: &HashMap<Cid, Bytes>, 272) -> Result<Vec<u8>, anyhow::Error> { 273 match event.event_type.as_str() { 274 "identity" => return format_identity_event(&event), 275 "account" => return format_account_event(&event), 276 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 277 _ => {} 278 } 279 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 280 let prev_cid_str = event.prev_cid.clone(); 281 let prev_data_cid_str = event.prev_data_cid.clone(); 282 let mut frame: CommitFrame = event 283 .try_into() 284 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 285 if let Some(ref pdc) = prev_data_cid_str 286 && let Ok(cid) = Cid::from_str(pdc) 287 { 288 frame.prev_data = Some(cid); 289 } 290 let commit_cid = frame.commit; 291 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 292 let mut all_cids: Vec<Cid> = block_cids_str 293 .iter() 294 .filter_map(|s| Cid::from_str(s).ok()) 295 .filter(|c| Some(*c) != prev_cid) 296 .collect(); 297 if !all_cids.contains(&commit_cid) { 298 all_cids.push(commit_cid); 299 } 300 if let Some(commit_bytes) = prefetched.get(&commit_cid) 301 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 302 { 303 frame.rev = rev; 304 } 305 if let Some(ref pc) = prev_cid 306 && let Some(prev_bytes) = prefetched.get(pc) 307 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 308 { 309 frame.since = Some(rev); 310 } 311 let car_bytes = if !all_cids.is_empty() { 312 let mut blocks = BTreeMap::new(); 313 let mut commit_bytes_for_car: Option<Bytes> = None; 314 for cid in all_cids { 315 if let Some(data) = prefetched.get(&cid) { 316 if cid == commit_cid { 317 commit_bytes_for_car = Some(data.clone()); 318 } else { 319 blocks.insert(cid, data.clone()); 320 } 321 } 322 } 323 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 324 } else { 325 Vec::new() 326 }; 327 frame.blocks = car_bytes; 328 let header = FrameHeader { 329 op: 1, 330 t: "#commit".to_string(), 331 }; 332 let mut bytes = Vec::new(); 333 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 334 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 335 Ok(bytes) 336}