this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::sync::firehose::SequencedEvent;
4use crate::sync::frame::{
5 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame,
6 InfoFrame, SyncFrame,
7};
8use axum::response::{IntoResponse, Response};
9use bytes::Bytes;
10use cid::Cid;
11use iroh_car::{CarHeader, CarWriter};
12use jacquard_repo::commit::Commit;
13use jacquard_repo::storage::BlockStore;
14use serde::Serialize;
15use sqlx::PgPool;
16use std::collections::{BTreeMap, HashMap};
17use std::io::Cursor;
18use std::str::FromStr;
19use tokio::io::AsyncWriteExt;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
22#[serde(rename_all = "lowercase")]
23pub enum AccountStatus {
24 Active,
25 Takendown,
26 Suspended,
27 Deactivated,
28 Deleted,
29}
30
31impl AccountStatus {
32 pub fn as_str(&self) -> Option<&'static str> {
33 match self {
34 Self::Active => None,
35 Self::Takendown => Some("takendown"),
36 Self::Suspended => Some("suspended"),
37 Self::Deactivated => Some("deactivated"),
38 Self::Deleted => Some("deleted"),
39 }
40 }
41
42 pub fn is_active(&self) -> bool {
43 matches!(self, Self::Active)
44 }
45
46 pub fn is_takendown(&self) -> bool {
47 matches!(self, Self::Takendown)
48 }
49
50 pub fn is_suspended(&self) -> bool {
51 matches!(self, Self::Suspended)
52 }
53
54 pub fn is_deactivated(&self) -> bool {
55 matches!(self, Self::Deactivated)
56 }
57
58 pub fn is_deleted(&self) -> bool {
59 matches!(self, Self::Deleted)
60 }
61
62 pub fn allows_read(&self) -> bool {
63 matches!(self, Self::Active | Self::Deactivated)
64 }
65
66 pub fn allows_write(&self) -> bool {
67 matches!(self, Self::Active)
68 }
69
70 pub fn from_db_fields(
71 takedown_ref: Option<&str>,
72 deactivated_at: Option<chrono::DateTime<chrono::Utc>>,
73 ) -> Self {
74 if takedown_ref.is_some() {
75 Self::Takendown
76 } else if deactivated_at.is_some() {
77 Self::Deactivated
78 } else {
79 Self::Active
80 }
81 }
82}
83
84impl From<crate::types::AccountState> for AccountStatus {
85 fn from(state: crate::types::AccountState) -> Self {
86 match state {
87 crate::types::AccountState::Active => AccountStatus::Active,
88 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated,
89 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown,
90 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated,
91 }
92 }
93}
94
95impl From<&crate::types::AccountState> for AccountStatus {
96 fn from(state: &crate::types::AccountState) -> Self {
97 match state {
98 crate::types::AccountState::Active => AccountStatus::Active,
99 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated,
100 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown,
101 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated,
102 }
103 }
104}
105
106pub struct RepoAccount {
107 pub did: String,
108 pub user_id: uuid::Uuid,
109 pub status: AccountStatus,
110 pub repo_root_cid: Option<String>,
111}
112
113pub enum RepoAvailabilityError {
114 NotFound(String),
115 Takendown(String),
116 Deactivated(String),
117 Internal(String),
118}
119
120impl IntoResponse for RepoAvailabilityError {
121 fn into_response(self) -> Response {
122 match self {
123 RepoAvailabilityError::NotFound(did) => {
124 ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did)))
125 .into_response()
126 }
127 RepoAvailabilityError::Takendown(_) => ApiError::RepoTakendown.into_response(),
128 RepoAvailabilityError::Deactivated(_) => ApiError::RepoDeactivated.into_response(),
129 RepoAvailabilityError::Internal(msg) => {
130 ApiError::InternalError(Some(msg)).into_response()
131 }
132 }
133 }
134}
135
136pub async fn get_account_with_status(
137 db: &PgPool,
138 did: &str,
139) -> Result<Option<RepoAccount>, sqlx::Error> {
140 let row = sqlx::query!(
141 r#"
142 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid
143 FROM users u
144 LEFT JOIN repos r ON r.user_id = u.id
145 WHERE u.did = $1
146 "#,
147 did
148 )
149 .fetch_optional(db)
150 .await?;
151
152 Ok(row.map(|r| {
153 let status = if r.takedown_ref.is_some() {
154 AccountStatus::Takendown
155 } else if r.deactivated_at.is_some() {
156 AccountStatus::Deactivated
157 } else {
158 AccountStatus::Active
159 };
160
161 RepoAccount {
162 did: r.did,
163 user_id: r.id,
164 status,
165 repo_root_cid: Some(r.repo_root_cid),
166 }
167 }))
168}
169
170pub async fn assert_repo_availability(
171 db: &PgPool,
172 did: &str,
173 is_admin_or_self: bool,
174) -> Result<RepoAccount, RepoAvailabilityError> {
175 let account = get_account_with_status(db, did)
176 .await
177 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?;
178
179 let account = match account {
180 Some(a) => a,
181 None => return Err(RepoAvailabilityError::NotFound(did.to_string())),
182 };
183
184 if is_admin_or_self {
185 return Ok(account);
186 }
187
188 match account.status {
189 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())),
190 AccountStatus::Deactivated => {
191 return Err(RepoAvailabilityError::Deactivated(did.to_string()));
192 }
193 _ => {}
194 }
195
196 Ok(account)
197}
198
199fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
200 Commit::from_cbor(commit_bytes)
201 .ok()
202 .map(|c| c.rev().to_string())
203}
204
205async fn write_car_blocks(
206 commit_cid: Cid,
207 commit_bytes: Option<Bytes>,
208 other_blocks: BTreeMap<Cid, Bytes>,
209) -> Result<Vec<u8>, anyhow::Error> {
210 let mut buffer = Cursor::new(Vec::new());
211 let header = CarHeader::new_v1(vec![commit_cid]);
212 let mut writer = CarWriter::new(header, &mut buffer);
213 for (cid, data) in other_blocks {
214 if cid != commit_cid {
215 writer
216 .write(cid, data.as_ref())
217 .await
218 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
219 }
220 }
221 if let Some(data) = commit_bytes {
222 writer
223 .write(commit_cid, data.as_ref())
224 .await
225 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
226 }
227 writer
228 .finish()
229 .await
230 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
231 buffer
232 .flush()
233 .await
234 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
235 Ok(buffer.into_inner())
236}
237
238fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
239 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
240}
241
242fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
243 let frame = IdentityFrame {
244 did: event.did.clone(),
245 handle: event.handle.clone(),
246 seq: event.seq,
247 time: format_atproto_time(event.created_at),
248 };
249 let header = FrameHeader {
250 op: 1,
251 t: "#identity".to_string(),
252 };
253 let mut bytes = Vec::with_capacity(256);
254 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
255 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
256 Ok(bytes)
257}
258
259fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
260 let frame = AccountFrame {
261 did: event.did.clone(),
262 active: event.active.unwrap_or(true),
263 status: event.status.clone(),
264 seq: event.seq,
265 time: format_atproto_time(event.created_at),
266 };
267 let header = FrameHeader {
268 op: 1,
269 t: "#account".to_string(),
270 };
271 let mut bytes = Vec::with_capacity(256);
272 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
273 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
274 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
275 tracing::info!(
276 did = %frame.did,
277 active = frame.active,
278 status = ?frame.status,
279 cbor_len = bytes.len(),
280 cbor_hex = %hex_str,
281 "Sending account event to firehose"
282 );
283 Ok(bytes)
284}
285
286async fn format_sync_event(
287 state: &AppState,
288 event: &SequencedEvent,
289) -> Result<Vec<u8>, anyhow::Error> {
290 let commit_cid_str = event
291 .commit_cid
292 .as_ref()
293 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
294 let commit_cid = Cid::from_str(commit_cid_str)?;
295 let commit_bytes = state
296 .block_store
297 .get(&commit_cid)
298 .await?
299 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
300 let rev = if let Some(ref stored_rev) = event.rev {
301 stored_rev.clone()
302 } else {
303 extract_rev_from_commit_bytes(&commit_bytes)
304 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
305 };
306 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
307 let frame = SyncFrame {
308 did: event.did.clone(),
309 rev,
310 blocks: car_bytes,
311 seq: event.seq,
312 time: format_atproto_time(event.created_at),
313 };
314 let header = FrameHeader {
315 op: 1,
316 t: "#sync".to_string(),
317 };
318 let mut bytes = Vec::with_capacity(512);
319 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
320 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
321 Ok(bytes)
322}
323
324pub async fn format_event_for_sending(
325 state: &AppState,
326 event: SequencedEvent,
327) -> Result<Vec<u8>, anyhow::Error> {
328 match event.event_type.as_str() {
329 "identity" => return format_identity_event(&event),
330 "account" => return format_account_event(&event),
331 "sync" => return format_sync_event(state, &event).await,
332 _ => {}
333 }
334 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
335 let prev_cid_str = event.prev_cid.clone();
336 let prev_data_cid_str = event.prev_data_cid.clone();
337 let mut frame: CommitFrame = event
338 .try_into()
339 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
340 if let Some(ref pdc) = prev_data_cid_str
341 && let Ok(cid) = Cid::from_str(pdc)
342 {
343 frame.prev_data = Some(cid);
344 }
345 let commit_cid = frame.commit;
346 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
347 let mut all_cids: Vec<Cid> = block_cids_str
348 .iter()
349 .filter_map(|s| Cid::from_str(s).ok())
350 .filter(|c| Some(*c) != prev_cid)
351 .collect();
352 if !all_cids.contains(&commit_cid) {
353 all_cids.push(commit_cid);
354 }
355 if let Some(ref pc) = prev_cid
356 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
357 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
358 {
359 frame.since = Some(rev);
360 }
361 let car_bytes = if !all_cids.is_empty() {
362 let fetched = state.block_store.get_many(&all_cids).await?;
363 let mut blocks = std::collections::BTreeMap::new();
364 let mut commit_bytes: Option<Bytes> = None;
365 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
366 if let Some(data) = data_opt {
367 if *cid == commit_cid {
368 commit_bytes = Some(data.clone());
369 if let Some(rev) = extract_rev_from_commit_bytes(data) {
370 frame.rev = rev;
371 }
372 } else {
373 blocks.insert(*cid, data.clone());
374 }
375 }
376 }
377 write_car_blocks(commit_cid, commit_bytes, blocks).await?
378 } else {
379 Vec::new()
380 };
381 frame.blocks = car_bytes;
382 let header = FrameHeader {
383 op: 1,
384 t: "#commit".to_string(),
385 };
386 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
387 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
388 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
389 Ok(bytes)
390}
391
392pub async fn prefetch_blocks_for_events(
393 state: &AppState,
394 events: &[SequencedEvent],
395) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
396 let mut all_cids: Vec<Cid> = Vec::new();
397 for event in events {
398 if let Some(ref commit_cid_str) = event.commit_cid
399 && let Ok(cid) = Cid::from_str(commit_cid_str)
400 {
401 all_cids.push(cid);
402 }
403 if let Some(ref prev_cid_str) = event.prev_cid
404 && let Ok(cid) = Cid::from_str(prev_cid_str)
405 {
406 all_cids.push(cid);
407 }
408 if let Some(ref block_cids_str) = event.blocks_cids {
409 for s in block_cids_str {
410 if let Ok(cid) = Cid::from_str(s) {
411 all_cids.push(cid);
412 }
413 }
414 }
415 }
416 all_cids.sort();
417 all_cids.dedup();
418 if all_cids.is_empty() {
419 return Ok(HashMap::new());
420 }
421 let fetched = state.block_store.get_many(&all_cids).await?;
422 let mut blocks_map = HashMap::with_capacity(all_cids.len());
423 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
424 if let Some(data) = data_opt {
425 blocks_map.insert(cid, data);
426 }
427 }
428 Ok(blocks_map)
429}
430
431fn format_sync_event_with_prefetched(
432 event: &SequencedEvent,
433 prefetched: &HashMap<Cid, Bytes>,
434) -> Result<Vec<u8>, anyhow::Error> {
435 let commit_cid_str = event
436 .commit_cid
437 .as_ref()
438 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
439 let commit_cid = Cid::from_str(commit_cid_str)?;
440 let commit_bytes = prefetched
441 .get(&commit_cid)
442 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
443 let rev = if let Some(ref stored_rev) = event.rev {
444 stored_rev.clone()
445 } else {
446 extract_rev_from_commit_bytes(commit_bytes)
447 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
448 };
449 let car_bytes = futures::executor::block_on(write_car_blocks(
450 commit_cid,
451 Some(commit_bytes.clone()),
452 BTreeMap::new(),
453 ))?;
454 let frame = SyncFrame {
455 did: event.did.clone(),
456 rev,
457 blocks: car_bytes,
458 seq: event.seq,
459 time: format_atproto_time(event.created_at),
460 };
461 let header = FrameHeader {
462 op: 1,
463 t: "#sync".to_string(),
464 };
465 let mut bytes = Vec::new();
466 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
467 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
468 Ok(bytes)
469}
470
471pub async fn format_event_with_prefetched_blocks(
472 event: SequencedEvent,
473 prefetched: &HashMap<Cid, Bytes>,
474) -> Result<Vec<u8>, anyhow::Error> {
475 match event.event_type.as_str() {
476 "identity" => return format_identity_event(&event),
477 "account" => return format_account_event(&event),
478 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
479 _ => {}
480 }
481 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
482 let prev_cid_str = event.prev_cid.clone();
483 let prev_data_cid_str = event.prev_data_cid.clone();
484 let mut frame: CommitFrame = event
485 .try_into()
486 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
487 if let Some(ref pdc) = prev_data_cid_str
488 && let Ok(cid) = Cid::from_str(pdc)
489 {
490 frame.prev_data = Some(cid);
491 }
492 let commit_cid = frame.commit;
493 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
494 let mut all_cids: Vec<Cid> = block_cids_str
495 .iter()
496 .filter_map(|s| Cid::from_str(s).ok())
497 .filter(|c| Some(*c) != prev_cid)
498 .collect();
499 if !all_cids.contains(&commit_cid) {
500 all_cids.push(commit_cid);
501 }
502 if let Some(commit_bytes) = prefetched.get(&commit_cid)
503 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
504 {
505 frame.rev = rev;
506 }
507 if let Some(ref pc) = prev_cid
508 && let Some(prev_bytes) = prefetched.get(pc)
509 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
510 {
511 frame.since = Some(rev);
512 }
513 let car_bytes = if !all_cids.is_empty() {
514 let mut blocks = BTreeMap::new();
515 let mut commit_bytes_for_car: Option<Bytes> = None;
516 for cid in all_cids {
517 if let Some(data) = prefetched.get(&cid) {
518 if cid == commit_cid {
519 commit_bytes_for_car = Some(data.clone());
520 } else {
521 blocks.insert(cid, data.clone());
522 }
523 }
524 }
525 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
526 } else {
527 Vec::new()
528 };
529 frame.blocks = car_bytes;
530 let header = FrameHeader {
531 op: 1,
532 t: "#commit".to_string(),
533 };
534 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
535 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
536 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
537 Ok(bytes)
538}
539
540pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
541 let header = FrameHeader {
542 op: 1,
543 t: "#info".to_string(),
544 };
545 let frame = InfoFrame {
546 name: name.to_string(),
547 message: message.map(String::from),
548 };
549 let mut bytes = Vec::with_capacity(128);
550 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
551 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
552 Ok(bytes)
553}
554
555pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
556 let header = ErrorFrameHeader { op: -1 };
557 let frame = ErrorFrameBody {
558 error: error.to_string(),
559 message: message.map(String::from),
560 };
561 let mut bytes = Vec::with_capacity(128);
562 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
563 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
564 Ok(bytes)
565}