this repo has no description
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::sync::firehose::SequencedEvent; 4use crate::sync::frame::{ 5 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame, 6 InfoFrame, SyncFrame, 7}; 8use axum::response::{IntoResponse, Response}; 9use bytes::Bytes; 10use cid::Cid; 11use iroh_car::{CarHeader, CarWriter}; 12use jacquard_repo::commit::Commit; 13use jacquard_repo::storage::BlockStore; 14use serde::Serialize; 15use sqlx::PgPool; 16use std::collections::{BTreeMap, HashMap}; 17use std::io::Cursor; 18use std::str::FromStr; 19use tokio::io::AsyncWriteExt; 20 21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] 22#[serde(rename_all = "lowercase")] 23pub enum AccountStatus { 24 Active, 25 Takendown, 26 Suspended, 27 Deactivated, 28 Deleted, 29} 30 31impl AccountStatus { 32 pub fn as_str(&self) -> Option<&'static str> { 33 match self { 34 Self::Active => None, 35 Self::Takendown => Some("takendown"), 36 Self::Suspended => Some("suspended"), 37 Self::Deactivated => Some("deactivated"), 38 Self::Deleted => Some("deleted"), 39 } 40 } 41 42 pub fn is_active(&self) -> bool { 43 matches!(self, Self::Active) 44 } 45 46 pub fn is_takendown(&self) -> bool { 47 matches!(self, Self::Takendown) 48 } 49 50 pub fn is_suspended(&self) -> bool { 51 matches!(self, Self::Suspended) 52 } 53 54 pub fn is_deactivated(&self) -> bool { 55 matches!(self, Self::Deactivated) 56 } 57 58 pub fn is_deleted(&self) -> bool { 59 matches!(self, Self::Deleted) 60 } 61 62 pub fn allows_read(&self) -> bool { 63 matches!(self, Self::Active | Self::Deactivated) 64 } 65 66 pub fn allows_write(&self) -> bool { 67 matches!(self, Self::Active) 68 } 69 70 pub fn from_db_fields( 71 takedown_ref: Option<&str>, 72 deactivated_at: Option<chrono::DateTime<chrono::Utc>>, 73 ) -> Self { 74 if takedown_ref.is_some() { 75 Self::Takendown 76 } else if deactivated_at.is_some() { 77 Self::Deactivated 78 } else { 79 Self::Active 80 } 81 } 82} 83 84impl From<crate::types::AccountState> for AccountStatus { 85 fn from(state: crate::types::AccountState) -> Self { 86 match state { 87 crate::types::AccountState::Active => AccountStatus::Active, 88 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated, 89 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown, 90 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated, 91 } 92 } 93} 94 95impl From<&crate::types::AccountState> for AccountStatus { 96 fn from(state: &crate::types::AccountState) -> Self { 97 match state { 98 crate::types::AccountState::Active => AccountStatus::Active, 99 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated, 100 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown, 101 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated, 102 } 103 } 104} 105 106pub struct RepoAccount { 107 pub did: String, 108 pub user_id: uuid::Uuid, 109 pub status: AccountStatus, 110 pub repo_root_cid: Option<String>, 111} 112 113pub enum RepoAvailabilityError { 114 NotFound(String), 115 Takendown(String), 116 Deactivated(String), 117 Internal(String), 118} 119 120impl IntoResponse for RepoAvailabilityError { 121 fn into_response(self) -> Response { 122 match self { 123 RepoAvailabilityError::NotFound(did) => { 124 ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did))) 125 .into_response() 126 } 127 RepoAvailabilityError::Takendown(_) => ApiError::RepoTakendown.into_response(), 128 RepoAvailabilityError::Deactivated(_) => ApiError::RepoDeactivated.into_response(), 129 RepoAvailabilityError::Internal(msg) => { 130 ApiError::InternalError(Some(msg)).into_response() 131 } 132 } 133 } 134} 135 136pub async fn get_account_with_status( 137 db: &PgPool, 138 did: &str, 139) -> Result<Option<RepoAccount>, sqlx::Error> { 140 let row = sqlx::query!( 141 r#" 142 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid 143 FROM users u 144 LEFT JOIN repos r ON r.user_id = u.id 145 WHERE u.did = $1 146 "#, 147 did 148 ) 149 .fetch_optional(db) 150 .await?; 151 152 Ok(row.map(|r| { 153 let status = if r.takedown_ref.is_some() { 154 AccountStatus::Takendown 155 } else if r.deactivated_at.is_some() { 156 AccountStatus::Deactivated 157 } else { 158 AccountStatus::Active 159 }; 160 161 RepoAccount { 162 did: r.did, 163 user_id: r.id, 164 status, 165 repo_root_cid: Some(r.repo_root_cid), 166 } 167 })) 168} 169 170pub async fn assert_repo_availability( 171 db: &PgPool, 172 did: &str, 173 is_admin_or_self: bool, 174) -> Result<RepoAccount, RepoAvailabilityError> { 175 let account = get_account_with_status(db, did) 176 .await 177 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?; 178 179 let account = match account { 180 Some(a) => a, 181 None => return Err(RepoAvailabilityError::NotFound(did.to_string())), 182 }; 183 184 if is_admin_or_self { 185 return Ok(account); 186 } 187 188 match account.status { 189 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())), 190 AccountStatus::Deactivated => { 191 return Err(RepoAvailabilityError::Deactivated(did.to_string())); 192 } 193 _ => {} 194 } 195 196 Ok(account) 197} 198 199fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 200 Commit::from_cbor(commit_bytes) 201 .ok() 202 .map(|c| c.rev().to_string()) 203} 204 205async fn write_car_blocks( 206 commit_cid: Cid, 207 commit_bytes: Option<Bytes>, 208 other_blocks: BTreeMap<Cid, Bytes>, 209) -> Result<Vec<u8>, anyhow::Error> { 210 let mut buffer = Cursor::new(Vec::new()); 211 let header = CarHeader::new_v1(vec![commit_cid]); 212 let mut writer = CarWriter::new(header, &mut buffer); 213 for (cid, data) in other_blocks { 214 if cid != commit_cid { 215 writer 216 .write(cid, data.as_ref()) 217 .await 218 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 219 } 220 } 221 if let Some(data) = commit_bytes { 222 writer 223 .write(commit_cid, data.as_ref()) 224 .await 225 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 226 } 227 writer 228 .finish() 229 .await 230 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 231 buffer 232 .flush() 233 .await 234 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 235 Ok(buffer.into_inner()) 236} 237 238fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 239 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 240} 241 242fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 243 let frame = IdentityFrame { 244 did: event.did.clone(), 245 handle: event.handle.clone(), 246 seq: event.seq, 247 time: format_atproto_time(event.created_at), 248 }; 249 let header = FrameHeader { 250 op: 1, 251 t: "#identity".to_string(), 252 }; 253 let mut bytes = Vec::with_capacity(256); 254 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 255 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 256 Ok(bytes) 257} 258 259fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 260 let frame = AccountFrame { 261 did: event.did.clone(), 262 active: event.active.unwrap_or(true), 263 status: event.status.clone(), 264 seq: event.seq, 265 time: format_atproto_time(event.created_at), 266 }; 267 let header = FrameHeader { 268 op: 1, 269 t: "#account".to_string(), 270 }; 271 let mut bytes = Vec::with_capacity(256); 272 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 273 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 274 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); 275 tracing::info!( 276 did = %frame.did, 277 active = frame.active, 278 status = ?frame.status, 279 cbor_len = bytes.len(), 280 cbor_hex = %hex_str, 281 "Sending account event to firehose" 282 ); 283 Ok(bytes) 284} 285 286async fn format_sync_event( 287 state: &AppState, 288 event: &SequencedEvent, 289) -> Result<Vec<u8>, anyhow::Error> { 290 let commit_cid_str = event 291 .commit_cid 292 .as_ref() 293 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 294 let commit_cid = Cid::from_str(commit_cid_str)?; 295 let commit_bytes = state 296 .block_store 297 .get(&commit_cid) 298 .await? 299 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 300 let rev = if let Some(ref stored_rev) = event.rev { 301 stored_rev.clone() 302 } else { 303 extract_rev_from_commit_bytes(&commit_bytes) 304 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 305 }; 306 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 307 let frame = SyncFrame { 308 did: event.did.clone(), 309 rev, 310 blocks: car_bytes, 311 seq: event.seq, 312 time: format_atproto_time(event.created_at), 313 }; 314 let header = FrameHeader { 315 op: 1, 316 t: "#sync".to_string(), 317 }; 318 let mut bytes = Vec::with_capacity(512); 319 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 320 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 321 Ok(bytes) 322} 323 324pub async fn format_event_for_sending( 325 state: &AppState, 326 event: SequencedEvent, 327) -> Result<Vec<u8>, anyhow::Error> { 328 match event.event_type.as_str() { 329 "identity" => return format_identity_event(&event), 330 "account" => return format_account_event(&event), 331 "sync" => return format_sync_event(state, &event).await, 332 _ => {} 333 } 334 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 335 let prev_cid_str = event.prev_cid.clone(); 336 let prev_data_cid_str = event.prev_data_cid.clone(); 337 let mut frame: CommitFrame = event 338 .try_into() 339 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 340 if let Some(ref pdc) = prev_data_cid_str 341 && let Ok(cid) = Cid::from_str(pdc) 342 { 343 frame.prev_data = Some(cid); 344 } 345 let commit_cid = frame.commit; 346 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 347 let mut all_cids: Vec<Cid> = block_cids_str 348 .iter() 349 .filter_map(|s| Cid::from_str(s).ok()) 350 .filter(|c| Some(*c) != prev_cid) 351 .collect(); 352 if !all_cids.contains(&commit_cid) { 353 all_cids.push(commit_cid); 354 } 355 if let Some(ref pc) = prev_cid 356 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 357 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 358 { 359 frame.since = Some(rev); 360 } 361 let car_bytes = if !all_cids.is_empty() { 362 let fetched = state.block_store.get_many(&all_cids).await?; 363 let mut blocks = std::collections::BTreeMap::new(); 364 let mut commit_bytes: Option<Bytes> = None; 365 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 366 if let Some(data) = data_opt { 367 if *cid == commit_cid { 368 commit_bytes = Some(data.clone()); 369 if let Some(rev) = extract_rev_from_commit_bytes(data) { 370 frame.rev = rev; 371 } 372 } else { 373 blocks.insert(*cid, data.clone()); 374 } 375 } 376 } 377 write_car_blocks(commit_cid, commit_bytes, blocks).await? 378 } else { 379 Vec::new() 380 }; 381 frame.blocks = car_bytes; 382 let header = FrameHeader { 383 op: 1, 384 t: "#commit".to_string(), 385 }; 386 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512); 387 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 388 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 389 Ok(bytes) 390} 391 392pub async fn prefetch_blocks_for_events( 393 state: &AppState, 394 events: &[SequencedEvent], 395) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 396 let mut all_cids: Vec<Cid> = Vec::new(); 397 for event in events { 398 if let Some(ref commit_cid_str) = event.commit_cid 399 && let Ok(cid) = Cid::from_str(commit_cid_str) 400 { 401 all_cids.push(cid); 402 } 403 if let Some(ref prev_cid_str) = event.prev_cid 404 && let Ok(cid) = Cid::from_str(prev_cid_str) 405 { 406 all_cids.push(cid); 407 } 408 if let Some(ref block_cids_str) = event.blocks_cids { 409 for s in block_cids_str { 410 if let Ok(cid) = Cid::from_str(s) { 411 all_cids.push(cid); 412 } 413 } 414 } 415 } 416 all_cids.sort(); 417 all_cids.dedup(); 418 if all_cids.is_empty() { 419 return Ok(HashMap::new()); 420 } 421 let fetched = state.block_store.get_many(&all_cids).await?; 422 let mut blocks_map = HashMap::with_capacity(all_cids.len()); 423 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 424 if let Some(data) = data_opt { 425 blocks_map.insert(cid, data); 426 } 427 } 428 Ok(blocks_map) 429} 430 431fn format_sync_event_with_prefetched( 432 event: &SequencedEvent, 433 prefetched: &HashMap<Cid, Bytes>, 434) -> Result<Vec<u8>, anyhow::Error> { 435 let commit_cid_str = event 436 .commit_cid 437 .as_ref() 438 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 439 let commit_cid = Cid::from_str(commit_cid_str)?; 440 let commit_bytes = prefetched 441 .get(&commit_cid) 442 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 443 let rev = if let Some(ref stored_rev) = event.rev { 444 stored_rev.clone() 445 } else { 446 extract_rev_from_commit_bytes(commit_bytes) 447 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 448 }; 449 let car_bytes = futures::executor::block_on(write_car_blocks( 450 commit_cid, 451 Some(commit_bytes.clone()), 452 BTreeMap::new(), 453 ))?; 454 let frame = SyncFrame { 455 did: event.did.clone(), 456 rev, 457 blocks: car_bytes, 458 seq: event.seq, 459 time: format_atproto_time(event.created_at), 460 }; 461 let header = FrameHeader { 462 op: 1, 463 t: "#sync".to_string(), 464 }; 465 let mut bytes = Vec::new(); 466 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 467 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 468 Ok(bytes) 469} 470 471pub async fn format_event_with_prefetched_blocks( 472 event: SequencedEvent, 473 prefetched: &HashMap<Cid, Bytes>, 474) -> Result<Vec<u8>, anyhow::Error> { 475 match event.event_type.as_str() { 476 "identity" => return format_identity_event(&event), 477 "account" => return format_account_event(&event), 478 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 479 _ => {} 480 } 481 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 482 let prev_cid_str = event.prev_cid.clone(); 483 let prev_data_cid_str = event.prev_data_cid.clone(); 484 let mut frame: CommitFrame = event 485 .try_into() 486 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 487 if let Some(ref pdc) = prev_data_cid_str 488 && let Ok(cid) = Cid::from_str(pdc) 489 { 490 frame.prev_data = Some(cid); 491 } 492 let commit_cid = frame.commit; 493 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 494 let mut all_cids: Vec<Cid> = block_cids_str 495 .iter() 496 .filter_map(|s| Cid::from_str(s).ok()) 497 .filter(|c| Some(*c) != prev_cid) 498 .collect(); 499 if !all_cids.contains(&commit_cid) { 500 all_cids.push(commit_cid); 501 } 502 if let Some(commit_bytes) = prefetched.get(&commit_cid) 503 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 504 { 505 frame.rev = rev; 506 } 507 if let Some(ref pc) = prev_cid 508 && let Some(prev_bytes) = prefetched.get(pc) 509 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 510 { 511 frame.since = Some(rev); 512 } 513 let car_bytes = if !all_cids.is_empty() { 514 let mut blocks = BTreeMap::new(); 515 let mut commit_bytes_for_car: Option<Bytes> = None; 516 for cid in all_cids { 517 if let Some(data) = prefetched.get(&cid) { 518 if cid == commit_cid { 519 commit_bytes_for_car = Some(data.clone()); 520 } else { 521 blocks.insert(cid, data.clone()); 522 } 523 } 524 } 525 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 526 } else { 527 Vec::new() 528 }; 529 frame.blocks = car_bytes; 530 let header = FrameHeader { 531 op: 1, 532 t: "#commit".to_string(), 533 }; 534 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512); 535 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 536 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 537 Ok(bytes) 538} 539 540pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 541 let header = FrameHeader { 542 op: 1, 543 t: "#info".to_string(), 544 }; 545 let frame = InfoFrame { 546 name: name.to_string(), 547 message: message.map(String::from), 548 }; 549 let mut bytes = Vec::with_capacity(128); 550 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 551 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 552 Ok(bytes) 553} 554 555pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 556 let header = ErrorFrameHeader { op: -1 }; 557 let frame = ErrorFrameBody { 558 error: error.to_string(), 559 message: message.map(String::from), 560 }; 561 let mut bytes = Vec::with_capacity(128); 562 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 563 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 564 Ok(bytes) 565}