this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame};
4use bytes::Bytes;
5use cid::Cid;
6use iroh_car::{CarHeader, CarWriter};
7use jacquard_repo::commit::Commit;
8use jacquard_repo::storage::BlockStore;
9use std::collections::{BTreeMap, HashMap};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio::io::AsyncWriteExt;
13
14fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
15 Commit::from_cbor(commit_bytes)
16 .ok()
17 .map(|c| c.rev().to_string())
18}
19
20async fn write_car_blocks(
21 commit_cid: Cid,
22 commit_bytes: Option<Bytes>,
23 other_blocks: BTreeMap<Cid, Bytes>,
24) -> Result<Vec<u8>, anyhow::Error> {
25 let mut buffer = Cursor::new(Vec::new());
26 let header = CarHeader::new_v1(vec![commit_cid]);
27 let mut writer = CarWriter::new(header, &mut buffer);
28 for (cid, data) in other_blocks {
29 if cid != commit_cid {
30 writer
31 .write(cid, data.as_ref())
32 .await
33 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
34 }
35 }
36 if let Some(data) = commit_bytes {
37 writer
38 .write(commit_cid, data.as_ref())
39 .await
40 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
41 }
42 writer
43 .finish()
44 .await
45 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
46 buffer
47 .flush()
48 .await
49 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
50 Ok(buffer.into_inner())
51}
52
53fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
54 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
55}
56
57fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
58 let frame = IdentityFrame {
59 did: event.did.clone(),
60 handle: event.handle.clone(),
61 seq: event.seq,
62 time: format_atproto_time(event.created_at),
63 };
64 let header = FrameHeader {
65 op: 1,
66 t: "#identity".to_string(),
67 };
68 let mut bytes = Vec::new();
69 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
70 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
71 Ok(bytes)
72}
73
74fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
75 let frame = AccountFrame {
76 did: event.did.clone(),
77 active: event.active.unwrap_or(true),
78 status: event.status.clone(),
79 seq: event.seq,
80 time: format_atproto_time(event.created_at),
81 };
82 let header = FrameHeader {
83 op: 1,
84 t: "#account".to_string(),
85 };
86 let mut bytes = Vec::new();
87 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
88 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
89 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
90 tracing::info!(
91 did = %frame.did,
92 active = frame.active,
93 status = ?frame.status,
94 cbor_len = bytes.len(),
95 cbor_hex = %hex_str,
96 "Sending account event to firehose"
97 );
98 Ok(bytes)
99}
100
101async fn format_sync_event(
102 state: &AppState,
103 event: &SequencedEvent,
104) -> Result<Vec<u8>, anyhow::Error> {
105 let commit_cid_str = event
106 .commit_cid
107 .as_ref()
108 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
109 let commit_cid = Cid::from_str(commit_cid_str)?;
110 let commit_bytes = state
111 .block_store
112 .get(&commit_cid)
113 .await?
114 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
115 let rev = extract_rev_from_commit_bytes(&commit_bytes)
116 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
117 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
118 let frame = SyncFrame {
119 did: event.did.clone(),
120 rev,
121 blocks: car_bytes,
122 seq: event.seq,
123 time: format_atproto_time(event.created_at),
124 };
125 let header = FrameHeader {
126 op: 1,
127 t: "#sync".to_string(),
128 };
129 let mut bytes = Vec::new();
130 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
131 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
132 Ok(bytes)
133}
134
135pub async fn format_event_for_sending(
136 state: &AppState,
137 event: SequencedEvent,
138) -> Result<Vec<u8>, anyhow::Error> {
139 match event.event_type.as_str() {
140 "identity" => return format_identity_event(&event),
141 "account" => return format_account_event(&event),
142 "sync" => return format_sync_event(state, &event).await,
143 _ => {}
144 }
145 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
146 let prev_cid_str = event.prev_cid.clone();
147 let prev_data_cid_str = event.prev_data_cid.clone();
148 let mut frame: CommitFrame = event
149 .try_into()
150 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
151 if let Some(ref pdc) = prev_data_cid_str
152 && let Ok(cid) = Cid::from_str(pdc)
153 {
154 frame.prev_data = Some(cid);
155 }
156 let commit_cid = frame.commit;
157 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
158 let mut all_cids: Vec<Cid> = block_cids_str
159 .iter()
160 .filter_map(|s| Cid::from_str(s).ok())
161 .filter(|c| Some(*c) != prev_cid)
162 .collect();
163 if !all_cids.contains(&commit_cid) {
164 all_cids.push(commit_cid);
165 }
166 if let Some(ref pc) = prev_cid
167 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
168 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
169 {
170 frame.since = Some(rev);
171 }
172 let car_bytes = if !all_cids.is_empty() {
173 let fetched = state.block_store.get_many(&all_cids).await?;
174 let mut blocks = std::collections::BTreeMap::new();
175 let mut commit_bytes: Option<Bytes> = None;
176 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
177 if let Some(data) = data_opt {
178 if *cid == commit_cid {
179 commit_bytes = Some(data.clone());
180 if let Some(rev) = extract_rev_from_commit_bytes(data) {
181 frame.rev = rev;
182 }
183 } else {
184 blocks.insert(*cid, data.clone());
185 }
186 }
187 }
188 write_car_blocks(commit_cid, commit_bytes, blocks).await?
189 } else {
190 Vec::new()
191 };
192 frame.blocks = car_bytes;
193 let header = FrameHeader {
194 op: 1,
195 t: "#commit".to_string(),
196 };
197 let mut bytes = Vec::new();
198 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
199 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
200 Ok(bytes)
201}
202
203pub async fn prefetch_blocks_for_events(
204 state: &AppState,
205 events: &[SequencedEvent],
206) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
207 let mut all_cids: Vec<Cid> = Vec::new();
208 for event in events {
209 if let Some(ref commit_cid_str) = event.commit_cid
210 && let Ok(cid) = Cid::from_str(commit_cid_str)
211 {
212 all_cids.push(cid);
213 }
214 if let Some(ref prev_cid_str) = event.prev_cid
215 && let Ok(cid) = Cid::from_str(prev_cid_str)
216 {
217 all_cids.push(cid);
218 }
219 if let Some(ref block_cids_str) = event.blocks_cids {
220 for s in block_cids_str {
221 if let Ok(cid) = Cid::from_str(s) {
222 all_cids.push(cid);
223 }
224 }
225 }
226 }
227 all_cids.sort();
228 all_cids.dedup();
229 if all_cids.is_empty() {
230 return Ok(HashMap::new());
231 }
232 let fetched = state.block_store.get_many(&all_cids).await?;
233 let mut blocks_map = HashMap::new();
234 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
235 if let Some(data) = data_opt {
236 blocks_map.insert(cid, data);
237 }
238 }
239 Ok(blocks_map)
240}
241
242fn format_sync_event_with_prefetched(
243 event: &SequencedEvent,
244 prefetched: &HashMap<Cid, Bytes>,
245) -> Result<Vec<u8>, anyhow::Error> {
246 let commit_cid_str = event
247 .commit_cid
248 .as_ref()
249 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
250 let commit_cid = Cid::from_str(commit_cid_str)?;
251 let commit_bytes = prefetched
252 .get(&commit_cid)
253 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
254 let rev = extract_rev_from_commit_bytes(commit_bytes)
255 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
256 let car_bytes = futures::executor::block_on(write_car_blocks(
257 commit_cid,
258 Some(commit_bytes.clone()),
259 BTreeMap::new(),
260 ))?;
261 let frame = SyncFrame {
262 did: event.did.clone(),
263 rev,
264 blocks: car_bytes,
265 seq: event.seq,
266 time: format_atproto_time(event.created_at),
267 };
268 let header = FrameHeader {
269 op: 1,
270 t: "#sync".to_string(),
271 };
272 let mut bytes = Vec::new();
273 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
274 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
275 Ok(bytes)
276}
277
278pub async fn format_event_with_prefetched_blocks(
279 event: SequencedEvent,
280 prefetched: &HashMap<Cid, Bytes>,
281) -> Result<Vec<u8>, anyhow::Error> {
282 match event.event_type.as_str() {
283 "identity" => return format_identity_event(&event),
284 "account" => return format_account_event(&event),
285 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
286 _ => {}
287 }
288 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
289 let prev_cid_str = event.prev_cid.clone();
290 let prev_data_cid_str = event.prev_data_cid.clone();
291 let mut frame: CommitFrame = event
292 .try_into()
293 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
294 if let Some(ref pdc) = prev_data_cid_str
295 && let Ok(cid) = Cid::from_str(pdc)
296 {
297 frame.prev_data = Some(cid);
298 }
299 let commit_cid = frame.commit;
300 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
301 let mut all_cids: Vec<Cid> = block_cids_str
302 .iter()
303 .filter_map(|s| Cid::from_str(s).ok())
304 .filter(|c| Some(*c) != prev_cid)
305 .collect();
306 if !all_cids.contains(&commit_cid) {
307 all_cids.push(commit_cid);
308 }
309 if let Some(commit_bytes) = prefetched.get(&commit_cid)
310 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
311 {
312 frame.rev = rev;
313 }
314 if let Some(ref pc) = prev_cid
315 && let Some(prev_bytes) = prefetched.get(pc)
316 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
317 {
318 frame.since = Some(rev);
319 }
320 let car_bytes = if !all_cids.is_empty() {
321 let mut blocks = BTreeMap::new();
322 let mut commit_bytes_for_car: Option<Bytes> = None;
323 for cid in all_cids {
324 if let Some(data) = prefetched.get(&cid) {
325 if cid == commit_cid {
326 commit_bytes_for_car = Some(data.clone());
327 } else {
328 blocks.insert(cid, data.clone());
329 }
330 }
331 }
332 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
333 } else {
334 Vec::new()
335 };
336 frame.blocks = car_bytes;
337 let header = FrameHeader {
338 op: 1,
339 t: "#commit".to_string(),
340 };
341 let mut bytes = Vec::new();
342 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
343 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
344 Ok(bytes)
345}