this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame};
4use bytes::Bytes;
5use cid::Cid;
6use iroh_car::{CarHeader, CarWriter};
7use jacquard_repo::commit::Commit;
8use jacquard_repo::storage::BlockStore;
9use std::collections::{BTreeMap, HashMap};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio::io::AsyncWriteExt;
13
14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
15 Commit::from_cbor(commit_bytes).ok().map(|c| c.rev().to_string())
16}
17
18async fn write_car_blocks(
19 commit_cid: Cid,
20 commit_bytes: Option<Bytes>,
21 other_blocks: BTreeMap<Cid, Bytes>,
22) -> Result<Vec<u8>, anyhow::Error> {
23 let mut buffer = Cursor::new(Vec::new());
24 let header = CarHeader::new_v1(vec![commit_cid]);
25 let mut writer = CarWriter::new(header, &mut buffer);
26 for (cid, data) in other_blocks {
27 if cid != commit_cid {
28 writer.write(cid, data.as_ref()).await
29 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
30 }
31 }
32 if let Some(data) = commit_bytes {
33 writer.write(commit_cid, data.as_ref()).await
34 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
35 }
36 writer.finish().await
37 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
38 buffer.flush().await
39 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
40 Ok(buffer.into_inner())
41}
42
43fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
44 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
45}
46
47fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
48 let frame = IdentityFrame {
49 did: event.did.clone(),
50 handle: event.handle.clone(),
51 seq: event.seq,
52 time: format_atproto_time(event.created_at),
53 };
54 let header = FrameHeader {
55 op: 1,
56 t: "#identity".to_string(),
57 };
58 let mut bytes = Vec::new();
59 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
60 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
61 Ok(bytes)
62}
63
64fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
65 let frame = AccountFrame {
66 did: event.did.clone(),
67 active: event.active.unwrap_or(true),
68 status: event.status.clone(),
69 seq: event.seq,
70 time: format_atproto_time(event.created_at),
71 };
72 let header = FrameHeader {
73 op: 1,
74 t: "#account".to_string(),
75 };
76 let mut bytes = Vec::new();
77 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
78 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
79 Ok(bytes)
80}
81
82async fn format_sync_event(
83 state: &AppState,
84 event: &SequencedEvent,
85) -> Result<Vec<u8>, anyhow::Error> {
86 let commit_cid_str = event.commit_cid.as_ref()
87 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
88 let commit_cid = Cid::from_str(commit_cid_str)?;
89 let commit_bytes = state.block_store.get(&commit_cid).await?
90 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
91 let rev = extract_rev_from_commit_bytes(&commit_bytes)
92 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
93 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
94 let frame = SyncFrame {
95 did: event.did.clone(),
96 rev,
97 blocks: car_bytes,
98 seq: event.seq,
99 time: format_atproto_time(event.created_at),
100 };
101 let header = FrameHeader {
102 op: 1,
103 t: "#sync".to_string(),
104 };
105 let mut bytes = Vec::new();
106 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
107 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
108 Ok(bytes)
109}
110
111pub async fn format_event_for_sending(
112 state: &AppState,
113 event: SequencedEvent,
114) -> Result<Vec<u8>, anyhow::Error> {
115 match event.event_type.as_str() {
116 "identity" => return format_identity_event(&event),
117 "account" => return format_account_event(&event),
118 "sync" => return format_sync_event(state, &event).await,
119 _ => {}
120 }
121 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
122 let prev_cid_str = event.prev_cid.clone();
123 let prev_data_cid_str = event.prev_data_cid.clone();
124 let mut frame: CommitFrame = event.try_into()
125 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
126 if let Some(ref pdc) = prev_data_cid_str {
127 if let Ok(cid) = Cid::from_str(pdc) {
128 frame.prev_data = Some(cid);
129 }
130 }
131 let commit_cid = frame.commit;
132 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
133 let mut all_cids: Vec<Cid> = block_cids_str
134 .iter()
135 .filter_map(|s| Cid::from_str(s).ok())
136 .filter(|c| Some(*c) != prev_cid)
137 .collect();
138 if !all_cids.contains(&commit_cid) {
139 all_cids.push(commit_cid);
140 }
141 if let Some(ref pc) = prev_cid {
142 if let Ok(Some(prev_bytes)) = state.block_store.get(pc).await {
143 if let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) {
144 frame.since = Some(rev);
145 }
146 }
147 }
148 let car_bytes = if !all_cids.is_empty() {
149 let fetched = state.block_store.get_many(&all_cids).await?;
150 let mut blocks = std::collections::BTreeMap::new();
151 let mut commit_bytes: Option<Bytes> = None;
152 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
153 if let Some(data) = data_opt {
154 if *cid == commit_cid {
155 commit_bytes = Some(data.clone());
156 if let Some(rev) = extract_rev_from_commit_bytes(data) {
157 frame.rev = rev;
158 }
159 } else {
160 blocks.insert(*cid, data.clone());
161 }
162 }
163 }
164 write_car_blocks(commit_cid, commit_bytes, blocks).await?
165 } else {
166 Vec::new()
167 };
168 frame.blocks = car_bytes;
169 let header = FrameHeader {
170 op: 1,
171 t: "#commit".to_string(),
172 };
173 let mut bytes = Vec::new();
174 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
175 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
176 Ok(bytes)
177}
178
179pub async fn prefetch_blocks_for_events(
180 state: &AppState,
181 events: &[SequencedEvent],
182) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
183 let mut all_cids: Vec<Cid> = Vec::new();
184 for event in events {
185 if let Some(ref commit_cid_str) = event.commit_cid {
186 if let Ok(cid) = Cid::from_str(commit_cid_str) {
187 all_cids.push(cid);
188 }
189 }
190 if let Some(ref prev_cid_str) = event.prev_cid {
191 if let Ok(cid) = Cid::from_str(prev_cid_str) {
192 all_cids.push(cid);
193 }
194 }
195 if let Some(ref block_cids_str) = event.blocks_cids {
196 for s in block_cids_str {
197 if let Ok(cid) = Cid::from_str(s) {
198 all_cids.push(cid);
199 }
200 }
201 }
202 }
203 all_cids.sort();
204 all_cids.dedup();
205 if all_cids.is_empty() {
206 return Ok(HashMap::new());
207 }
208 let fetched = state.block_store.get_many(&all_cids).await?;
209 let mut blocks_map = HashMap::new();
210 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
211 if let Some(data) = data_opt {
212 blocks_map.insert(cid, data);
213 }
214 }
215 Ok(blocks_map)
216}
217
218fn format_sync_event_with_prefetched(
219 event: &SequencedEvent,
220 prefetched: &HashMap<Cid, Bytes>,
221) -> Result<Vec<u8>, anyhow::Error> {
222 let commit_cid_str = event.commit_cid.as_ref()
223 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
224 let commit_cid = Cid::from_str(commit_cid_str)?;
225 let commit_bytes = prefetched.get(&commit_cid)
226 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
227 let rev = extract_rev_from_commit_bytes(commit_bytes)
228 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
229 let car_bytes = futures::executor::block_on(
230 write_car_blocks(commit_cid, Some(commit_bytes.clone()), BTreeMap::new())
231 )?;
232 let frame = SyncFrame {
233 did: event.did.clone(),
234 rev,
235 blocks: car_bytes,
236 seq: event.seq,
237 time: format_atproto_time(event.created_at),
238 };
239 let header = FrameHeader {
240 op: 1,
241 t: "#sync".to_string(),
242 };
243 let mut bytes = Vec::new();
244 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
245 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
246 Ok(bytes)
247}
248
249pub async fn format_event_with_prefetched_blocks(
250 event: SequencedEvent,
251 prefetched: &HashMap<Cid, Bytes>,
252) -> Result<Vec<u8>, anyhow::Error> {
253 match event.event_type.as_str() {
254 "identity" => return format_identity_event(&event),
255 "account" => return format_account_event(&event),
256 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
257 _ => {}
258 }
259 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
260 let prev_cid_str = event.prev_cid.clone();
261 let prev_data_cid_str = event.prev_data_cid.clone();
262 let mut frame: CommitFrame = event.try_into()
263 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
264 if let Some(ref pdc) = prev_data_cid_str {
265 if let Ok(cid) = Cid::from_str(pdc) {
266 frame.prev_data = Some(cid);
267 }
268 }
269 let commit_cid = frame.commit;
270 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
271 let mut all_cids: Vec<Cid> = block_cids_str
272 .iter()
273 .filter_map(|s| Cid::from_str(s).ok())
274 .filter(|c| Some(*c) != prev_cid)
275 .collect();
276 if !all_cids.contains(&commit_cid) {
277 all_cids.push(commit_cid);
278 }
279 if let Some(commit_bytes) = prefetched.get(&commit_cid) {
280 if let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) {
281 frame.rev = rev;
282 }
283 }
284 if let Some(ref pc) = prev_cid {
285 if let Some(prev_bytes) = prefetched.get(pc) {
286 if let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) {
287 frame.since = Some(rev);
288 }
289 }
290 }
291 let car_bytes = if !all_cids.is_empty() {
292 let mut blocks = BTreeMap::new();
293 let mut commit_bytes_for_car: Option<Bytes> = None;
294 for cid in all_cids {
295 if let Some(data) = prefetched.get(&cid) {
296 if cid == commit_cid {
297 commit_bytes_for_car = Some(data.clone());
298 } else {
299 blocks.insert(cid, data.clone());
300 }
301 }
302 }
303 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
304 } else {
305 Vec::new()
306 };
307 frame.blocks = car_bytes;
308 let header = FrameHeader {
309 op: 1,
310 t: "#commit".to_string(),
311 };
312 let mut bytes = Vec::new();
313 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
314 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
315 Ok(bytes)
316}