this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame};
4use bytes::Bytes;
5use cid::Cid;
6use iroh_car::{CarHeader, CarWriter};
7use jacquard_repo::commit::Commit;
8use jacquard_repo::storage::BlockStore;
9use std::collections::{BTreeMap, HashMap};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio::io::AsyncWriteExt;
13fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
14 Commit::from_cbor(commit_bytes).ok().map(|c| c.rev().to_string())
15}
16async fn write_car_blocks(
17 commit_cid: Cid,
18 commit_bytes: Option<Bytes>,
19 other_blocks: BTreeMap<Cid, Bytes>,
20) -> Result<Vec<u8>, anyhow::Error> {
21 let mut buffer = Cursor::new(Vec::new());
22 let header = CarHeader::new_v1(vec![commit_cid]);
23 let mut writer = CarWriter::new(header, &mut buffer);
24 for (cid, data) in other_blocks {
25 if cid != commit_cid {
26 writer.write(cid, data.as_ref()).await
27 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
28 }
29 }
30 if let Some(data) = commit_bytes {
31 writer.write(commit_cid, data.as_ref()).await
32 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
33 }
34 writer.finish().await
35 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
36 buffer.flush().await
37 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
38 Ok(buffer.into_inner())
39}
40fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
41 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
42}
43fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
44 let frame = IdentityFrame {
45 did: event.did.clone(),
46 handle: event.handle.clone(),
47 seq: event.seq,
48 time: format_atproto_time(event.created_at),
49 };
50 let header = FrameHeader {
51 op: 1,
52 t: "#identity".to_string(),
53 };
54 let mut bytes = Vec::new();
55 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
56 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
57 Ok(bytes)
58}
59fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
60 let frame = AccountFrame {
61 did: event.did.clone(),
62 active: event.active.unwrap_or(true),
63 status: event.status.clone(),
64 seq: event.seq,
65 time: format_atproto_time(event.created_at),
66 };
67 let header = FrameHeader {
68 op: 1,
69 t: "#account".to_string(),
70 };
71 let mut bytes = Vec::new();
72 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
73 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
74 Ok(bytes)
75}
76async fn format_sync_event(
77 state: &AppState,
78 event: &SequencedEvent,
79) -> Result<Vec<u8>, anyhow::Error> {
80 let commit_cid_str = event.commit_cid.as_ref()
81 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
82 let commit_cid = Cid::from_str(commit_cid_str)?;
83 let commit_bytes = state.block_store.get(&commit_cid).await?
84 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
85 let rev = extract_rev_from_commit_bytes(&commit_bytes)
86 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
87 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
88 let frame = SyncFrame {
89 did: event.did.clone(),
90 rev,
91 blocks: car_bytes,
92 seq: event.seq,
93 time: format_atproto_time(event.created_at),
94 };
95 let header = FrameHeader {
96 op: 1,
97 t: "#sync".to_string(),
98 };
99 let mut bytes = Vec::new();
100 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
101 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
102 Ok(bytes)
103}
104pub async fn format_event_for_sending(
105 state: &AppState,
106 event: SequencedEvent,
107) -> Result<Vec<u8>, anyhow::Error> {
108 match event.event_type.as_str() {
109 "identity" => return format_identity_event(&event),
110 "account" => return format_account_event(&event),
111 "sync" => return format_sync_event(state, &event).await,
112 _ => {}
113 }
114 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
115 let prev_cid_str = event.prev_cid.clone();
116 let prev_data_cid_str = event.prev_data_cid.clone();
117 let mut frame: CommitFrame = event.try_into()
118 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
119 if let Some(ref pdc) = prev_data_cid_str {
120 if let Ok(cid) = Cid::from_str(pdc) {
121 frame.prev_data = Some(cid);
122 }
123 }
124 let commit_cid = frame.commit;
125 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
126 let mut all_cids: Vec<Cid> = block_cids_str
127 .iter()
128 .filter_map(|s| Cid::from_str(s).ok())
129 .filter(|c| Some(*c) != prev_cid)
130 .collect();
131 if !all_cids.contains(&commit_cid) {
132 all_cids.push(commit_cid);
133 }
134 if let Some(ref pc) = prev_cid {
135 if let Ok(Some(prev_bytes)) = state.block_store.get(pc).await {
136 if let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) {
137 frame.since = Some(rev);
138 }
139 }
140 }
141 let car_bytes = if !all_cids.is_empty() {
142 let fetched = state.block_store.get_many(&all_cids).await?;
143 let mut blocks = std::collections::BTreeMap::new();
144 let mut commit_bytes: Option<Bytes> = None;
145 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
146 if let Some(data) = data_opt {
147 if *cid == commit_cid {
148 commit_bytes = Some(data.clone());
149 if let Some(rev) = extract_rev_from_commit_bytes(data) {
150 frame.rev = rev;
151 }
152 } else {
153 blocks.insert(*cid, data.clone());
154 }
155 }
156 }
157 write_car_blocks(commit_cid, commit_bytes, blocks).await?
158 } else {
159 Vec::new()
160 };
161 frame.blocks = car_bytes;
162 let header = FrameHeader {
163 op: 1,
164 t: "#commit".to_string(),
165 };
166 let mut bytes = Vec::new();
167 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
168 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
169 Ok(bytes)
170}
171pub async fn prefetch_blocks_for_events(
172 state: &AppState,
173 events: &[SequencedEvent],
174) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
175 let mut all_cids: Vec<Cid> = Vec::new();
176 for event in events {
177 if let Some(ref commit_cid_str) = event.commit_cid {
178 if let Ok(cid) = Cid::from_str(commit_cid_str) {
179 all_cids.push(cid);
180 }
181 }
182 if let Some(ref prev_cid_str) = event.prev_cid {
183 if let Ok(cid) = Cid::from_str(prev_cid_str) {
184 all_cids.push(cid);
185 }
186 }
187 if let Some(ref block_cids_str) = event.blocks_cids {
188 for s in block_cids_str {
189 if let Ok(cid) = Cid::from_str(s) {
190 all_cids.push(cid);
191 }
192 }
193 }
194 }
195 all_cids.sort();
196 all_cids.dedup();
197 if all_cids.is_empty() {
198 return Ok(HashMap::new());
199 }
200 let fetched = state.block_store.get_many(&all_cids).await?;
201 let mut blocks_map = HashMap::new();
202 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
203 if let Some(data) = data_opt {
204 blocks_map.insert(cid, data);
205 }
206 }
207 Ok(blocks_map)
208}
209fn format_sync_event_with_prefetched(
210 event: &SequencedEvent,
211 prefetched: &HashMap<Cid, Bytes>,
212) -> Result<Vec<u8>, anyhow::Error> {
213 let commit_cid_str = event.commit_cid.as_ref()
214 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
215 let commit_cid = Cid::from_str(commit_cid_str)?;
216 let commit_bytes = prefetched.get(&commit_cid)
217 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
218 let rev = extract_rev_from_commit_bytes(commit_bytes)
219 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?;
220 let car_bytes = futures::executor::block_on(
221 write_car_blocks(commit_cid, Some(commit_bytes.clone()), BTreeMap::new())
222 )?;
223 let frame = SyncFrame {
224 did: event.did.clone(),
225 rev,
226 blocks: car_bytes,
227 seq: event.seq,
228 time: format_atproto_time(event.created_at),
229 };
230 let header = FrameHeader {
231 op: 1,
232 t: "#sync".to_string(),
233 };
234 let mut bytes = Vec::new();
235 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
236 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
237 Ok(bytes)
238}
239pub async fn format_event_with_prefetched_blocks(
240 event: SequencedEvent,
241 prefetched: &HashMap<Cid, Bytes>,
242) -> Result<Vec<u8>, anyhow::Error> {
243 match event.event_type.as_str() {
244 "identity" => return format_identity_event(&event),
245 "account" => return format_account_event(&event),
246 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
247 _ => {}
248 }
249 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
250 let prev_cid_str = event.prev_cid.clone();
251 let prev_data_cid_str = event.prev_data_cid.clone();
252 let mut frame: CommitFrame = event.try_into()
253 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
254 if let Some(ref pdc) = prev_data_cid_str {
255 if let Ok(cid) = Cid::from_str(pdc) {
256 frame.prev_data = Some(cid);
257 }
258 }
259 let commit_cid = frame.commit;
260 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
261 let mut all_cids: Vec<Cid> = block_cids_str
262 .iter()
263 .filter_map(|s| Cid::from_str(s).ok())
264 .filter(|c| Some(*c) != prev_cid)
265 .collect();
266 if !all_cids.contains(&commit_cid) {
267 all_cids.push(commit_cid);
268 }
269 if let Some(commit_bytes) = prefetched.get(&commit_cid) {
270 if let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) {
271 frame.rev = rev;
272 }
273 }
274 if let Some(ref pc) = prev_cid {
275 if let Some(prev_bytes) = prefetched.get(pc) {
276 if let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) {
277 frame.since = Some(rev);
278 }
279 }
280 }
281 let car_bytes = if !all_cids.is_empty() {
282 let mut blocks = BTreeMap::new();
283 let mut commit_bytes_for_car: Option<Bytes> = None;
284 for cid in all_cids {
285 if let Some(data) = prefetched.get(&cid) {
286 if cid == commit_cid {
287 commit_bytes_for_car = Some(data.clone());
288 } else {
289 blocks.insert(cid, data.clone());
290 }
291 }
292 }
293 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
294 } else {
295 Vec::new()
296 };
297 frame.blocks = car_bytes;
298 let header = FrameHeader {
299 op: 1,
300 t: "#commit".to_string(),
301 };
302 let mut bytes = Vec::new();
303 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
304 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
305 Ok(bytes)
306}