// this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 4use bytes::Bytes; 5use cid::Cid; 6use iroh_car::{CarHeader, CarWriter}; 7use jacquard_repo::commit::Commit; 8use jacquard_repo::storage::BlockStore; 9use std::collections::{BTreeMap, HashMap}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio::io::AsyncWriteExt; 13fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 14 Commit::from_cbor(commit_bytes).ok().map(|c| c.rev().to_string()) 15} 16async fn write_car_blocks( 17 commit_cid: Cid, 18 commit_bytes: Option<Bytes>, 19 other_blocks: BTreeMap<Cid, Bytes>, 20) -> Result<Vec<u8>, anyhow::Error> { 21 let mut buffer = Cursor::new(Vec::new()); 22 let header = CarHeader::new_v1(vec![commit_cid]); 23 let mut writer = CarWriter::new(header, &mut buffer); 24 for (cid, data) in other_blocks { 25 if cid != commit_cid { 26 writer.write(cid, data.as_ref()).await 27 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 28 } 29 } 30 if let Some(data) = commit_bytes { 31 writer.write(commit_cid, data.as_ref()).await 32 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 33 } 34 writer.finish().await 35 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 36 buffer.flush().await 37 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 38 Ok(buffer.into_inner()) 39} 40fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 41 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 42} 43fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 44 let frame = IdentityFrame { 45 did: event.did.clone(), 46 handle: event.handle.clone(), 47 seq: event.seq, 48 time: format_atproto_time(event.created_at), 49 }; 50 let header = FrameHeader { 51 op: 1, 52 t: "#identity".to_string(), 53 }; 54 let mut bytes = Vec::new(); 55 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 56 
serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 57 Ok(bytes) 58} 59fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 60 let frame = AccountFrame { 61 did: event.did.clone(), 62 active: event.active.unwrap_or(true), 63 status: event.status.clone(), 64 seq: event.seq, 65 time: format_atproto_time(event.created_at), 66 }; 67 let header = FrameHeader { 68 op: 1, 69 t: "#account".to_string(), 70 }; 71 let mut bytes = Vec::new(); 72 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 73 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 74 Ok(bytes) 75} 76async fn format_sync_event( 77 state: &AppState, 78 event: &SequencedEvent, 79) -> Result<Vec<u8>, anyhow::Error> { 80 let commit_cid_str = event.commit_cid.as_ref() 81 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 82 let commit_cid = Cid::from_str(commit_cid_str)?; 83 let commit_bytes = state.block_store.get(&commit_cid).await? 84 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 85 let rev = extract_rev_from_commit_bytes(&commit_bytes) 86 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 87 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 88 let frame = SyncFrame { 89 did: event.did.clone(), 90 rev, 91 blocks: car_bytes, 92 seq: event.seq, 93 time: format_atproto_time(event.created_at), 94 }; 95 let header = FrameHeader { 96 op: 1, 97 t: "#sync".to_string(), 98 }; 99 let mut bytes = Vec::new(); 100 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 101 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 102 Ok(bytes) 103} 104pub async fn format_event_for_sending( 105 state: &AppState, 106 event: SequencedEvent, 107) -> Result<Vec<u8>, anyhow::Error> { 108 match event.event_type.as_str() { 109 "identity" => return format_identity_event(&event), 110 "account" => return format_account_event(&event), 111 "sync" => return format_sync_event(state, &event).await, 112 _ => {} 113 } 114 let 
block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 115 let prev_cid_str = event.prev_cid.clone(); 116 let prev_data_cid_str = event.prev_data_cid.clone(); 117 let mut frame: CommitFrame = event.try_into() 118 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 119 if let Some(ref pdc) = prev_data_cid_str { 120 if let Ok(cid) = Cid::from_str(pdc) { 121 frame.prev_data = Some(cid); 122 } 123 } 124 let commit_cid = frame.commit; 125 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 126 let mut all_cids: Vec<Cid> = block_cids_str 127 .iter() 128 .filter_map(|s| Cid::from_str(s).ok()) 129 .filter(|c| Some(*c) != prev_cid) 130 .collect(); 131 if !all_cids.contains(&commit_cid) { 132 all_cids.push(commit_cid); 133 } 134 if let Some(ref pc) = prev_cid { 135 if let Ok(Some(prev_bytes)) = state.block_store.get(pc).await { 136 if let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) { 137 frame.since = Some(rev); 138 } 139 } 140 } 141 let car_bytes = if !all_cids.is_empty() { 142 let fetched = state.block_store.get_many(&all_cids).await?; 143 let mut blocks = std::collections::BTreeMap::new(); 144 let mut commit_bytes: Option<Bytes> = None; 145 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 146 if let Some(data) = data_opt { 147 if *cid == commit_cid { 148 commit_bytes = Some(data.clone()); 149 if let Some(rev) = extract_rev_from_commit_bytes(data) { 150 frame.rev = rev; 151 } 152 } else { 153 blocks.insert(*cid, data.clone()); 154 } 155 } 156 } 157 write_car_blocks(commit_cid, commit_bytes, blocks).await? 
158 } else { 159 Vec::new() 160 }; 161 frame.blocks = car_bytes; 162 let header = FrameHeader { 163 op: 1, 164 t: "#commit".to_string(), 165 }; 166 let mut bytes = Vec::new(); 167 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 168 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 169 Ok(bytes) 170} 171pub async fn prefetch_blocks_for_events( 172 state: &AppState, 173 events: &[SequencedEvent], 174) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 175 let mut all_cids: Vec<Cid> = Vec::new(); 176 for event in events { 177 if let Some(ref commit_cid_str) = event.commit_cid { 178 if let Ok(cid) = Cid::from_str(commit_cid_str) { 179 all_cids.push(cid); 180 } 181 } 182 if let Some(ref prev_cid_str) = event.prev_cid { 183 if let Ok(cid) = Cid::from_str(prev_cid_str) { 184 all_cids.push(cid); 185 } 186 } 187 if let Some(ref block_cids_str) = event.blocks_cids { 188 for s in block_cids_str { 189 if let Ok(cid) = Cid::from_str(s) { 190 all_cids.push(cid); 191 } 192 } 193 } 194 } 195 all_cids.sort(); 196 all_cids.dedup(); 197 if all_cids.is_empty() { 198 return Ok(HashMap::new()); 199 } 200 let fetched = state.block_store.get_many(&all_cids).await?; 201 let mut blocks_map = HashMap::new(); 202 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 203 if let Some(data) = data_opt { 204 blocks_map.insert(cid, data); 205 } 206 } 207 Ok(blocks_map) 208} 209fn format_sync_event_with_prefetched( 210 event: &SequencedEvent, 211 prefetched: &HashMap<Cid, Bytes>, 212) -> Result<Vec<u8>, anyhow::Error> { 213 let commit_cid_str = event.commit_cid.as_ref() 214 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 215 let commit_cid = Cid::from_str(commit_cid_str)?; 216 let commit_bytes = prefetched.get(&commit_cid) 217 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 218 let rev = extract_rev_from_commit_bytes(commit_bytes) 219 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; 220 let car_bytes = 
futures::executor::block_on( 221 write_car_blocks(commit_cid, Some(commit_bytes.clone()), BTreeMap::new()) 222 )?; 223 let frame = SyncFrame { 224 did: event.did.clone(), 225 rev, 226 blocks: car_bytes, 227 seq: event.seq, 228 time: format_atproto_time(event.created_at), 229 }; 230 let header = FrameHeader { 231 op: 1, 232 t: "#sync".to_string(), 233 }; 234 let mut bytes = Vec::new(); 235 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 236 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 237 Ok(bytes) 238} 239pub async fn format_event_with_prefetched_blocks( 240 event: SequencedEvent, 241 prefetched: &HashMap<Cid, Bytes>, 242) -> Result<Vec<u8>, anyhow::Error> { 243 match event.event_type.as_str() { 244 "identity" => return format_identity_event(&event), 245 "account" => return format_account_event(&event), 246 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 247 _ => {} 248 } 249 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 250 let prev_cid_str = event.prev_cid.clone(); 251 let prev_data_cid_str = event.prev_data_cid.clone(); 252 let mut frame: CommitFrame = event.try_into() 253 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 254 if let Some(ref pdc) = prev_data_cid_str { 255 if let Ok(cid) = Cid::from_str(pdc) { 256 frame.prev_data = Some(cid); 257 } 258 } 259 let commit_cid = frame.commit; 260 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 261 let mut all_cids: Vec<Cid> = block_cids_str 262 .iter() 263 .filter_map(|s| Cid::from_str(s).ok()) 264 .filter(|c| Some(*c) != prev_cid) 265 .collect(); 266 if !all_cids.contains(&commit_cid) { 267 all_cids.push(commit_cid); 268 } 269 if let Some(commit_bytes) = prefetched.get(&commit_cid) { 270 if let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) { 271 frame.rev = rev; 272 } 273 } 274 if let Some(ref pc) = prev_cid { 275 if let Some(prev_bytes) = prefetched.get(pc) { 276 if let Some(rev) = 
extract_rev_from_commit_bytes(prev_bytes) { 277 frame.since = Some(rev); 278 } 279 } 280 } 281 let car_bytes = if !all_cids.is_empty() { 282 let mut blocks = BTreeMap::new(); 283 let mut commit_bytes_for_car: Option<Bytes> = None; 284 for cid in all_cids { 285 if let Some(data) = prefetched.get(&cid) { 286 if cid == commit_cid { 287 commit_bytes_for_car = Some(data.clone()); 288 } else { 289 blocks.insert(cid, data.clone()); 290 } 291 } 292 } 293 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 294 } else { 295 Vec::new() 296 }; 297 frame.blocks = car_bytes; 298 let header = FrameHeader { 299 op: 1, 300 t: "#commit".to_string(), 301 }; 302 let mut bytes = Vec::new(); 303 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 304 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 305 Ok(bytes) 306}