this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 4use bytes::Bytes; 5use cid::Cid; 6use iroh_car::{CarHeader, CarWriter}; 7use jacquard_repo::commit::Commit; 8use jacquard_repo::storage::BlockStore; 9use std::collections::{BTreeMap, HashMap}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio::io::AsyncWriteExt; 13 14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 Commit::from_cbor(commit_bytes) 16 .ok() 17 .map(|c| c.rev().to_string()) 18} 19 20async fn write_car_blocks( 21 commit_cid: Cid, 22 commit_bytes: Option<Bytes>, 23 other_blocks: BTreeMap<Cid, Bytes>, 24) -> Result<Vec<u8>, anyhow::Error> { 25 let mut buffer = Cursor::new(Vec::new()); 26 let header = CarHeader::new_v1(vec![commit_cid]); 27 let mut writer = CarWriter::new(header, &mut buffer); 28 for (cid, data) in other_blocks { 29 if cid != commit_cid { 30 writer 31 .write(cid, data.as_ref()) 32 .await 33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 34 } 35 } 36 if let Some(data) = commit_bytes { 37 writer 38 .write(commit_cid, data.as_ref()) 39 .await 40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 41 } 42 writer 43 .finish() 44 .await 45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 46 buffer 47 .flush() 48 .await 49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 50 Ok(buffer.into_inner()) 51} 52 53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 55} 56 57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 58 let frame = IdentityFrame { 59 did: event.did.clone(), 60 handle: event.handle.clone(), 61 seq: event.seq, 62 time: format_atproto_time(event.created_at), 63 }; 64 let header = FrameHeader { 65 op: 1, 66 t: "#identity".to_string(), 67 }; 68 let mut bytes = Vec::new(); 69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 71 Ok(bytes) 72} 73 74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 75 let frame = AccountFrame { 76 did: event.did.clone(), 77 active: event.active.unwrap_or(true), 78 status: event.status.clone(), 79 seq: event.seq, 80 time: format_atproto_time(event.created_at), 81 }; 82 let header = FrameHeader { 83 op: 1, 84 t: "#account".to_string(), 85 }; 86 let mut bytes = Vec::new(); 87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 89 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); 90 tracing::info!( 91 did = %frame.did, 92 active = frame.active, 93 status = ?frame.status, 94 cbor_len = bytes.len(), 95 cbor_hex = %hex_str, 96 "Sending account event to firehose" 97 ); 98 Ok(bytes) 99} 100 101async fn format_sync_event( 102 state: &AppState, 103 event: &SequencedEvent, 104) -> Result<Vec<u8>, anyhow::Error> { 105 let commit_cid_str = event 106 .commit_cid 107 .as_ref() 108 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 109 let commit_cid = Cid::from_str(commit_cid_str)?; 110 let commit_bytes = state 111 .block_store 112 .get(&commit_cid) 113 .await? 114 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 115 let rev = extract_rev_from_commit_bytes(&commit_bytes) 116 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 117 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 118 let frame = SyncFrame { 119 did: event.did.clone(), 120 rev, 121 blocks: car_bytes, 122 seq: event.seq, 123 time: format_atproto_time(event.created_at), 124 }; 125 let header = FrameHeader { 126 op: 1, 127 t: "#sync".to_string(), 128 }; 129 let mut bytes = Vec::new(); 130 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 131 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 132 Ok(bytes) 133} 134 135pub async fn format_event_for_sending( 136 state: &AppState, 137 event: SequencedEvent, 138) -> Result<Vec<u8>, anyhow::Error> { 139 match event.event_type.as_str() { 140 "identity" => return format_identity_event(&event), 141 "account" => return format_account_event(&event), 142 "sync" => return format_sync_event(state, &event).await, 143 _ => {} 144 } 145 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 146 let prev_cid_str = event.prev_cid.clone(); 147 let prev_data_cid_str = event.prev_data_cid.clone(); 148 let mut frame: CommitFrame = event 149 .try_into() 150 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 151 if let Some(ref pdc) = prev_data_cid_str 152 && let Ok(cid) = Cid::from_str(pdc) 153 { 154 frame.prev_data = Some(cid); 155 } 156 let commit_cid = frame.commit; 157 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 158 let mut all_cids: Vec<Cid> = block_cids_str 159 .iter() 160 .filter_map(|s| Cid::from_str(s).ok()) 161 .filter(|c| Some(*c) != prev_cid) 162 .collect(); 163 if !all_cids.contains(&commit_cid) { 164 all_cids.push(commit_cid); 165 } 166 if let Some(ref pc) = prev_cid 167 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 168 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 169 { 170 frame.since = Some(rev); 171 } 172 let car_bytes = if !all_cids.is_empty() { 173 let fetched = state.block_store.get_many(&all_cids).await?; 174 let mut blocks = std::collections::BTreeMap::new(); 175 let mut commit_bytes: Option<Bytes> = None; 176 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 177 if let Some(data) = data_opt { 178 if *cid == commit_cid { 179 commit_bytes = Some(data.clone()); 180 if let Some(rev) = extract_rev_from_commit_bytes(data) { 181 frame.rev = rev; 182 } 183 } else { 184 blocks.insert(*cid, data.clone()); 185 } 186 } 187 } 188 write_car_blocks(commit_cid, commit_bytes, blocks).await? 189 } else { 190 Vec::new() 191 }; 192 frame.blocks = car_bytes; 193 let header = FrameHeader { 194 op: 1, 195 t: "#commit".to_string(), 196 }; 197 let mut bytes = Vec::new(); 198 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 199 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 200 Ok(bytes) 201} 202 203pub async fn prefetch_blocks_for_events( 204 state: &AppState, 205 events: &[SequencedEvent], 206) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 207 let mut all_cids: Vec<Cid> = Vec::new(); 208 for event in events { 209 if let Some(ref commit_cid_str) = event.commit_cid 210 && let Ok(cid) = Cid::from_str(commit_cid_str) 211 { 212 all_cids.push(cid); 213 } 214 if let Some(ref prev_cid_str) = event.prev_cid 215 && let Ok(cid) = Cid::from_str(prev_cid_str) 216 { 217 all_cids.push(cid); 218 } 219 if let Some(ref block_cids_str) = event.blocks_cids { 220 for s in block_cids_str { 221 if let Ok(cid) = Cid::from_str(s) { 222 all_cids.push(cid); 223 } 224 } 225 } 226 } 227 all_cids.sort(); 228 all_cids.dedup(); 229 if all_cids.is_empty() { 230 return Ok(HashMap::new()); 231 } 232 let fetched = state.block_store.get_many(&all_cids).await?; 233 let mut blocks_map = HashMap::new(); 234 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 235 if let Some(data) = data_opt { 236 blocks_map.insert(cid, data); 237 } 238 } 239 Ok(blocks_map) 240} 241 242fn format_sync_event_with_prefetched( 243 event: &SequencedEvent, 244 prefetched: &HashMap<Cid, Bytes>, 245) -> Result<Vec<u8>, anyhow::Error> { 246 let commit_cid_str = event 247 .commit_cid 248 .as_ref() 249 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 250 let commit_cid = Cid::from_str(commit_cid_str)?; 251 let commit_bytes = prefetched 252 .get(&commit_cid) 253 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 254 let rev = extract_rev_from_commit_bytes(commit_bytes) 255 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 256 let car_bytes = futures::executor::block_on(write_car_blocks( 257 commit_cid, 258 Some(commit_bytes.clone()), 259 BTreeMap::new(), 260 ))?; 261 let frame = SyncFrame { 262 did: event.did.clone(), 263 rev, 264 blocks: car_bytes, 265 seq: event.seq, 266 time: format_atproto_time(event.created_at), 267 }; 268 let header = FrameHeader { 269 op: 1, 270 t: "#sync".to_string(), 271 }; 272 let mut bytes = Vec::new(); 273 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 274 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 275 Ok(bytes) 276} 277 278pub async fn format_event_with_prefetched_blocks( 279 event: SequencedEvent, 280 prefetched: &HashMap<Cid, Bytes>, 281) -> Result<Vec<u8>, anyhow::Error> { 282 match event.event_type.as_str() { 283 "identity" => return format_identity_event(&event), 284 "account" => return format_account_event(&event), 285 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 286 _ => {} 287 } 288 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 289 let prev_cid_str = event.prev_cid.clone(); 290 let prev_data_cid_str = event.prev_data_cid.clone(); 291 let mut frame: CommitFrame = event 292 .try_into() 293 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 294 if let Some(ref pdc) = prev_data_cid_str 295 && let Ok(cid) = Cid::from_str(pdc) 296 { 297 frame.prev_data = Some(cid); 298 } 299 let commit_cid = frame.commit; 300 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 301 let mut all_cids: Vec<Cid> = block_cids_str 302 .iter() 303 .filter_map(|s| Cid::from_str(s).ok()) 304 .filter(|c| Some(*c) != prev_cid) 305 .collect(); 306 if !all_cids.contains(&commit_cid) { 307 all_cids.push(commit_cid); 308 } 309 if let Some(commit_bytes) = prefetched.get(&commit_cid) 310 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 311 { 312 frame.rev = rev; 313 } 314 if let Some(ref pc) = prev_cid 315 && let Some(prev_bytes) = prefetched.get(pc) 316 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 317 { 318 frame.since = Some(rev); 319 } 320 let car_bytes = if !all_cids.is_empty() { 321 let mut blocks = BTreeMap::new(); 322 let mut commit_bytes_for_car: Option<Bytes> = None; 323 for cid in all_cids { 324 if let Some(data) = prefetched.get(&cid) { 325 if cid == commit_cid { 326 commit_bytes_for_car = Some(data.clone()); 327 } else { 328 blocks.insert(cid, data.clone()); 329 } 330 } 331 } 332 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 333 } else { 334 Vec::new() 335 }; 336 frame.blocks = car_bytes; 337 let header = FrameHeader { 338 op: 1, 339 t: "#commit".to_string(), 340 }; 341 let mut bytes = Vec::new(); 342 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 343 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 344 Ok(bytes) 345}