this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::frame::{
4 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame,
5 InfoFrame, SyncFrame,
6};
7use axum::Json;
8use axum::http::StatusCode;
9use axum::response::{IntoResponse, Response};
10use bytes::Bytes;
11use cid::Cid;
12use iroh_car::{CarHeader, CarWriter};
13use jacquard_repo::commit::Commit;
14use jacquard_repo::storage::BlockStore;
15use serde::Serialize;
16use serde_json::json;
17use sqlx::PgPool;
18use std::collections::{BTreeMap, HashMap};
19use std::io::Cursor;
20use std::str::FromStr;
21use tokio::io::AsyncWriteExt;
22
/// Availability state of an account, as reported alongside its repository.
///
/// Serialized through serde using the lowercase variant name
/// (`rename_all = "lowercase"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum AccountStatus {
    /// Neither a takedown reference nor a deactivation timestamp is recorded.
    Active,
    /// A takedown reference is recorded for the account.
    Takendown,
    // NOTE(review): `get_account_with_status` in this file never yields this
    // variant — confirm where (or whether) it is produced.
    Suspended,
    /// A deactivation timestamp is recorded for the account.
    Deactivated,
    // NOTE(review): `get_account_with_status` in this file never yields this
    // variant — confirm where (or whether) it is produced.
    Deleted,
}
32
33impl AccountStatus {
34 pub fn as_str(&self) -> Option<&'static str> {
35 match self {
36 AccountStatus::Active => None,
37 AccountStatus::Takendown => Some("takendown"),
38 AccountStatus::Suspended => Some("suspended"),
39 AccountStatus::Deactivated => Some("deactivated"),
40 AccountStatus::Deleted => Some("deleted"),
41 }
42 }
43
44 pub fn is_active(&self) -> bool {
45 matches!(self, AccountStatus::Active)
46 }
47}
48
49pub struct RepoAccount {
50 pub did: String,
51 pub user_id: uuid::Uuid,
52 pub status: AccountStatus,
53 pub repo_root_cid: Option<String>,
54}
55
/// Reasons a repository cannot be served to the caller.
///
/// Derives `Debug` so that `Result<_, RepoAvailabilityError>` supports
/// `unwrap`/`expect` and the error can be logged with `{:?}`.
#[derive(Debug)]
pub enum RepoAvailabilityError {
    /// No account exists for the DID carried in the payload.
    NotFound(String),
    /// The account for the carried DID has been taken down.
    Takendown(String),
    /// The account for the carried DID has been deactivated.
    Deactivated(String),
    /// The lookup itself failed; the payload is the error message.
    Internal(String),
}
62
63impl IntoResponse for RepoAvailabilityError {
64 fn into_response(self) -> Response {
65 match self {
66 RepoAvailabilityError::NotFound(did) => (
67 StatusCode::BAD_REQUEST,
68 Json(json!({
69 "error": "RepoNotFound",
70 "message": format!("Could not find repo for DID: {}", did)
71 })),
72 )
73 .into_response(),
74 RepoAvailabilityError::Takendown(did) => (
75 StatusCode::BAD_REQUEST,
76 Json(json!({
77 "error": "RepoTakendown",
78 "message": format!("Repo has been takendown: {}", did)
79 })),
80 )
81 .into_response(),
82 RepoAvailabilityError::Deactivated(did) => (
83 StatusCode::BAD_REQUEST,
84 Json(json!({
85 "error": "RepoDeactivated",
86 "message": format!("Repo has been deactivated: {}", did)
87 })),
88 )
89 .into_response(),
90 RepoAvailabilityError::Internal(msg) => (
91 StatusCode::INTERNAL_SERVER_ERROR,
92 Json(json!({
93 "error": "InternalError",
94 "message": msg
95 })),
96 )
97 .into_response(),
98 }
99 }
100}
101
102pub async fn get_account_with_status(
103 db: &PgPool,
104 did: &str,
105) -> Result<Option<RepoAccount>, sqlx::Error> {
106 let row = sqlx::query!(
107 r#"
108 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid
109 FROM users u
110 LEFT JOIN repos r ON r.user_id = u.id
111 WHERE u.did = $1
112 "#,
113 did
114 )
115 .fetch_optional(db)
116 .await?;
117
118 Ok(row.map(|r| {
119 let status = if r.takedown_ref.is_some() {
120 AccountStatus::Takendown
121 } else if r.deactivated_at.is_some() {
122 AccountStatus::Deactivated
123 } else {
124 AccountStatus::Active
125 };
126
127 RepoAccount {
128 did: r.did,
129 user_id: r.id,
130 status,
131 repo_root_cid: Some(r.repo_root_cid),
132 }
133 }))
134}
135
136pub async fn assert_repo_availability(
137 db: &PgPool,
138 did: &str,
139 is_admin_or_self: bool,
140) -> Result<RepoAccount, RepoAvailabilityError> {
141 let account = get_account_with_status(db, did)
142 .await
143 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?;
144
145 let account = match account {
146 Some(a) => a,
147 None => return Err(RepoAvailabilityError::NotFound(did.to_string())),
148 };
149
150 if is_admin_or_self {
151 return Ok(account);
152 }
153
154 match account.status {
155 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())),
156 AccountStatus::Deactivated => {
157 return Err(RepoAvailabilityError::Deactivated(did.to_string()));
158 }
159 _ => {}
160 }
161
162 Ok(account)
163}
164
165fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
166 Commit::from_cbor(commit_bytes)
167 .ok()
168 .map(|c| c.rev().to_string())
169}
170
171async fn write_car_blocks(
172 commit_cid: Cid,
173 commit_bytes: Option<Bytes>,
174 other_blocks: BTreeMap<Cid, Bytes>,
175) -> Result<Vec<u8>, anyhow::Error> {
176 let mut buffer = Cursor::new(Vec::new());
177 let header = CarHeader::new_v1(vec![commit_cid]);
178 let mut writer = CarWriter::new(header, &mut buffer);
179 for (cid, data) in other_blocks {
180 if cid != commit_cid {
181 writer
182 .write(cid, data.as_ref())
183 .await
184 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
185 }
186 }
187 if let Some(data) = commit_bytes {
188 writer
189 .write(commit_cid, data.as_ref())
190 .await
191 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
192 }
193 writer
194 .finish()
195 .await
196 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
197 buffer
198 .flush()
199 .await
200 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
201 Ok(buffer.into_inner())
202}
203
204fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
205 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
206}
207
208fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
209 let frame = IdentityFrame {
210 did: event.did.clone(),
211 handle: event.handle.clone(),
212 seq: event.seq,
213 time: format_atproto_time(event.created_at),
214 };
215 let header = FrameHeader {
216 op: 1,
217 t: "#identity".to_string(),
218 };
219 let mut bytes = Vec::with_capacity(256);
220 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
221 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
222 Ok(bytes)
223}
224
225fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
226 let frame = AccountFrame {
227 did: event.did.clone(),
228 active: event.active.unwrap_or(true),
229 status: event.status.clone(),
230 seq: event.seq,
231 time: format_atproto_time(event.created_at),
232 };
233 let header = FrameHeader {
234 op: 1,
235 t: "#account".to_string(),
236 };
237 let mut bytes = Vec::with_capacity(256);
238 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
239 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
240 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
241 tracing::info!(
242 did = %frame.did,
243 active = frame.active,
244 status = ?frame.status,
245 cbor_len = bytes.len(),
246 cbor_hex = %hex_str,
247 "Sending account event to firehose"
248 );
249 Ok(bytes)
250}
251
252async fn format_sync_event(
253 state: &AppState,
254 event: &SequencedEvent,
255) -> Result<Vec<u8>, anyhow::Error> {
256 let commit_cid_str = event
257 .commit_cid
258 .as_ref()
259 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
260 let commit_cid = Cid::from_str(commit_cid_str)?;
261 let commit_bytes = state
262 .block_store
263 .get(&commit_cid)
264 .await?
265 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
266 let rev = if let Some(ref stored_rev) = event.rev {
267 stored_rev.clone()
268 } else {
269 extract_rev_from_commit_bytes(&commit_bytes)
270 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
271 };
272 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
273 let frame = SyncFrame {
274 did: event.did.clone(),
275 rev,
276 blocks: car_bytes,
277 seq: event.seq,
278 time: format_atproto_time(event.created_at),
279 };
280 let header = FrameHeader {
281 op: 1,
282 t: "#sync".to_string(),
283 };
284 let mut bytes = Vec::with_capacity(512);
285 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
286 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
287 Ok(bytes)
288}
289
290pub async fn format_event_for_sending(
291 state: &AppState,
292 event: SequencedEvent,
293) -> Result<Vec<u8>, anyhow::Error> {
294 match event.event_type.as_str() {
295 "identity" => return format_identity_event(&event),
296 "account" => return format_account_event(&event),
297 "sync" => return format_sync_event(state, &event).await,
298 _ => {}
299 }
300 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
301 let prev_cid_str = event.prev_cid.clone();
302 let prev_data_cid_str = event.prev_data_cid.clone();
303 let mut frame: CommitFrame = event
304 .try_into()
305 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
306 if let Some(ref pdc) = prev_data_cid_str
307 && let Ok(cid) = Cid::from_str(pdc)
308 {
309 frame.prev_data = Some(cid);
310 }
311 let commit_cid = frame.commit;
312 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
313 let mut all_cids: Vec<Cid> = block_cids_str
314 .iter()
315 .filter_map(|s| Cid::from_str(s).ok())
316 .filter(|c| Some(*c) != prev_cid)
317 .collect();
318 if !all_cids.contains(&commit_cid) {
319 all_cids.push(commit_cid);
320 }
321 if let Some(ref pc) = prev_cid
322 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
323 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
324 {
325 frame.since = Some(rev);
326 }
327 let car_bytes = if !all_cids.is_empty() {
328 let fetched = state.block_store.get_many(&all_cids).await?;
329 let mut blocks = std::collections::BTreeMap::new();
330 let mut commit_bytes: Option<Bytes> = None;
331 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
332 if let Some(data) = data_opt {
333 if *cid == commit_cid {
334 commit_bytes = Some(data.clone());
335 if let Some(rev) = extract_rev_from_commit_bytes(data) {
336 frame.rev = rev;
337 }
338 } else {
339 blocks.insert(*cid, data.clone());
340 }
341 }
342 }
343 write_car_blocks(commit_cid, commit_bytes, blocks).await?
344 } else {
345 Vec::new()
346 };
347 frame.blocks = car_bytes;
348 let header = FrameHeader {
349 op: 1,
350 t: "#commit".to_string(),
351 };
352 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
353 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
354 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
355 Ok(bytes)
356}
357
358pub async fn prefetch_blocks_for_events(
359 state: &AppState,
360 events: &[SequencedEvent],
361) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
362 let mut all_cids: Vec<Cid> = Vec::new();
363 for event in events {
364 if let Some(ref commit_cid_str) = event.commit_cid
365 && let Ok(cid) = Cid::from_str(commit_cid_str)
366 {
367 all_cids.push(cid);
368 }
369 if let Some(ref prev_cid_str) = event.prev_cid
370 && let Ok(cid) = Cid::from_str(prev_cid_str)
371 {
372 all_cids.push(cid);
373 }
374 if let Some(ref block_cids_str) = event.blocks_cids {
375 for s in block_cids_str {
376 if let Ok(cid) = Cid::from_str(s) {
377 all_cids.push(cid);
378 }
379 }
380 }
381 }
382 all_cids.sort();
383 all_cids.dedup();
384 if all_cids.is_empty() {
385 return Ok(HashMap::new());
386 }
387 let fetched = state.block_store.get_many(&all_cids).await?;
388 let mut blocks_map = HashMap::with_capacity(all_cids.len());
389 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
390 if let Some(data) = data_opt {
391 blocks_map.insert(cid, data);
392 }
393 }
394 Ok(blocks_map)
395}
396
397fn format_sync_event_with_prefetched(
398 event: &SequencedEvent,
399 prefetched: &HashMap<Cid, Bytes>,
400) -> Result<Vec<u8>, anyhow::Error> {
401 let commit_cid_str = event
402 .commit_cid
403 .as_ref()
404 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
405 let commit_cid = Cid::from_str(commit_cid_str)?;
406 let commit_bytes = prefetched
407 .get(&commit_cid)
408 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
409 let rev = if let Some(ref stored_rev) = event.rev {
410 stored_rev.clone()
411 } else {
412 extract_rev_from_commit_bytes(commit_bytes)
413 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
414 };
415 let car_bytes = futures::executor::block_on(write_car_blocks(
416 commit_cid,
417 Some(commit_bytes.clone()),
418 BTreeMap::new(),
419 ))?;
420 let frame = SyncFrame {
421 did: event.did.clone(),
422 rev,
423 blocks: car_bytes,
424 seq: event.seq,
425 time: format_atproto_time(event.created_at),
426 };
427 let header = FrameHeader {
428 op: 1,
429 t: "#sync".to_string(),
430 };
431 let mut bytes = Vec::new();
432 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
433 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
434 Ok(bytes)
435}
436
437pub async fn format_event_with_prefetched_blocks(
438 event: SequencedEvent,
439 prefetched: &HashMap<Cid, Bytes>,
440) -> Result<Vec<u8>, anyhow::Error> {
441 match event.event_type.as_str() {
442 "identity" => return format_identity_event(&event),
443 "account" => return format_account_event(&event),
444 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
445 _ => {}
446 }
447 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
448 let prev_cid_str = event.prev_cid.clone();
449 let prev_data_cid_str = event.prev_data_cid.clone();
450 let mut frame: CommitFrame = event
451 .try_into()
452 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
453 if let Some(ref pdc) = prev_data_cid_str
454 && let Ok(cid) = Cid::from_str(pdc)
455 {
456 frame.prev_data = Some(cid);
457 }
458 let commit_cid = frame.commit;
459 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
460 let mut all_cids: Vec<Cid> = block_cids_str
461 .iter()
462 .filter_map(|s| Cid::from_str(s).ok())
463 .filter(|c| Some(*c) != prev_cid)
464 .collect();
465 if !all_cids.contains(&commit_cid) {
466 all_cids.push(commit_cid);
467 }
468 if let Some(commit_bytes) = prefetched.get(&commit_cid)
469 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
470 {
471 frame.rev = rev;
472 }
473 if let Some(ref pc) = prev_cid
474 && let Some(prev_bytes) = prefetched.get(pc)
475 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
476 {
477 frame.since = Some(rev);
478 }
479 let car_bytes = if !all_cids.is_empty() {
480 let mut blocks = BTreeMap::new();
481 let mut commit_bytes_for_car: Option<Bytes> = None;
482 for cid in all_cids {
483 if let Some(data) = prefetched.get(&cid) {
484 if cid == commit_cid {
485 commit_bytes_for_car = Some(data.clone());
486 } else {
487 blocks.insert(cid, data.clone());
488 }
489 }
490 }
491 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
492 } else {
493 Vec::new()
494 };
495 frame.blocks = car_bytes;
496 let header = FrameHeader {
497 op: 1,
498 t: "#commit".to_string(),
499 };
500 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
501 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
502 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
503 Ok(bytes)
504}
505
506pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
507 let header = FrameHeader {
508 op: 1,
509 t: "#info".to_string(),
510 };
511 let frame = InfoFrame {
512 name: name.to_string(),
513 message: message.map(String::from),
514 };
515 let mut bytes = Vec::with_capacity(128);
516 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
517 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
518 Ok(bytes)
519}
520
521pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
522 let header = ErrorFrameHeader { op: -1 };
523 let frame = ErrorFrameBody {
524 error: error.to_string(),
525 message: message.map(String::from),
526 };
527 let mut bytes = Vec::with_capacity(128);
528 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
529 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
530 Ok(bytes)
531}