this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::sync::firehose::SequencedEvent;
4use crate::sync::frame::{
5 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame,
6 InfoFrame, SyncFrame,
7};
8use axum::response::{IntoResponse, Response};
9use bytes::Bytes;
10use cid::Cid;
11use iroh_car::{CarHeader, CarWriter};
12use jacquard_repo::commit::Commit;
13use jacquard_repo::storage::BlockStore;
14use serde::Serialize;
15use sqlx::PgPool;
16use std::collections::{BTreeMap, HashMap};
17use std::io::Cursor;
18use std::str::FromStr;
19use tokio::io::AsyncWriteExt;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
22#[serde(rename_all = "lowercase")]
23pub enum AccountStatus {
24 Active,
25 Takendown,
26 Suspended,
27 Deactivated,
28 Deleted,
29}
30
31impl AccountStatus {
32 pub fn as_str(&self) -> Option<&'static str> {
33 match self {
34 Self::Active => None,
35 Self::Takendown => Some("takendown"),
36 Self::Suspended => Some("suspended"),
37 Self::Deactivated => Some("deactivated"),
38 Self::Deleted => Some("deleted"),
39 }
40 }
41
42 pub fn is_active(&self) -> bool {
43 matches!(self, Self::Active)
44 }
45
46 pub fn is_takendown(&self) -> bool {
47 matches!(self, Self::Takendown)
48 }
49
50 pub fn is_suspended(&self) -> bool {
51 matches!(self, Self::Suspended)
52 }
53
54 pub fn is_deactivated(&self) -> bool {
55 matches!(self, Self::Deactivated)
56 }
57
58 pub fn is_deleted(&self) -> bool {
59 matches!(self, Self::Deleted)
60 }
61
62 pub fn allows_read(&self) -> bool {
63 matches!(self, Self::Active | Self::Deactivated)
64 }
65
66 pub fn allows_write(&self) -> bool {
67 matches!(self, Self::Active)
68 }
69
70 pub fn from_db_fields(takedown_ref: Option<&str>, deactivated_at: Option<chrono::DateTime<chrono::Utc>>) -> Self {
71 if takedown_ref.is_some() {
72 Self::Takendown
73 } else if deactivated_at.is_some() {
74 Self::Deactivated
75 } else {
76 Self::Active
77 }
78 }
79}
80
81impl From<crate::types::AccountState> for AccountStatus {
82 fn from(state: crate::types::AccountState) -> Self {
83 match state {
84 crate::types::AccountState::Active => AccountStatus::Active,
85 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated,
86 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown,
87 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated,
88 }
89 }
90}
91
92impl From<&crate::types::AccountState> for AccountStatus {
93 fn from(state: &crate::types::AccountState) -> Self {
94 match state {
95 crate::types::AccountState::Active => AccountStatus::Active,
96 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated,
97 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown,
98 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated,
99 }
100 }
101}
102
103pub struct RepoAccount {
104 pub did: String,
105 pub user_id: uuid::Uuid,
106 pub status: AccountStatus,
107 pub repo_root_cid: Option<String>,
108}
109
110pub enum RepoAvailabilityError {
111 NotFound(String),
112 Takendown(String),
113 Deactivated(String),
114 Internal(String),
115}
116
117impl IntoResponse for RepoAvailabilityError {
118 fn into_response(self) -> Response {
119 match self {
120 RepoAvailabilityError::NotFound(did) => {
121 ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did)))
122 .into_response()
123 }
124 RepoAvailabilityError::Takendown(_) => ApiError::RepoTakendown.into_response(),
125 RepoAvailabilityError::Deactivated(_) => ApiError::RepoDeactivated.into_response(),
126 RepoAvailabilityError::Internal(msg) => {
127 ApiError::InternalError(Some(msg)).into_response()
128 }
129 }
130 }
131}
132
133pub async fn get_account_with_status(
134 db: &PgPool,
135 did: &str,
136) -> Result<Option<RepoAccount>, sqlx::Error> {
137 let row = sqlx::query!(
138 r#"
139 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid
140 FROM users u
141 LEFT JOIN repos r ON r.user_id = u.id
142 WHERE u.did = $1
143 "#,
144 did
145 )
146 .fetch_optional(db)
147 .await?;
148
149 Ok(row.map(|r| {
150 let status = if r.takedown_ref.is_some() {
151 AccountStatus::Takendown
152 } else if r.deactivated_at.is_some() {
153 AccountStatus::Deactivated
154 } else {
155 AccountStatus::Active
156 };
157
158 RepoAccount {
159 did: r.did,
160 user_id: r.id,
161 status,
162 repo_root_cid: Some(r.repo_root_cid),
163 }
164 }))
165}
166
167pub async fn assert_repo_availability(
168 db: &PgPool,
169 did: &str,
170 is_admin_or_self: bool,
171) -> Result<RepoAccount, RepoAvailabilityError> {
172 let account = get_account_with_status(db, did)
173 .await
174 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?;
175
176 let account = match account {
177 Some(a) => a,
178 None => return Err(RepoAvailabilityError::NotFound(did.to_string())),
179 };
180
181 if is_admin_or_self {
182 return Ok(account);
183 }
184
185 match account.status {
186 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())),
187 AccountStatus::Deactivated => {
188 return Err(RepoAvailabilityError::Deactivated(did.to_string()));
189 }
190 _ => {}
191 }
192
193 Ok(account)
194}
195
196fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
197 Commit::from_cbor(commit_bytes)
198 .ok()
199 .map(|c| c.rev().to_string())
200}
201
202async fn write_car_blocks(
203 commit_cid: Cid,
204 commit_bytes: Option<Bytes>,
205 other_blocks: BTreeMap<Cid, Bytes>,
206) -> Result<Vec<u8>, anyhow::Error> {
207 let mut buffer = Cursor::new(Vec::new());
208 let header = CarHeader::new_v1(vec![commit_cid]);
209 let mut writer = CarWriter::new(header, &mut buffer);
210 for (cid, data) in other_blocks {
211 if cid != commit_cid {
212 writer
213 .write(cid, data.as_ref())
214 .await
215 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
216 }
217 }
218 if let Some(data) = commit_bytes {
219 writer
220 .write(commit_cid, data.as_ref())
221 .await
222 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
223 }
224 writer
225 .finish()
226 .await
227 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
228 buffer
229 .flush()
230 .await
231 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
232 Ok(buffer.into_inner())
233}
234
235fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
236 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
237}
238
239fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
240 let frame = IdentityFrame {
241 did: event.did.clone(),
242 handle: event.handle.clone(),
243 seq: event.seq,
244 time: format_atproto_time(event.created_at),
245 };
246 let header = FrameHeader {
247 op: 1,
248 t: "#identity".to_string(),
249 };
250 let mut bytes = Vec::with_capacity(256);
251 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
252 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
253 Ok(bytes)
254}
255
256fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
257 let frame = AccountFrame {
258 did: event.did.clone(),
259 active: event.active.unwrap_or(true),
260 status: event.status.clone(),
261 seq: event.seq,
262 time: format_atproto_time(event.created_at),
263 };
264 let header = FrameHeader {
265 op: 1,
266 t: "#account".to_string(),
267 };
268 let mut bytes = Vec::with_capacity(256);
269 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
270 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
271 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
272 tracing::info!(
273 did = %frame.did,
274 active = frame.active,
275 status = ?frame.status,
276 cbor_len = bytes.len(),
277 cbor_hex = %hex_str,
278 "Sending account event to firehose"
279 );
280 Ok(bytes)
281}
282
283async fn format_sync_event(
284 state: &AppState,
285 event: &SequencedEvent,
286) -> Result<Vec<u8>, anyhow::Error> {
287 let commit_cid_str = event
288 .commit_cid
289 .as_ref()
290 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
291 let commit_cid = Cid::from_str(commit_cid_str)?;
292 let commit_bytes = state
293 .block_store
294 .get(&commit_cid)
295 .await?
296 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
297 let rev = if let Some(ref stored_rev) = event.rev {
298 stored_rev.clone()
299 } else {
300 extract_rev_from_commit_bytes(&commit_bytes)
301 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
302 };
303 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
304 let frame = SyncFrame {
305 did: event.did.clone(),
306 rev,
307 blocks: car_bytes,
308 seq: event.seq,
309 time: format_atproto_time(event.created_at),
310 };
311 let header = FrameHeader {
312 op: 1,
313 t: "#sync".to_string(),
314 };
315 let mut bytes = Vec::with_capacity(512);
316 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
317 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
318 Ok(bytes)
319}
320
321pub async fn format_event_for_sending(
322 state: &AppState,
323 event: SequencedEvent,
324) -> Result<Vec<u8>, anyhow::Error> {
325 match event.event_type.as_str() {
326 "identity" => return format_identity_event(&event),
327 "account" => return format_account_event(&event),
328 "sync" => return format_sync_event(state, &event).await,
329 _ => {}
330 }
331 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
332 let prev_cid_str = event.prev_cid.clone();
333 let prev_data_cid_str = event.prev_data_cid.clone();
334 let mut frame: CommitFrame = event
335 .try_into()
336 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
337 if let Some(ref pdc) = prev_data_cid_str
338 && let Ok(cid) = Cid::from_str(pdc)
339 {
340 frame.prev_data = Some(cid);
341 }
342 let commit_cid = frame.commit;
343 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
344 let mut all_cids: Vec<Cid> = block_cids_str
345 .iter()
346 .filter_map(|s| Cid::from_str(s).ok())
347 .filter(|c| Some(*c) != prev_cid)
348 .collect();
349 if !all_cids.contains(&commit_cid) {
350 all_cids.push(commit_cid);
351 }
352 if let Some(ref pc) = prev_cid
353 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
354 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
355 {
356 frame.since = Some(rev);
357 }
358 let car_bytes = if !all_cids.is_empty() {
359 let fetched = state.block_store.get_many(&all_cids).await?;
360 let mut blocks = std::collections::BTreeMap::new();
361 let mut commit_bytes: Option<Bytes> = None;
362 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
363 if let Some(data) = data_opt {
364 if *cid == commit_cid {
365 commit_bytes = Some(data.clone());
366 if let Some(rev) = extract_rev_from_commit_bytes(data) {
367 frame.rev = rev;
368 }
369 } else {
370 blocks.insert(*cid, data.clone());
371 }
372 }
373 }
374 write_car_blocks(commit_cid, commit_bytes, blocks).await?
375 } else {
376 Vec::new()
377 };
378 frame.blocks = car_bytes;
379 let header = FrameHeader {
380 op: 1,
381 t: "#commit".to_string(),
382 };
383 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
384 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
385 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
386 Ok(bytes)
387}
388
389pub async fn prefetch_blocks_for_events(
390 state: &AppState,
391 events: &[SequencedEvent],
392) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
393 let mut all_cids: Vec<Cid> = Vec::new();
394 for event in events {
395 if let Some(ref commit_cid_str) = event.commit_cid
396 && let Ok(cid) = Cid::from_str(commit_cid_str)
397 {
398 all_cids.push(cid);
399 }
400 if let Some(ref prev_cid_str) = event.prev_cid
401 && let Ok(cid) = Cid::from_str(prev_cid_str)
402 {
403 all_cids.push(cid);
404 }
405 if let Some(ref block_cids_str) = event.blocks_cids {
406 for s in block_cids_str {
407 if let Ok(cid) = Cid::from_str(s) {
408 all_cids.push(cid);
409 }
410 }
411 }
412 }
413 all_cids.sort();
414 all_cids.dedup();
415 if all_cids.is_empty() {
416 return Ok(HashMap::new());
417 }
418 let fetched = state.block_store.get_many(&all_cids).await?;
419 let mut blocks_map = HashMap::with_capacity(all_cids.len());
420 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
421 if let Some(data) = data_opt {
422 blocks_map.insert(cid, data);
423 }
424 }
425 Ok(blocks_map)
426}
427
428fn format_sync_event_with_prefetched(
429 event: &SequencedEvent,
430 prefetched: &HashMap<Cid, Bytes>,
431) -> Result<Vec<u8>, anyhow::Error> {
432 let commit_cid_str = event
433 .commit_cid
434 .as_ref()
435 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
436 let commit_cid = Cid::from_str(commit_cid_str)?;
437 let commit_bytes = prefetched
438 .get(&commit_cid)
439 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
440 let rev = if let Some(ref stored_rev) = event.rev {
441 stored_rev.clone()
442 } else {
443 extract_rev_from_commit_bytes(commit_bytes)
444 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
445 };
446 let car_bytes = futures::executor::block_on(write_car_blocks(
447 commit_cid,
448 Some(commit_bytes.clone()),
449 BTreeMap::new(),
450 ))?;
451 let frame = SyncFrame {
452 did: event.did.clone(),
453 rev,
454 blocks: car_bytes,
455 seq: event.seq,
456 time: format_atproto_time(event.created_at),
457 };
458 let header = FrameHeader {
459 op: 1,
460 t: "#sync".to_string(),
461 };
462 let mut bytes = Vec::new();
463 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
464 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
465 Ok(bytes)
466}
467
468pub async fn format_event_with_prefetched_blocks(
469 event: SequencedEvent,
470 prefetched: &HashMap<Cid, Bytes>,
471) -> Result<Vec<u8>, anyhow::Error> {
472 match event.event_type.as_str() {
473 "identity" => return format_identity_event(&event),
474 "account" => return format_account_event(&event),
475 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
476 _ => {}
477 }
478 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
479 let prev_cid_str = event.prev_cid.clone();
480 let prev_data_cid_str = event.prev_data_cid.clone();
481 let mut frame: CommitFrame = event
482 .try_into()
483 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
484 if let Some(ref pdc) = prev_data_cid_str
485 && let Ok(cid) = Cid::from_str(pdc)
486 {
487 frame.prev_data = Some(cid);
488 }
489 let commit_cid = frame.commit;
490 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
491 let mut all_cids: Vec<Cid> = block_cids_str
492 .iter()
493 .filter_map(|s| Cid::from_str(s).ok())
494 .filter(|c| Some(*c) != prev_cid)
495 .collect();
496 if !all_cids.contains(&commit_cid) {
497 all_cids.push(commit_cid);
498 }
499 if let Some(commit_bytes) = prefetched.get(&commit_cid)
500 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
501 {
502 frame.rev = rev;
503 }
504 if let Some(ref pc) = prev_cid
505 && let Some(prev_bytes) = prefetched.get(pc)
506 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
507 {
508 frame.since = Some(rev);
509 }
510 let car_bytes = if !all_cids.is_empty() {
511 let mut blocks = BTreeMap::new();
512 let mut commit_bytes_for_car: Option<Bytes> = None;
513 for cid in all_cids {
514 if let Some(data) = prefetched.get(&cid) {
515 if cid == commit_cid {
516 commit_bytes_for_car = Some(data.clone());
517 } else {
518 blocks.insert(cid, data.clone());
519 }
520 }
521 }
522 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
523 } else {
524 Vec::new()
525 };
526 frame.blocks = car_bytes;
527 let header = FrameHeader {
528 op: 1,
529 t: "#commit".to_string(),
530 };
531 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
532 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
533 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
534 Ok(bytes)
535}
536
537pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
538 let header = FrameHeader {
539 op: 1,
540 t: "#info".to_string(),
541 };
542 let frame = InfoFrame {
543 name: name.to_string(),
544 message: message.map(String::from),
545 };
546 let mut bytes = Vec::with_capacity(128);
547 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
548 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
549 Ok(bytes)
550}
551
552pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
553 let header = ErrorFrameHeader { op: -1 };
554 let frame = ErrorFrameBody {
555 error: error.to_string(),
556 message: message.map(String::from),
557 };
558 let mut bytes = Vec::with_capacity(128);
559 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
560 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
561 Ok(bytes)
562}