this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 4use bytes::Bytes; 5use cid::Cid; 6use iroh_car::{CarHeader, CarWriter}; 7use jacquard_repo::commit::Commit; 8use jacquard_repo::storage::BlockStore; 9use std::collections::{BTreeMap, HashMap}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio::io::AsyncWriteExt; 13 14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 Commit::from_cbor(commit_bytes).ok().map(|c| c.rev().to_string()) 16} 17 18async fn write_car_blocks( 19 commit_cid: Cid, 20 commit_bytes: Option<Bytes>, 21 other_blocks: BTreeMap<Cid, Bytes>, 22) -> Result<Vec<u8>, anyhow::Error> { 23 let mut buffer = Cursor::new(Vec::new()); 24 let header = CarHeader::new_v1(vec![commit_cid]); 25 let mut writer = CarWriter::new(header, &mut buffer); 26 for (cid, data) in other_blocks { 27 if cid != commit_cid { 28 writer.write(cid, data.as_ref()).await 29 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 30 } 31 } 32 if let Some(data) = commit_bytes { 33 writer.write(commit_cid, data.as_ref()).await 34 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 35 } 36 writer.finish().await 37 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 38 buffer.flush().await 39 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 40 Ok(buffer.into_inner()) 41} 42 43fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 44 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 45} 46 47fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 48 let frame = IdentityFrame { 49 did: event.did.clone(), 50 handle: event.handle.clone(), 51 seq: event.seq, 52 time: format_atproto_time(event.created_at), 53 }; 54 let header = FrameHeader { 55 op: 1, 56 t: "#identity".to_string(), 57 }; 58 let mut bytes = Vec::new(); 59 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 60 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 61 Ok(bytes) 62} 63 64fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 65 let frame = AccountFrame { 66 did: event.did.clone(), 67 active: event.active.unwrap_or(true), 68 status: event.status.clone(), 69 seq: event.seq, 70 time: format_atproto_time(event.created_at), 71 }; 72 let header = FrameHeader { 73 op: 1, 74 t: "#account".to_string(), 75 }; 76 let mut bytes = Vec::new(); 77 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 78 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 79 Ok(bytes) 80} 81 82async fn format_sync_event( 83 state: &AppState, 84 event: &SequencedEvent, 85) -> Result<Vec<u8>, anyhow::Error> { 86 let commit_cid_str = event.commit_cid.as_ref() 87 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 88 let commit_cid = Cid::from_str(commit_cid_str)?; 89 let commit_bytes = state.block_store.get(&commit_cid).await? 90 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 91 let rev = extract_rev_from_commit_bytes(&commit_bytes) 92 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 93 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 94 let frame = SyncFrame { 95 did: event.did.clone(), 96 rev, 97 blocks: car_bytes, 98 seq: event.seq, 99 time: format_atproto_time(event.created_at), 100 }; 101 let header = FrameHeader { 102 op: 1, 103 t: "#sync".to_string(), 104 }; 105 let mut bytes = Vec::new(); 106 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 107 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 108 Ok(bytes) 109} 110 111pub async fn format_event_for_sending( 112 state: &AppState, 113 event: SequencedEvent, 114) -> Result<Vec<u8>, anyhow::Error> { 115 match event.event_type.as_str() { 116 "identity" => return format_identity_event(&event), 117 "account" => return format_account_event(&event), 118 "sync" => return format_sync_event(state, &event).await, 119 _ => {} 120 } 121 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 122 let prev_cid_str = event.prev_cid.clone(); 123 let prev_data_cid_str = event.prev_data_cid.clone(); 124 let mut frame: CommitFrame = event.try_into() 125 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 126 if let Some(ref pdc) = prev_data_cid_str { 127 if let Ok(cid) = Cid::from_str(pdc) { 128 frame.prev_data = Some(cid); 129 } 130 } 131 let commit_cid = frame.commit; 132 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 133 let mut all_cids: Vec<Cid> = block_cids_str 134 .iter() 135 .filter_map(|s| Cid::from_str(s).ok()) 136 .filter(|c| Some(*c) != prev_cid) 137 .collect(); 138 if !all_cids.contains(&commit_cid) { 139 all_cids.push(commit_cid); 140 } 141 if let Some(ref pc) = prev_cid { 142 if let Ok(Some(prev_bytes)) = state.block_store.get(pc).await { 143 if let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) { 144 frame.since = Some(rev); 145 } 146 } 147 } 148 let car_bytes = if !all_cids.is_empty() { 149 let fetched = state.block_store.get_many(&all_cids).await?; 150 let mut blocks = std::collections::BTreeMap::new(); 151 let mut commit_bytes: Option<Bytes> = None; 152 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 153 if let Some(data) = data_opt { 154 if *cid == commit_cid { 155 commit_bytes = Some(data.clone()); 156 if let Some(rev) = extract_rev_from_commit_bytes(data) { 157 frame.rev = rev; 158 } 159 } else { 160 blocks.insert(*cid, data.clone()); 161 } 162 } 163 } 164 write_car_blocks(commit_cid, commit_bytes, blocks).await? 165 } else { 166 Vec::new() 167 }; 168 frame.blocks = car_bytes; 169 let header = FrameHeader { 170 op: 1, 171 t: "#commit".to_string(), 172 }; 173 let mut bytes = Vec::new(); 174 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 175 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 176 Ok(bytes) 177} 178 179pub async fn prefetch_blocks_for_events( 180 state: &AppState, 181 events: &[SequencedEvent], 182) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 183 let mut all_cids: Vec<Cid> = Vec::new(); 184 for event in events { 185 if let Some(ref commit_cid_str) = event.commit_cid { 186 if let Ok(cid) = Cid::from_str(commit_cid_str) { 187 all_cids.push(cid); 188 } 189 } 190 if let Some(ref prev_cid_str) = event.prev_cid { 191 if let Ok(cid) = Cid::from_str(prev_cid_str) { 192 all_cids.push(cid); 193 } 194 } 195 if let Some(ref block_cids_str) = event.blocks_cids { 196 for s in block_cids_str { 197 if let Ok(cid) = Cid::from_str(s) { 198 all_cids.push(cid); 199 } 200 } 201 } 202 } 203 all_cids.sort(); 204 all_cids.dedup(); 205 if all_cids.is_empty() { 206 return Ok(HashMap::new()); 207 } 208 let fetched = state.block_store.get_many(&all_cids).await?; 209 let mut blocks_map = HashMap::new(); 210 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 211 if let Some(data) = data_opt { 212 blocks_map.insert(cid, data); 213 } 214 } 215 Ok(blocks_map) 216} 217 218fn format_sync_event_with_prefetched( 219 event: &SequencedEvent, 220 prefetched: &HashMap<Cid, Bytes>, 221) -> Result<Vec<u8>, anyhow::Error> { 222 let commit_cid_str = event.commit_cid.as_ref() 223 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 224 let commit_cid = Cid::from_str(commit_cid_str)?; 225 let commit_bytes = prefetched.get(&commit_cid) 226 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 227 let rev = extract_rev_from_commit_bytes(commit_bytes) 228 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 229 let car_bytes = futures::executor::block_on( 230 write_car_blocks(commit_cid, Some(commit_bytes.clone()), BTreeMap::new()) 231 )?; 232 let frame = SyncFrame { 233 did: event.did.clone(), 234 rev, 235 blocks: car_bytes, 236 seq: event.seq, 237 time: format_atproto_time(event.created_at), 238 }; 239 let header = FrameHeader { 240 op: 1, 241 t: "#sync".to_string(), 242 }; 243 let mut bytes = Vec::new(); 244 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 245 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 246 Ok(bytes) 247} 248 249pub async fn format_event_with_prefetched_blocks( 250 event: SequencedEvent, 251 prefetched: &HashMap<Cid, Bytes>, 252) -> Result<Vec<u8>, anyhow::Error> { 253 match event.event_type.as_str() { 254 "identity" => return format_identity_event(&event), 255 "account" => return format_account_event(&event), 256 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 257 _ => {} 258 } 259 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 260 let prev_cid_str = event.prev_cid.clone(); 261 let prev_data_cid_str = event.prev_data_cid.clone(); 262 let mut frame: CommitFrame = event.try_into() 263 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 264 if let Some(ref pdc) = prev_data_cid_str { 265 if let Ok(cid) = Cid::from_str(pdc) { 266 frame.prev_data = Some(cid); 267 } 268 } 269 let commit_cid = frame.commit; 270 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 271 let mut all_cids: Vec<Cid> = block_cids_str 272 .iter() 273 .filter_map(|s| Cid::from_str(s).ok()) 274 .filter(|c| Some(*c) != prev_cid) 275 .collect(); 276 if !all_cids.contains(&commit_cid) { 277 all_cids.push(commit_cid); 278 } 279 if let Some(commit_bytes) = prefetched.get(&commit_cid) { 280 if let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) { 281 frame.rev = rev; 282 } 283 } 284 if let Some(ref pc) = prev_cid { 285 if let Some(prev_bytes) = prefetched.get(pc) { 286 if let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) { 287 frame.since = Some(rev); 288 } 289 } 290 } 291 let car_bytes = if !all_cids.is_empty() { 292 let mut blocks = BTreeMap::new(); 293 let mut commit_bytes_for_car: Option<Bytes> = None; 294 for cid in all_cids { 295 if let Some(data) = prefetched.get(&cid) { 296 if cid == commit_cid { 297 commit_bytes_for_car = Some(data.clone()); 298 } else { 299 blocks.insert(cid, data.clone()); 300 } 301 } 302 } 303 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 304 } else { 305 Vec::new() 306 }; 307 frame.blocks = car_bytes; 308 let header = FrameHeader { 309 op: 1, 310 t: "#commit".to_string(), 311 }; 312 let mut bytes = Vec::new(); 313 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 314 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 315 Ok(bytes) 316}