this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::sync::firehose::SequencedEvent;
4use crate::sync::frame::{
5 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame,
6 InfoFrame, SyncFrame,
7};
8use axum::response::{IntoResponse, Response};
9use bytes::Bytes;
10use cid::Cid;
11use iroh_car::{CarHeader, CarWriter};
12use jacquard_repo::commit::Commit;
13use jacquard_repo::storage::BlockStore;
14use serde::Serialize;
15use sqlx::PgPool;
16use std::collections::{BTreeMap, HashMap};
17use std::io::Cursor;
18use std::str::FromStr;
19use tokio::io::AsyncWriteExt;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
22#[serde(rename_all = "lowercase")]
23pub enum AccountStatus {
24 Active,
25 Takendown,
26 Suspended,
27 Deactivated,
28 Deleted,
29}
30
31impl AccountStatus {
32 pub fn as_str(&self) -> Option<&'static str> {
33 match self {
34 Self::Active => None,
35 Self::Takendown => Some("takendown"),
36 Self::Suspended => Some("suspended"),
37 Self::Deactivated => Some("deactivated"),
38 Self::Deleted => Some("deleted"),
39 }
40 }
41
42 pub fn is_active(&self) -> bool {
43 matches!(self, Self::Active)
44 }
45
46 pub fn is_takendown(&self) -> bool {
47 matches!(self, Self::Takendown)
48 }
49
50 pub fn is_suspended(&self) -> bool {
51 matches!(self, Self::Suspended)
52 }
53
54 pub fn is_deactivated(&self) -> bool {
55 matches!(self, Self::Deactivated)
56 }
57
58 pub fn is_deleted(&self) -> bool {
59 matches!(self, Self::Deleted)
60 }
61
62 pub fn allows_read(&self) -> bool {
63 matches!(self, Self::Active | Self::Deactivated)
64 }
65
66 pub fn allows_write(&self) -> bool {
67 matches!(self, Self::Active)
68 }
69
70 pub fn from_db_fields(
71 takedown_ref: Option<&str>,
72 deactivated_at: Option<chrono::DateTime<chrono::Utc>>,
73 ) -> Self {
74 if takedown_ref.is_some() {
75 Self::Takendown
76 } else if deactivated_at.is_some() {
77 Self::Deactivated
78 } else {
79 Self::Active
80 }
81 }
82}
83
84impl From<crate::types::AccountState> for AccountStatus {
85 fn from(state: crate::types::AccountState) -> Self {
86 match state {
87 crate::types::AccountState::Active => AccountStatus::Active,
88 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated,
89 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown,
90 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated,
91 }
92 }
93}
94
95impl From<&crate::types::AccountState> for AccountStatus {
96 fn from(state: &crate::types::AccountState) -> Self {
97 match state {
98 crate::types::AccountState::Active => AccountStatus::Active,
99 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated,
100 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown,
101 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated,
102 }
103 }
104}
105
106pub struct RepoAccount {
107 pub did: String,
108 pub user_id: uuid::Uuid,
109 pub status: AccountStatus,
110 pub repo_root_cid: Option<String>,
111}
112
113pub enum RepoAvailabilityError {
114 NotFound(String),
115 Takendown(String),
116 Deactivated(String),
117 Internal(String),
118}
119
120impl IntoResponse for RepoAvailabilityError {
121 fn into_response(self) -> Response {
122 match self {
123 RepoAvailabilityError::NotFound(did) => {
124 ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did)))
125 .into_response()
126 }
127 RepoAvailabilityError::Takendown(_) => ApiError::RepoTakendown.into_response(),
128 RepoAvailabilityError::Deactivated(_) => ApiError::RepoDeactivated.into_response(),
129 RepoAvailabilityError::Internal(msg) => {
130 ApiError::InternalError(Some(msg)).into_response()
131 }
132 }
133 }
134}
135
136pub async fn get_account_with_status(
137 db: &PgPool,
138 did: &str,
139) -> Result<Option<RepoAccount>, sqlx::Error> {
140 let row = sqlx::query!(
141 r#"
142 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid
143 FROM users u
144 LEFT JOIN repos r ON r.user_id = u.id
145 WHERE u.did = $1
146 "#,
147 did
148 )
149 .fetch_optional(db)
150 .await?;
151
152 Ok(row.map(|r| {
153 let status = if r.takedown_ref.is_some() {
154 AccountStatus::Takendown
155 } else if r.deactivated_at.is_some() {
156 AccountStatus::Deactivated
157 } else {
158 AccountStatus::Active
159 };
160
161 RepoAccount {
162 did: r.did,
163 user_id: r.id,
164 status,
165 repo_root_cid: Some(r.repo_root_cid),
166 }
167 }))
168}
169
170pub async fn assert_repo_availability(
171 db: &PgPool,
172 did: &str,
173 is_admin_or_self: bool,
174) -> Result<RepoAccount, RepoAvailabilityError> {
175 let account = get_account_with_status(db, did)
176 .await
177 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?;
178
179 let account = match account {
180 Some(a) => a,
181 None => return Err(RepoAvailabilityError::NotFound(did.to_string())),
182 };
183
184 if is_admin_or_self {
185 return Ok(account);
186 }
187
188 match account.status {
189 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())),
190 AccountStatus::Deactivated => {
191 return Err(RepoAvailabilityError::Deactivated(did.to_string()));
192 }
193 _ => {}
194 }
195
196 Ok(account)
197}
198
199fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> {
200 Commit::from_cbor(commit_bytes)
201 .ok()
202 .map(|c| c.rev().to_string())
203}
204
205async fn write_car_blocks(
206 commit_cid: Cid,
207 commit_bytes: Option<Bytes>,
208 other_blocks: BTreeMap<Cid, Bytes>,
209) -> Result<Vec<u8>, anyhow::Error> {
210 let mut buffer = Cursor::new(Vec::new());
211 let header = CarHeader::new_v1(vec![commit_cid]);
212 let mut writer = CarWriter::new(header, &mut buffer);
213 for (cid, data) in other_blocks.iter().filter(|(c, _)| **c != commit_cid) {
214 writer
215 .write(*cid, data.as_ref())
216 .await
217 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
218 }
219 if let Some(data) = commit_bytes {
220 writer
221 .write(commit_cid, data.as_ref())
222 .await
223 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?;
224 }
225 writer
226 .finish()
227 .await
228 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?;
229 buffer
230 .flush()
231 .await
232 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?;
233 Ok(buffer.into_inner())
234}
235
236fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String {
237 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
238}
239
240fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
241 let frame = IdentityFrame {
242 did: event.did.clone(),
243 handle: event.handle.clone(),
244 seq: event.seq,
245 time: format_atproto_time(event.created_at),
246 };
247 let header = FrameHeader {
248 op: 1,
249 t: "#identity".to_string(),
250 };
251 let mut bytes = Vec::with_capacity(256);
252 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
253 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
254 Ok(bytes)
255}
256
257fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> {
258 let frame = AccountFrame {
259 did: event.did.clone(),
260 active: event.active.unwrap_or(true),
261 status: event.status.clone(),
262 seq: event.seq,
263 time: format_atproto_time(event.created_at),
264 };
265 let header = FrameHeader {
266 op: 1,
267 t: "#account".to_string(),
268 };
269 let mut bytes = Vec::with_capacity(256);
270 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
271 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
272 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
273 tracing::info!(
274 did = %frame.did,
275 active = frame.active,
276 status = ?frame.status,
277 cbor_len = bytes.len(),
278 cbor_hex = %hex_str,
279 "Sending account event to firehose"
280 );
281 Ok(bytes)
282}
283
284async fn format_sync_event(
285 state: &AppState,
286 event: &SequencedEvent,
287) -> Result<Vec<u8>, anyhow::Error> {
288 let commit_cid_str = event
289 .commit_cid
290 .as_ref()
291 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
292 let commit_cid = Cid::from_str(commit_cid_str)?;
293 let commit_bytes = state
294 .block_store
295 .get(&commit_cid)
296 .await?
297 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?;
298 let rev = if let Some(ref stored_rev) = event.rev {
299 stored_rev.clone()
300 } else {
301 extract_rev_from_commit_bytes(&commit_bytes)
302 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
303 };
304 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?;
305 let frame = SyncFrame {
306 did: event.did.clone(),
307 rev,
308 blocks: car_bytes,
309 seq: event.seq,
310 time: format_atproto_time(event.created_at),
311 };
312 let header = FrameHeader {
313 op: 1,
314 t: "#sync".to_string(),
315 };
316 let mut bytes = Vec::with_capacity(512);
317 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
318 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
319 Ok(bytes)
320}
321
322pub async fn format_event_for_sending(
323 state: &AppState,
324 event: SequencedEvent,
325) -> Result<Vec<u8>, anyhow::Error> {
326 match event.event_type.as_str() {
327 "identity" => return format_identity_event(&event),
328 "account" => return format_account_event(&event),
329 "sync" => return format_sync_event(state, &event).await,
330 _ => {}
331 }
332 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
333 let prev_cid_str = event.prev_cid.clone();
334 let prev_data_cid_str = event.prev_data_cid.clone();
335 let mut frame: CommitFrame = event
336 .try_into()
337 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
338 if let Some(ref pdc) = prev_data_cid_str
339 && let Ok(cid) = Cid::from_str(pdc)
340 {
341 frame.prev_data = Some(cid);
342 }
343 let commit_cid = frame.commit;
344 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
345 let mut all_cids: Vec<Cid> = block_cids_str
346 .iter()
347 .filter_map(|s| Cid::from_str(s).ok())
348 .filter(|c| Some(*c) != prev_cid)
349 .collect();
350 if !all_cids.contains(&commit_cid) {
351 all_cids.push(commit_cid);
352 }
353 if let Some(ref pc) = prev_cid
354 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await
355 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes)
356 {
357 frame.since = Some(rev);
358 }
359 let car_bytes = if !all_cids.is_empty() {
360 let fetched = state.block_store.get_many(&all_cids).await?;
361 let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids
362 .iter()
363 .zip(fetched.iter())
364 .filter_map(|(cid, data_opt)| data_opt.as_ref().map(|data| (*cid, data.clone())))
365 .partition(|(cid, _)| *cid == commit_cid);
366 let commit_bytes = commit_data.into_iter().next().map(|(_, data)| data);
367 if let Some(ref cb) = commit_bytes
368 && let Some(rev) = extract_rev_from_commit_bytes(cb)
369 {
370 frame.rev = rev;
371 }
372 let blocks: std::collections::BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect();
373 write_car_blocks(commit_cid, commit_bytes, blocks).await?
374 } else {
375 Vec::new()
376 };
377 frame.blocks = car_bytes;
378 let header = FrameHeader {
379 op: 1,
380 t: "#commit".to_string(),
381 };
382 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
383 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
384 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
385 Ok(bytes)
386}
387
388pub async fn prefetch_blocks_for_events(
389 state: &AppState,
390 events: &[SequencedEvent],
391) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
392 let mut all_cids: Vec<Cid> = events
393 .iter()
394 .flat_map(|event| {
395 let commit_cid = event
396 .commit_cid
397 .as_ref()
398 .and_then(|s| Cid::from_str(s).ok());
399 let prev_cid = event.prev_cid.as_ref().and_then(|s| Cid::from_str(s).ok());
400 let block_cids = event
401 .blocks_cids
402 .as_ref()
403 .map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect())
404 .unwrap_or_else(Vec::new);
405 commit_cid.into_iter().chain(prev_cid).chain(block_cids)
406 })
407 .collect();
408 all_cids.sort();
409 all_cids.dedup();
410 if all_cids.is_empty() {
411 return Ok(HashMap::new());
412 }
413 let fetched = state.block_store.get_many(&all_cids).await?;
414 let blocks_map: HashMap<Cid, Bytes> = all_cids
415 .into_iter()
416 .zip(fetched)
417 .filter_map(|(cid, data_opt)| data_opt.map(|data| (cid, data)))
418 .collect();
419 Ok(blocks_map)
420}
421
422fn format_sync_event_with_prefetched(
423 event: &SequencedEvent,
424 prefetched: &HashMap<Cid, Bytes>,
425) -> Result<Vec<u8>, anyhow::Error> {
426 let commit_cid_str = event
427 .commit_cid
428 .as_ref()
429 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?;
430 let commit_cid = Cid::from_str(commit_cid_str)?;
431 let commit_bytes = prefetched
432 .get(&commit_cid)
433 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?;
434 let rev = if let Some(ref stored_rev) = event.rev {
435 stored_rev.clone()
436 } else {
437 extract_rev_from_commit_bytes(commit_bytes)
438 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?
439 };
440 let car_bytes = futures::executor::block_on(write_car_blocks(
441 commit_cid,
442 Some(commit_bytes.clone()),
443 BTreeMap::new(),
444 ))?;
445 let frame = SyncFrame {
446 did: event.did.clone(),
447 rev,
448 blocks: car_bytes,
449 seq: event.seq,
450 time: format_atproto_time(event.created_at),
451 };
452 let header = FrameHeader {
453 op: 1,
454 t: "#sync".to_string(),
455 };
456 let mut bytes = Vec::new();
457 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
458 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
459 Ok(bytes)
460}
461
462pub async fn format_event_with_prefetched_blocks(
463 event: SequencedEvent,
464 prefetched: &HashMap<Cid, Bytes>,
465) -> Result<Vec<u8>, anyhow::Error> {
466 match event.event_type.as_str() {
467 "identity" => return format_identity_event(&event),
468 "account" => return format_account_event(&event),
469 "sync" => return format_sync_event_with_prefetched(&event, prefetched),
470 _ => {}
471 }
472 let block_cids_str = event.blocks_cids.clone().unwrap_or_default();
473 let prev_cid_str = event.prev_cid.clone();
474 let prev_data_cid_str = event.prev_data_cid.clone();
475 let mut frame: CommitFrame = event
476 .try_into()
477 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?;
478 if let Some(ref pdc) = prev_data_cid_str
479 && let Ok(cid) = Cid::from_str(pdc)
480 {
481 frame.prev_data = Some(cid);
482 }
483 let commit_cid = frame.commit;
484 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok());
485 let mut all_cids: Vec<Cid> = block_cids_str
486 .iter()
487 .filter_map(|s| Cid::from_str(s).ok())
488 .filter(|c| Some(*c) != prev_cid)
489 .collect();
490 if !all_cids.contains(&commit_cid) {
491 all_cids.push(commit_cid);
492 }
493 if let Some(commit_bytes) = prefetched.get(&commit_cid)
494 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes)
495 {
496 frame.rev = rev;
497 }
498 if let Some(ref pc) = prev_cid
499 && let Some(prev_bytes) = prefetched.get(pc)
500 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes)
501 {
502 frame.since = Some(rev);
503 }
504 let car_bytes = if !all_cids.is_empty() {
505 let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids
506 .into_iter()
507 .filter_map(|cid| prefetched.get(&cid).map(|data| (cid, data.clone())))
508 .partition(|(cid, _)| *cid == commit_cid);
509 let commit_bytes_for_car = commit_data.into_iter().next().map(|(_, data)| data);
510 let blocks: BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect();
511 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
512 } else {
513 Vec::new()
514 };
515 frame.blocks = car_bytes;
516 let header = FrameHeader {
517 op: 1,
518 t: "#commit".to_string(),
519 };
520 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512);
521 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
522 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
523 Ok(bytes)
524}
525
526pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
527 let header = FrameHeader {
528 op: 1,
529 t: "#info".to_string(),
530 };
531 let frame = InfoFrame {
532 name: name.to_string(),
533 message: message.map(String::from),
534 };
535 let mut bytes = Vec::with_capacity(128);
536 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
537 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
538 Ok(bytes)
539}
540
541pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> {
542 let header = ErrorFrameHeader { op: -1 };
543 let frame = ErrorFrameBody {
544 error: error.to_string(),
545 message: message.map(String::from),
546 };
547 let mut bytes = Vec::with_capacity(128);
548 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?;
549 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?;
550 Ok(bytes)
551}