this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 4use bytes::Bytes; 5use cid::Cid; 6use iroh_car::{CarHeader, CarWriter}; 7use jacquard_repo::commit::Commit; 8use jacquard_repo::storage::BlockStore; 9use std::collections::{BTreeMap, HashMap}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio::io::AsyncWriteExt; 13 14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 Commit::from_cbor(commit_bytes) 16 .ok() 17 .map(|c| c.rev().to_string()) 18} 19 20async fn write_car_blocks( 21 commit_cid: Cid, 22 commit_bytes: Option<Bytes>, 23 other_blocks: BTreeMap<Cid, Bytes>, 24) -> Result<Vec<u8>, anyhow::Error> { 25 let mut buffer = Cursor::new(Vec::new()); 26 let header = CarHeader::new_v1(vec![commit_cid]); 27 let mut writer = CarWriter::new(header, &mut buffer); 28 for (cid, data) in other_blocks { 29 if cid != commit_cid { 30 writer 31 .write(cid, data.as_ref()) 32 .await 33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 34 } 35 } 36 if let Some(data) = commit_bytes { 37 writer 38 .write(commit_cid, data.as_ref()) 39 .await 40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 41 } 42 writer 43 .finish() 44 .await 45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 46 buffer 47 .flush() 48 .await 49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 50 Ok(buffer.into_inner()) 51} 52 53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 55} 56 57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 58 let frame = IdentityFrame { 59 did: event.did.clone(), 60 handle: event.handle.clone(), 61 seq: event.seq, 62 time: format_atproto_time(event.created_at), 63 }; 64 let header = FrameHeader { 65 op: 1, 66 t: "#identity".to_string(), 67 }; 68 let mut bytes = Vec::new(); 69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 71 Ok(bytes) 72} 73 74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 75 let frame = AccountFrame { 76 did: event.did.clone(), 77 active: event.active.unwrap_or(true), 78 status: event.status.clone(), 79 seq: event.seq, 80 time: format_atproto_time(event.created_at), 81 }; 82 let header = FrameHeader { 83 op: 1, 84 t: "#account".to_string(), 85 }; 86 let mut bytes = Vec::new(); 87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 89 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); 90 tracing::info!( 91 did = %frame.did, 92 active = frame.active, 93 status = ?frame.status, 94 cbor_len = bytes.len(), 95 cbor_hex = %hex_str, 96 "Sending account event to firehose" 97 ); 98 Ok(bytes) 99} 100 101async fn format_sync_event( 102 state: &AppState, 103 event: &SequencedEvent, 104) -> Result<Vec<u8>, anyhow::Error> { 105 let commit_cid_str = event 106 .commit_cid 107 .as_ref() 108 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 109 let commit_cid = Cid::from_str(commit_cid_str)?; 110 let commit_bytes = state 111 .block_store 112 .get(&commit_cid) 113 .await? 114 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 115 let rev = if let Some(ref stored_rev) = event.rev { 116 stored_rev.clone() 117 } else { 118 extract_rev_from_commit_bytes(&commit_bytes) 119 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 120 }; 121 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 122 let frame = SyncFrame { 123 did: event.did.clone(), 124 rev, 125 blocks: car_bytes, 126 seq: event.seq, 127 time: format_atproto_time(event.created_at), 128 }; 129 let header = FrameHeader { 130 op: 1, 131 t: "#sync".to_string(), 132 }; 133 let mut bytes = Vec::new(); 134 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 135 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 136 Ok(bytes) 137} 138 139pub async fn format_event_for_sending( 140 state: &AppState, 141 event: SequencedEvent, 142) -> Result<Vec<u8>, anyhow::Error> { 143 match event.event_type.as_str() { 144 "identity" => return format_identity_event(&event), 145 "account" => return format_account_event(&event), 146 "sync" => return format_sync_event(state, &event).await, 147 _ => {} 148 } 149 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 150 let prev_cid_str = event.prev_cid.clone(); 151 let prev_data_cid_str = event.prev_data_cid.clone(); 152 let mut frame: CommitFrame = event 153 .try_into() 154 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 155 if let Some(ref pdc) = prev_data_cid_str 156 && let Ok(cid) = Cid::from_str(pdc) 157 { 158 frame.prev_data = Some(cid); 159 } 160 let commit_cid = frame.commit; 161 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 162 let mut all_cids: Vec<Cid> = block_cids_str 163 .iter() 164 .filter_map(|s| Cid::from_str(s).ok()) 165 .filter(|c| Some(*c) != prev_cid) 166 .collect(); 167 if !all_cids.contains(&commit_cid) { 168 all_cids.push(commit_cid); 169 } 170 if let Some(ref pc) = prev_cid 171 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 172 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 173 { 174 frame.since = Some(rev); 175 } 176 let car_bytes = if !all_cids.is_empty() { 177 let fetched = state.block_store.get_many(&all_cids).await?; 178 let mut blocks = std::collections::BTreeMap::new(); 179 let mut commit_bytes: Option<Bytes> = None; 180 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 181 if let Some(data) = data_opt { 182 if *cid == commit_cid { 183 commit_bytes = Some(data.clone()); 184 if let Some(rev) = extract_rev_from_commit_bytes(data) { 185 frame.rev = rev; 186 } 187 } else { 188 blocks.insert(*cid, data.clone()); 189 } 190 } 191 } 192 write_car_blocks(commit_cid, commit_bytes, blocks).await? 193 } else { 194 Vec::new() 195 }; 196 frame.blocks = car_bytes; 197 let header = FrameHeader { 198 op: 1, 199 t: "#commit".to_string(), 200 }; 201 let mut bytes = Vec::new(); 202 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 203 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 204 Ok(bytes) 205} 206 207pub async fn prefetch_blocks_for_events( 208 state: &AppState, 209 events: &[SequencedEvent], 210) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 211 let mut all_cids: Vec<Cid> = Vec::new(); 212 for event in events { 213 if let Some(ref commit_cid_str) = event.commit_cid 214 && let Ok(cid) = Cid::from_str(commit_cid_str) 215 { 216 all_cids.push(cid); 217 } 218 if let Some(ref prev_cid_str) = event.prev_cid 219 && let Ok(cid) = Cid::from_str(prev_cid_str) 220 { 221 all_cids.push(cid); 222 } 223 if let Some(ref block_cids_str) = event.blocks_cids { 224 for s in block_cids_str { 225 if let Ok(cid) = Cid::from_str(s) { 226 all_cids.push(cid); 227 } 228 } 229 } 230 } 231 all_cids.sort(); 232 all_cids.dedup(); 233 if all_cids.is_empty() { 234 return Ok(HashMap::new()); 235 } 236 let fetched = state.block_store.get_many(&all_cids).await?; 237 let mut blocks_map = HashMap::new(); 238 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 239 if let Some(data) = data_opt { 240 blocks_map.insert(cid, data); 241 } 242 } 243 Ok(blocks_map) 244} 245 246fn format_sync_event_with_prefetched( 247 event: &SequencedEvent, 248 prefetched: &HashMap<Cid, Bytes>, 249) -> Result<Vec<u8>, anyhow::Error> { 250 let commit_cid_str = event 251 .commit_cid 252 .as_ref() 253 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 254 let commit_cid = Cid::from_str(commit_cid_str)?; 255 let commit_bytes = prefetched 256 .get(&commit_cid) 257 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 258 let rev = if let Some(ref stored_rev) = event.rev { 259 stored_rev.clone() 260 } else { 261 extract_rev_from_commit_bytes(commit_bytes) 262 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 263 }; 264 let car_bytes = futures::executor::block_on(write_car_blocks( 265 commit_cid, 266 Some(commit_bytes.clone()), 267 BTreeMap::new(), 268 ))?; 269 let frame = SyncFrame { 270 did: event.did.clone(), 271 rev, 272 blocks: car_bytes, 273 seq: event.seq, 274 time: format_atproto_time(event.created_at), 275 }; 276 let header = FrameHeader { 277 op: 1, 278 t: "#sync".to_string(), 279 }; 280 let mut bytes = Vec::new(); 281 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 282 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 283 Ok(bytes) 284} 285 286pub async fn format_event_with_prefetched_blocks( 287 event: SequencedEvent, 288 prefetched: &HashMap<Cid, Bytes>, 289) -> Result<Vec<u8>, anyhow::Error> { 290 match event.event_type.as_str() { 291 "identity" => return format_identity_event(&event), 292 "account" => return format_account_event(&event), 293 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 294 _ => {} 295 } 296 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 297 let prev_cid_str = event.prev_cid.clone(); 298 let prev_data_cid_str = event.prev_data_cid.clone(); 299 let mut frame: CommitFrame = event 300 .try_into() 301 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 302 if let Some(ref pdc) = prev_data_cid_str 303 && let Ok(cid) = Cid::from_str(pdc) 304 { 305 frame.prev_data = Some(cid); 306 } 307 let commit_cid = frame.commit; 308 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 309 let mut all_cids: Vec<Cid> = block_cids_str 310 .iter() 311 .filter_map(|s| Cid::from_str(s).ok()) 312 .filter(|c| Some(*c) != prev_cid) 313 .collect(); 314 if !all_cids.contains(&commit_cid) { 315 all_cids.push(commit_cid); 316 } 317 if let Some(commit_bytes) = prefetched.get(&commit_cid) 318 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 319 { 320 frame.rev = rev; 321 } 322 if let Some(ref pc) = prev_cid 323 && let Some(prev_bytes) = prefetched.get(pc) 324 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 325 { 326 frame.since = Some(rev); 327 } 328 let car_bytes = if !all_cids.is_empty() { 329 let mut blocks = BTreeMap::new(); 330 let mut commit_bytes_for_car: Option<Bytes> = None; 331 for cid in all_cids { 332 if let Some(data) = prefetched.get(&cid) { 333 if cid == commit_cid { 334 commit_bytes_for_car = Some(data.clone()); 335 } else { 336 blocks.insert(cid, data.clone()); 337 } 338 } 339 } 340 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 341 } else { 342 Vec::new() 343 }; 344 frame.blocks = car_bytes; 345 let header = FrameHeader { 346 op: 1, 347 t: "#commit".to_string(), 348 }; 349 let mut bytes = Vec::new(); 350 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 351 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 352 Ok(bytes) 353}