this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame};
4use bytes::Bytes;
5use cid::Cid;
6use iroh_car::{CarHeader, CarWriter};
7use jacquard_repo::commit::Commit;
8use jacquard_repo::storage::BlockStore;
9use std::collections::{BTreeMap, HashMap};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio::io::AsyncWriteExt;
13
14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
15 Commit::from_cbor(commit_bytes)
16 .ok()
17 .map(|c| c.rev().to_string())
18}
19
20async fn write_car_blocks(
21 commit_cid: Cid,
22 commit_bytes: Option<Bytes>,
23 other_blocks: BTreeMap<Cid, Bytes>,
24) -> Result<Vec<u8>, anyhow::Error> {
25 let mut buffer = Cursor::new(Vec::new());
26 let header = CarHeader::new_v1(vec![commit_cid]);
27 let mut writer = CarWriter::new(header, &mut buffer);
28 for (cid, data) in other_blocks {
29 if cid != commit_cid {
30 writer
31 .write(cid, data.as_ref())
32 .await
33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
34 }
35 }
36 if let Some(data) = commit_bytes {
37 writer
38 .write(commit_cid, data.as_ref())
39 .await
40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
41 }
42 writer
43 .finish()
44 .await
45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
46 buffer
47 .flush()
48 .await
49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
50 Ok(buffer.into_inner())
51}
52
53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
55}
56
57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
58 let frame = IdentityFrame {
59 did: event.did.clone(),
60 handle: event.handle.clone(),
61 seq: event.seq,
62 time: format_atproto_time(event.created_at),
63 };
64 let header = FrameHeader {
65 op: 1,
66 t: "#identity".to_string(),
67 };
68 let mut bytes = Vec::new();
69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
71 Ok(bytes)
72}
73
74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
75 let frame = AccountFrame {
76 did: event.did.clone(),
77 active: event.active.unwrap_or(true),
78 status: event.status.clone(),
79 seq: event.seq,
80 time: format_atproto_time(event.created_at),
81 };
82 let header = FrameHeader {
83 op: 1,
84 t: "#account".to_string(),
85 };
86 let mut bytes = Vec::new();
87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
89 Ok(bytes)
90}
91
92async fn format_sync_event(
93 state: &AppState,
94 event: &SequencedEvent,
95) -> Result<Vec<u8>, anyhow::Error> {
96 let commit_cid_str = event
97 .commit_cid
98 .as_ref()
99 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
100 let commit_cid = Cid::from_str(commit_cid_str)?;
101 let commit_bytes = state
102 .block_store
103 .get(&commit_cid)
104 .await?
105 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
106 let rev = extract_rev_from_commit_bytes(&commit_bytes)
107 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
108 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
109 let frame = SyncFrame {
110 did: event.did.clone(),
111 rev,
112 blocks: car_bytes,
113 seq: event.seq,
114 time: format_atproto_time(event.created_at),
115 };
116 let header = FrameHeader {
117 op: 1,
118 t: "#sync".to_string(),
119 };
120 let mut bytes = Vec::new();
121 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
122 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
123 Ok(bytes)
124}
125
126pub async fn format_event_for_sending(
127 state: &AppState,
128 event: SequencedEvent,
129) -> Result<Vec<u8>, anyhow::Error> {
130 match event.event_type.as_str() {
131 "identity" => return format_identity_event(&event),
132 "account" => return format_account_event(&event),
133 "sync" => return format_sync_event(state, &event).await,
134 _ => {}
135 }
136 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
137 let prev_cid_str = event.prev_cid.clone();
138 let prev_data_cid_str = event.prev_data_cid.clone();
139 let mut frame: CommitFrame = event
140 .try_into()
141 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
142 if let Some(ref pdc) = prev_data_cid_str
143 && let Ok(cid) = Cid::from_str(pdc)
144 {
145 frame.prev_data = Some(cid);
146 }
147 let commit_cid = frame.commit;
148 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
149 let mut all_cids: Vec<Cid> = block_cids_str
150 .iter()
151 .filter_map(|s| Cid::from_str(s).ok())
152 .filter(|c| Some(*c) != prev_cid)
153 .collect();
154 if !all_cids.contains(&commit_cid) {
155 all_cids.push(commit_cid);
156 }
157 if let Some(ref pc) = prev_cid
158 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
159 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
160 {
161 frame.since = Some(rev);
162 }
163 let car_bytes = if !all_cids.is_empty() {
164 let fetched = state.block_store.get_many(&all_cids).await?;
165 let mut blocks = std::collections::BTreeMap::new();
166 let mut commit_bytes: Option<Bytes> = None;
167 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
168 if let Some(data) = data_opt {
169 if *cid == commit_cid {
170 commit_bytes = Some(data.clone());
171 if let Some(rev) = extract_rev_from_commit_bytes(data) {
172 frame.rev = rev;
173 }
174 } else {
175 blocks.insert(*cid, data.clone());
176 }
177 }
178 }
179 write_car_blocks(commit_cid, commit_bytes, blocks).await?
180 } else {
181 Vec::new()
182 };
183 frame.blocks = car_bytes;
184 let header = FrameHeader {
185 op: 1,
186 t: "#commit".to_string(),
187 };
188 let mut bytes = Vec::new();
189 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
190 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
191 Ok(bytes)
192}
193
194pub async fn prefetch_blocks_for_events(
195 state: &AppState,
196 events: &[SequencedEvent],
197) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
198 let mut all_cids: Vec<Cid> = Vec::new();
199 for event in events {
200 if let Some(ref commit_cid_str) = event.commit_cid
201 && let Ok(cid) = Cid::from_str(commit_cid_str)
202 {
203 all_cids.push(cid);
204 }
205 if let Some(ref prev_cid_str) = event.prev_cid
206 && let Ok(cid) = Cid::from_str(prev_cid_str)
207 {
208 all_cids.push(cid);
209 }
210 if let Some(ref block_cids_str) = event.blocks_cids {
211 for s in block_cids_str {
212 if let Ok(cid) = Cid::from_str(s) {
213 all_cids.push(cid);
214 }
215 }
216 }
217 }
218 all_cids.sort();
219 all_cids.dedup();
220 if all_cids.is_empty() {
221 return Ok(HashMap::new());
222 }
223 let fetched = state.block_store.get_many(&all_cids).await?;
224 let mut blocks_map = HashMap::new();
225 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
226 if let Some(data) = data_opt {
227 blocks_map.insert(cid, data);
228 }
229 }
230 Ok(blocks_map)
231}
232
233fn format_sync_event_with_prefetched(
234 event: &SequencedEvent,
235 prefetched: &HashMap<Cid, Bytes>,
236) -> Result<Vec<u8>, anyhow::Error> {
237 let commit_cid_str = event
238 .commit_cid
239 .as_ref()
240 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
241 let commit_cid = Cid::from_str(commit_cid_str)?;
242 let commit_bytes = prefetched
243 .get(&commit_cid)
244 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
245 let rev = extract_rev_from_commit_bytes(commit_bytes)
246 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
247 let car_bytes = futures::executor::block_on(write_car_blocks(
248 commit_cid,
249 Some(commit_bytes.clone()),
250 BTreeMap::new(),
251 ))?;
252 let frame = SyncFrame {
253 did: event.did.clone(),
254 rev,
255 blocks: car_bytes,
256 seq: event.seq,
257 time: format_atproto_time(event.created_at),
258 };
259 let header = FrameHeader {
260 op: 1,
261 t: "#sync".to_string(),
262 };
263 let mut bytes = Vec::new();
264 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
265 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
266 Ok(bytes)
267}
268
269pub async fn format_event_with_prefetched_blocks(
270 event: SequencedEvent,
271 prefetched: &HashMap<Cid, Bytes>,
272) -> Result<Vec<u8>, anyhow::Error> {
273 match event.event_type.as_str() {
274 "identity" => return format_identity_event(&event),
275 "account" => return format_account_event(&event),
276 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
277 _ => {}
278 }
279 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
280 let prev_cid_str = event.prev_cid.clone();
281 let prev_data_cid_str = event.prev_data_cid.clone();
282 let mut frame: CommitFrame = event
283 .try_into()
284 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
285 if let Some(ref pdc) = prev_data_cid_str
286 && let Ok(cid) = Cid::from_str(pdc)
287 {
288 frame.prev_data = Some(cid);
289 }
290 let commit_cid = frame.commit;
291 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
292 let mut all_cids: Vec<Cid> = block_cids_str
293 .iter()
294 .filter_map(|s| Cid::from_str(s).ok())
295 .filter(|c| Some(*c) != prev_cid)
296 .collect();
297 if !all_cids.contains(&commit_cid) {
298 all_cids.push(commit_cid);
299 }
300 if let Some(commit_bytes) = prefetched.get(&commit_cid)
301 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
302 {
303 frame.rev = rev;
304 }
305 if let Some(ref pc) = prev_cid
306 && let Some(prev_bytes) = prefetched.get(pc)
307 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
308 {
309 frame.since = Some(rev);
310 }
311 let car_bytes = if !all_cids.is_empty() {
312 let mut blocks = BTreeMap::new();
313 let mut commit_bytes_for_car: Option<Bytes> = None;
314 for cid in all_cids {
315 if let Some(data) = prefetched.get(&cid) {
316 if cid == commit_cid {
317 commit_bytes_for_car = Some(data.clone());
318 } else {
319 blocks.insert(cid, data.clone());
320 }
321 }
322 }
323 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
324 } else {
325 Vec::new()
326 };
327 frame.blocks = car_bytes;
328 let header = FrameHeader {
329 op: 1,
330 t: "#commit".to_string(),
331 };
332 let mut bytes = Vec::new();
333 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
334 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
335 Ok(bytes)
336}