this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame};
4use bytes::Bytes;
5use cid::Cid;
6use iroh_car::{CarHeader, CarWriter};
7use jacquard_repo::commit::Commit;
8use jacquard_repo::storage::BlockStore;
9use std::collections::{BTreeMap, HashMap};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio::io::AsyncWriteExt;
13
14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
15 Commit::from_cbor(commit_bytes)
16 .ok()
17 .map(|c| c.rev().to_string())
18}
19
20async fn write_car_blocks(
21 commit_cid: Cid,
22 commit_bytes: Option<Bytes>,
23 other_blocks: BTreeMap<Cid, Bytes>,
24) -> Result<Vec<u8>, anyhow::Error> {
25 let mut buffer = Cursor::new(Vec::new());
26 let header = CarHeader::new_v1(vec![commit_cid]);
27 let mut writer = CarWriter::new(header, &mut buffer);
28 for (cid, data) in other_blocks {
29 if cid != commit_cid {
30 writer
31 .write(cid, data.as_ref())
32 .await
33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
34 }
35 }
36 if let Some(data) = commit_bytes {
37 writer
38 .write(commit_cid, data.as_ref())
39 .await
40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
41 }
42 writer
43 .finish()
44 .await
45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
46 buffer
47 .flush()
48 .await
49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
50 Ok(buffer.into_inner())
51}
52
53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
55}
56
57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
58 let frame = IdentityFrame {
59 did: event.did.clone(),
60 handle: event.handle.clone(),
61 seq: event.seq,
62 time: format_atproto_time(event.created_at),
63 };
64 let header = FrameHeader {
65 op: 1,
66 t: "#identity".to_string(),
67 };
68 let mut bytes = Vec::new();
69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
71 Ok(bytes)
72}
73
74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
75 let frame = AccountFrame {
76 did: event.did.clone(),
77 active: event.active.unwrap_or(true),
78 status: event.status.clone(),
79 seq: event.seq,
80 time: format_atproto_time(event.created_at),
81 };
82 let header = FrameHeader {
83 op: 1,
84 t: "#account".to_string(),
85 };
86 let mut bytes = Vec::new();
87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
89 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
90 tracing::info!(
91 did = %frame.did,
92 active = frame.active,
93 status = ?frame.status,
94 cbor_len = bytes.len(),
95 cbor_hex = %hex_str,
96 "Sending account event to firehose"
97 );
98 Ok(bytes)
99}
100
101async fn format_sync_event(
102 state: &AppState,
103 event: &SequencedEvent,
104) -> Result<Vec<u8>, anyhow::Error> {
105 let commit_cid_str = event
106 .commit_cid
107 .as_ref()
108 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
109 let commit_cid = Cid::from_str(commit_cid_str)?;
110 let commit_bytes = state
111 .block_store
112 .get(&commit_cid)
113 .await?
114 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
115 let rev = if let Some(ref stored_rev) = event.rev {
116 stored_rev.clone()
117 } else {
118 extract_rev_from_commit_bytes(&commit_bytes)
119 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
120 };
121 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
122 let frame = SyncFrame {
123 did: event.did.clone(),
124 rev,
125 blocks: car_bytes,
126 seq: event.seq,
127 time: format_atproto_time(event.created_at),
128 };
129 let header = FrameHeader {
130 op: 1,
131 t: "#sync".to_string(),
132 };
133 let mut bytes = Vec::new();
134 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
135 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
136 Ok(bytes)
137}
138
139pub async fn format_event_for_sending(
140 state: &AppState,
141 event: SequencedEvent,
142) -> Result<Vec<u8>, anyhow::Error> {
143 match event.event_type.as_str() {
144 "identity" => return format_identity_event(&event),
145 "account" => return format_account_event(&event),
146 "sync" => return format_sync_event(state, &event).await,
147 _ => {}
148 }
149 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
150 let prev_cid_str = event.prev_cid.clone();
151 let prev_data_cid_str = event.prev_data_cid.clone();
152 let mut frame: CommitFrame = event
153 .try_into()
154 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
155 if let Some(ref pdc) = prev_data_cid_str
156 && let Ok(cid) = Cid::from_str(pdc)
157 {
158 frame.prev_data = Some(cid);
159 }
160 let commit_cid = frame.commit;
161 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
162 let mut all_cids: Vec<Cid> = block_cids_str
163 .iter()
164 .filter_map(|s| Cid::from_str(s).ok())
165 .filter(|c| Some(*c) != prev_cid)
166 .collect();
167 if !all_cids.contains(&commit_cid) {
168 all_cids.push(commit_cid);
169 }
170 if let Some(ref pc) = prev_cid
171 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
172 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
173 {
174 frame.since = Some(rev);
175 }
176 let car_bytes = if !all_cids.is_empty() {
177 let fetched = state.block_store.get_many(&all_cids).await?;
178 let mut blocks = std::collections::BTreeMap::new();
179 let mut commit_bytes: Option<Bytes> = None;
180 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
181 if let Some(data) = data_opt {
182 if *cid == commit_cid {
183 commit_bytes = Some(data.clone());
184 if let Some(rev) = extract_rev_from_commit_bytes(data) {
185 frame.rev = rev;
186 }
187 } else {
188 blocks.insert(*cid, data.clone());
189 }
190 }
191 }
192 write_car_blocks(commit_cid, commit_bytes, blocks).await?
193 } else {
194 Vec::new()
195 };
196 frame.blocks = car_bytes;
197 let header = FrameHeader {
198 op: 1,
199 t: "#commit".to_string(),
200 };
201 let mut bytes = Vec::new();
202 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
203 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
204 Ok(bytes)
205}
206
207pub async fn prefetch_blocks_for_events(
208 state: &AppState,
209 events: &[SequencedEvent],
210) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
211 let mut all_cids: Vec<Cid> = Vec::new();
212 for event in events {
213 if let Some(ref commit_cid_str) = event.commit_cid
214 && let Ok(cid) = Cid::from_str(commit_cid_str)
215 {
216 all_cids.push(cid);
217 }
218 if let Some(ref prev_cid_str) = event.prev_cid
219 && let Ok(cid) = Cid::from_str(prev_cid_str)
220 {
221 all_cids.push(cid);
222 }
223 if let Some(ref block_cids_str) = event.blocks_cids {
224 for s in block_cids_str {
225 if let Ok(cid) = Cid::from_str(s) {
226 all_cids.push(cid);
227 }
228 }
229 }
230 }
231 all_cids.sort();
232 all_cids.dedup();
233 if all_cids.is_empty() {
234 return Ok(HashMap::new());
235 }
236 let fetched = state.block_store.get_many(&all_cids).await?;
237 let mut blocks_map = HashMap::new();
238 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
239 if let Some(data) = data_opt {
240 blocks_map.insert(cid, data);
241 }
242 }
243 Ok(blocks_map)
244}
245
246fn format_sync_event_with_prefetched(
247 event: &SequencedEvent,
248 prefetched: &HashMap<Cid, Bytes>,
249) -> Result<Vec<u8>, anyhow::Error> {
250 let commit_cid_str = event
251 .commit_cid
252 .as_ref()
253 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
254 let commit_cid = Cid::from_str(commit_cid_str)?;
255 let commit_bytes = prefetched
256 .get(&commit_cid)
257 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
258 let rev = if let Some(ref stored_rev) = event.rev {
259 stored_rev.clone()
260 } else {
261 extract_rev_from_commit_bytes(commit_bytes)
262 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
263 };
264 let car_bytes = futures::executor::block_on(write_car_blocks(
265 commit_cid,
266 Some(commit_bytes.clone()),
267 BTreeMap::new(),
268 ))?;
269 let frame = SyncFrame {
270 did: event.did.clone(),
271 rev,
272 blocks: car_bytes,
273 seq: event.seq,
274 time: format_atproto_time(event.created_at),
275 };
276 let header = FrameHeader {
277 op: 1,
278 t: "#sync".to_string(),
279 };
280 let mut bytes = Vec::new();
281 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
282 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
283 Ok(bytes)
284}
285
286pub async fn format_event_with_prefetched_blocks(
287 event: SequencedEvent,
288 prefetched: &HashMap<Cid, Bytes>,
289) -> Result<Vec<u8>, anyhow::Error> {
290 match event.event_type.as_str() {
291 "identity" => return format_identity_event(&event),
292 "account" => return format_account_event(&event),
293 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
294 _ => {}
295 }
296 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
297 let prev_cid_str = event.prev_cid.clone();
298 let prev_data_cid_str = event.prev_data_cid.clone();
299 let mut frame: CommitFrame = event
300 .try_into()
301 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
302 if let Some(ref pdc) = prev_data_cid_str
303 && let Ok(cid) = Cid::from_str(pdc)
304 {
305 frame.prev_data = Some(cid);
306 }
307 let commit_cid = frame.commit;
308 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
309 let mut all_cids: Vec<Cid> = block_cids_str
310 .iter()
311 .filter_map(|s| Cid::from_str(s).ok())
312 .filter(|c| Some(*c) != prev_cid)
313 .collect();
314 if !all_cids.contains(&commit_cid) {
315 all_cids.push(commit_cid);
316 }
317 if let Some(commit_bytes) = prefetched.get(&commit_cid)
318 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
319 {
320 frame.rev = rev;
321 }
322 if let Some(ref pc) = prev_cid
323 && let Some(prev_bytes) = prefetched.get(pc)
324 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
325 {
326 frame.since = Some(rev);
327 }
328 let car_bytes = if !all_cids.is_empty() {
329 let mut blocks = BTreeMap::new();
330 let mut commit_bytes_for_car: Option<Bytes> = None;
331 for cid in all_cids {
332 if let Some(data) = prefetched.get(&cid) {
333 if cid == commit_cid {
334 commit_bytes_for_car = Some(data.clone());
335 } else {
336 blocks.insert(cid, data.clone());
337 }
338 }
339 }
340 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
341 } else {
342 Vec::new()
343 };
344 frame.blocks = car_bytes;
345 let header = FrameHeader {
346 op: 1,
347 t: "#commit".to_string(),
348 };
349 let mut bytes = Vec::new();
350 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
351 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
352 Ok(bytes)
353}