this repo has no description
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::sync::firehose::SequencedEvent; 4use crate::sync::frame::{ 5 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame, 6 InfoFrame, SyncFrame, 7}; 8use axum::response::{IntoResponse, Response}; 9use bytes::Bytes; 10use cid::Cid; 11use iroh_car::{CarHeader, CarWriter}; 12use jacquard_repo::commit::Commit; 13use jacquard_repo::storage::BlockStore; 14use serde::Serialize; 15use sqlx::PgPool; 16use std::collections::{BTreeMap, HashMap}; 17use std::io::Cursor; 18use std::str::FromStr; 19use tokio::io::AsyncWriteExt; 20 21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] 22#[serde(rename_all = "lowercase")] 23pub enum AccountStatus { 24 Active, 25 Takendown, 26 Suspended, 27 Deactivated, 28 Deleted, 29} 30 31impl AccountStatus { 32 pub fn as_str(&self) -> Option<&'static str> { 33 match self { 34 Self::Active => None, 35 Self::Takendown => Some("takendown"), 36 Self::Suspended => Some("suspended"), 37 Self::Deactivated => Some("deactivated"), 38 Self::Deleted => Some("deleted"), 39 } 40 } 41 42 pub fn is_active(&self) -> bool { 43 matches!(self, Self::Active) 44 } 45 46 pub fn is_takendown(&self) -> bool { 47 matches!(self, Self::Takendown) 48 } 49 50 pub fn is_suspended(&self) -> bool { 51 matches!(self, Self::Suspended) 52 } 53 54 pub fn is_deactivated(&self) -> bool { 55 matches!(self, Self::Deactivated) 56 } 57 58 pub fn is_deleted(&self) -> bool { 59 matches!(self, Self::Deleted) 60 } 61 62 pub fn allows_read(&self) -> bool { 63 matches!(self, Self::Active | Self::Deactivated) 64 } 65 66 pub fn allows_write(&self) -> bool { 67 matches!(self, Self::Active) 68 } 69 70 pub fn from_db_fields(takedown_ref: Option<&str>, deactivated_at: Option<chrono::DateTime<chrono::Utc>>) -> Self { 71 if takedown_ref.is_some() { 72 Self::Takendown 73 } else if deactivated_at.is_some() { 74 Self::Deactivated 75 } else { 76 Self::Active 77 } 78 } 79} 80 81impl From<crate::types::AccountState> for AccountStatus { 82 fn from(state: crate::types::AccountState) -> Self { 83 match state { 84 crate::types::AccountState::Active => AccountStatus::Active, 85 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated, 86 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown, 87 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated, 88 } 89 } 90} 91 92impl From<&crate::types::AccountState> for AccountStatus { 93 fn from(state: &crate::types::AccountState) -> Self { 94 match state { 95 crate::types::AccountState::Active => AccountStatus::Active, 96 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated, 97 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown, 98 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated, 99 } 100 } 101} 102 103pub struct RepoAccount { 104 pub did: String, 105 pub user_id: uuid::Uuid, 106 pub status: AccountStatus, 107 pub repo_root_cid: Option<String>, 108} 109 110pub enum RepoAvailabilityError { 111 NotFound(String), 112 Takendown(String), 113 Deactivated(String), 114 Internal(String), 115} 116 117impl IntoResponse for RepoAvailabilityError { 118 fn into_response(self) -> Response { 119 match self { 120 RepoAvailabilityError::NotFound(did) => { 121 ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did))) 122 .into_response() 123 } 124 RepoAvailabilityError::Takendown(_) => ApiError::RepoTakendown.into_response(), 125 RepoAvailabilityError::Deactivated(_) => ApiError::RepoDeactivated.into_response(), 126 RepoAvailabilityError::Internal(msg) => { 127 ApiError::InternalError(Some(msg)).into_response() 128 } 129 } 130 } 131} 132 133pub async fn get_account_with_status( 134 db: &PgPool, 135 did: &str, 136) -> Result<Option<RepoAccount>, sqlx::Error> { 137 let row = sqlx::query!( 138 r#" 139 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid 140 FROM users u 141 LEFT JOIN repos r ON r.user_id = u.id 142 WHERE u.did = $1 143 "#, 144 did 145 ) 146 .fetch_optional(db) 147 .await?; 148 149 Ok(row.map(|r| { 150 let status = if r.takedown_ref.is_some() { 151 AccountStatus::Takendown 152 } else if r.deactivated_at.is_some() { 153 AccountStatus::Deactivated 154 } else { 155 AccountStatus::Active 156 }; 157 158 RepoAccount { 159 did: r.did, 160 user_id: r.id, 161 status, 162 repo_root_cid: Some(r.repo_root_cid), 163 } 164 })) 165} 166 167pub async fn assert_repo_availability( 168 db: &PgPool, 169 did: &str, 170 is_admin_or_self: bool, 171) -> Result<RepoAccount, RepoAvailabilityError> { 172 let account = get_account_with_status(db, did) 173 .await 174 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?; 175 176 let account = match account { 177 Some(a) => a, 178 None => return Err(RepoAvailabilityError::NotFound(did.to_string())), 179 }; 180 181 if is_admin_or_self { 182 return Ok(account); 183 } 184 185 match account.status { 186 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())), 187 AccountStatus::Deactivated => { 188 return Err(RepoAvailabilityError::Deactivated(did.to_string())); 189 } 190 _ => {} 191 } 192 193 Ok(account) 194} 195 196fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 197 Commit::from_cbor(commit_bytes) 198 .ok() 199 .map(|c| c.rev().to_string()) 200} 201 202async fn write_car_blocks( 203 commit_cid: Cid, 204 commit_bytes: Option<Bytes>, 205 other_blocks: BTreeMap<Cid, Bytes>, 206) -> Result<Vec<u8>, anyhow::Error> { 207 let mut buffer = Cursor::new(Vec::new()); 208 let header = CarHeader::new_v1(vec![commit_cid]); 209 let mut writer = CarWriter::new(header, &mut buffer); 210 for (cid, data) in other_blocks { 211 if cid != commit_cid { 212 writer 213 .write(cid, data.as_ref()) 214 .await 215 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 216 } 217 } 218 if let Some(data) = commit_bytes { 219 writer 220 .write(commit_cid, data.as_ref()) 221 .await 222 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 223 } 224 writer 225 .finish() 226 .await 227 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 228 buffer 229 .flush() 230 .await 231 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 232 Ok(buffer.into_inner()) 233} 234 235fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 236 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 237} 238 239fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 240 let frame = IdentityFrame { 241 did: event.did.clone(), 242 handle: event.handle.clone(), 243 seq: event.seq, 244 time: format_atproto_time(event.created_at), 245 }; 246 let header = FrameHeader { 247 op: 1, 248 t: "#identity".to_string(), 249 }; 250 let mut bytes = Vec::with_capacity(256); 251 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 252 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 253 Ok(bytes) 254} 255 256fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 257 let frame = AccountFrame { 258 did: event.did.clone(), 259 active: event.active.unwrap_or(true), 260 status: event.status.clone(), 261 seq: event.seq, 262 time: format_atproto_time(event.created_at), 263 }; 264 let header = FrameHeader { 265 op: 1, 266 t: "#account".to_string(), 267 }; 268 let mut bytes = Vec::with_capacity(256); 269 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 270 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 271 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); 272 tracing::info!( 273 did = %frame.did, 274 active = frame.active, 275 status = ?frame.status, 276 cbor_len = bytes.len(), 277 cbor_hex = %hex_str, 278 "Sending account event to firehose" 279 ); 280 Ok(bytes) 281} 282 283async fn format_sync_event( 284 state: &AppState, 285 event: &SequencedEvent, 286) -> Result<Vec<u8>, anyhow::Error> { 287 let commit_cid_str = event 288 .commit_cid 289 .as_ref() 290 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 291 let commit_cid = Cid::from_str(commit_cid_str)?; 292 let commit_bytes = state 293 .block_store 294 .get(&commit_cid) 295 .await? 296 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 297 let rev = if let Some(ref stored_rev) = event.rev { 298 stored_rev.clone() 299 } else { 300 extract_rev_from_commit_bytes(&commit_bytes) 301 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 302 }; 303 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 304 let frame = SyncFrame { 305 did: event.did.clone(), 306 rev, 307 blocks: car_bytes, 308 seq: event.seq, 309 time: format_atproto_time(event.created_at), 310 }; 311 let header = FrameHeader { 312 op: 1, 313 t: "#sync".to_string(), 314 }; 315 let mut bytes = Vec::with_capacity(512); 316 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 317 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 318 Ok(bytes) 319} 320 321pub async fn format_event_for_sending( 322 state: &AppState, 323 event: SequencedEvent, 324) -> Result<Vec<u8>, anyhow::Error> { 325 match event.event_type.as_str() { 326 "identity" => return format_identity_event(&event), 327 "account" => return format_account_event(&event), 328 "sync" => return format_sync_event(state, &event).await, 329 _ => {} 330 } 331 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 332 let prev_cid_str = event.prev_cid.clone(); 333 let prev_data_cid_str = event.prev_data_cid.clone(); 334 let mut frame: CommitFrame = event 335 .try_into() 336 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 337 if let Some(ref pdc) = prev_data_cid_str 338 && let Ok(cid) = Cid::from_str(pdc) 339 { 340 frame.prev_data = Some(cid); 341 } 342 let commit_cid = frame.commit; 343 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 344 let mut all_cids: Vec<Cid> = block_cids_str 345 .iter() 346 .filter_map(|s| Cid::from_str(s).ok()) 347 .filter(|c| Some(*c) != prev_cid) 348 .collect(); 349 if !all_cids.contains(&commit_cid) { 350 all_cids.push(commit_cid); 351 } 352 if let Some(ref pc) = prev_cid 353 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 354 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 355 { 356 frame.since = Some(rev); 357 } 358 let car_bytes = if !all_cids.is_empty() { 359 let fetched = state.block_store.get_many(&all_cids).await?; 360 let mut blocks = std::collections::BTreeMap::new(); 361 let mut commit_bytes: Option<Bytes> = None; 362 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 363 if let Some(data) = data_opt { 364 if *cid == commit_cid { 365 commit_bytes = Some(data.clone()); 366 if let Some(rev) = extract_rev_from_commit_bytes(data) { 367 frame.rev = rev; 368 } 369 } else { 370 blocks.insert(*cid, data.clone()); 371 } 372 } 373 } 374 write_car_blocks(commit_cid, commit_bytes, blocks).await? 375 } else { 376 Vec::new() 377 }; 378 frame.blocks = car_bytes; 379 let header = FrameHeader { 380 op: 1, 381 t: "#commit".to_string(), 382 }; 383 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512); 384 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 385 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 386 Ok(bytes) 387} 388 389pub async fn prefetch_blocks_for_events( 390 state: &AppState, 391 events: &[SequencedEvent], 392) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 393 let mut all_cids: Vec<Cid> = Vec::new(); 394 for event in events { 395 if let Some(ref commit_cid_str) = event.commit_cid 396 && let Ok(cid) = Cid::from_str(commit_cid_str) 397 { 398 all_cids.push(cid); 399 } 400 if let Some(ref prev_cid_str) = event.prev_cid 401 && let Ok(cid) = Cid::from_str(prev_cid_str) 402 { 403 all_cids.push(cid); 404 } 405 if let Some(ref block_cids_str) = event.blocks_cids { 406 for s in block_cids_str { 407 if let Ok(cid) = Cid::from_str(s) { 408 all_cids.push(cid); 409 } 410 } 411 } 412 } 413 all_cids.sort(); 414 all_cids.dedup(); 415 if all_cids.is_empty() { 416 return Ok(HashMap::new()); 417 } 418 let fetched = state.block_store.get_many(&all_cids).await?; 419 let mut blocks_map = HashMap::with_capacity(all_cids.len()); 420 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 421 if let Some(data) = data_opt { 422 blocks_map.insert(cid, data); 423 } 424 } 425 Ok(blocks_map) 426} 427 428fn format_sync_event_with_prefetched( 429 event: &SequencedEvent, 430 prefetched: &HashMap<Cid, Bytes>, 431) -> Result<Vec<u8>, anyhow::Error> { 432 let commit_cid_str = event 433 .commit_cid 434 .as_ref() 435 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 436 let commit_cid = Cid::from_str(commit_cid_str)?; 437 let commit_bytes = prefetched 438 .get(&commit_cid) 439 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 440 let rev = if let Some(ref stored_rev) = event.rev { 441 stored_rev.clone() 442 } else { 443 extract_rev_from_commit_bytes(commit_bytes) 444 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 445 }; 446 let car_bytes = futures::executor::block_on(write_car_blocks( 447 commit_cid, 448 Some(commit_bytes.clone()), 449 BTreeMap::new(), 450 ))?; 451 let frame = SyncFrame { 452 did: event.did.clone(), 453 rev, 454 blocks: car_bytes, 455 seq: event.seq, 456 time: format_atproto_time(event.created_at), 457 }; 458 let header = FrameHeader { 459 op: 1, 460 t: "#sync".to_string(), 461 }; 462 let mut bytes = Vec::new(); 463 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 464 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 465 Ok(bytes) 466} 467 468pub async fn format_event_with_prefetched_blocks( 469 event: SequencedEvent, 470 prefetched: &HashMap<Cid, Bytes>, 471) -> Result<Vec<u8>, anyhow::Error> { 472 match event.event_type.as_str() { 473 "identity" => return format_identity_event(&event), 474 "account" => return format_account_event(&event), 475 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 476 _ => {} 477 } 478 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 479 let prev_cid_str = event.prev_cid.clone(); 480 let prev_data_cid_str = event.prev_data_cid.clone(); 481 let mut frame: CommitFrame = event 482 .try_into() 483 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 484 if let Some(ref pdc) = prev_data_cid_str 485 && let Ok(cid) = Cid::from_str(pdc) 486 { 487 frame.prev_data = Some(cid); 488 } 489 let commit_cid = frame.commit; 490 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 491 let mut all_cids: Vec<Cid> = block_cids_str 492 .iter() 493 .filter_map(|s| Cid::from_str(s).ok()) 494 .filter(|c| Some(*c) != prev_cid) 495 .collect(); 496 if !all_cids.contains(&commit_cid) { 497 all_cids.push(commit_cid); 498 } 499 if let Some(commit_bytes) = prefetched.get(&commit_cid) 500 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 501 { 502 frame.rev = rev; 503 } 504 if let Some(ref pc) = prev_cid 505 && let Some(prev_bytes) = prefetched.get(pc) 506 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 507 { 508 frame.since = Some(rev); 509 } 510 let car_bytes = if !all_cids.is_empty() { 511 let mut blocks = BTreeMap::new(); 512 let mut commit_bytes_for_car: Option<Bytes> = None; 513 for cid in all_cids { 514 if let Some(data) = prefetched.get(&cid) { 515 if cid == commit_cid { 516 commit_bytes_for_car = Some(data.clone()); 517 } else { 518 blocks.insert(cid, data.clone()); 519 } 520 } 521 } 522 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 523 } else { 524 Vec::new() 525 }; 526 frame.blocks = car_bytes; 527 let header = FrameHeader { 528 op: 1, 529 t: "#commit".to_string(), 530 }; 531 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512); 532 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 533 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 534 Ok(bytes) 535} 536 537pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 538 let header = FrameHeader { 539 op: 1, 540 t: "#info".to_string(), 541 }; 542 let frame = InfoFrame { 543 name: name.to_string(), 544 message: message.map(String::from), 545 }; 546 let mut bytes = Vec::with_capacity(128); 547 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 548 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 549 Ok(bytes) 550} 551 552pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 553 let header = ErrorFrameHeader { op: -1 }; 554 let frame = ErrorFrameBody { 555 error: error.to_string(), 556 message: message.map(String::from), 557 }; 558 let mut bytes = Vec::with_capacity(128); 559 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 560 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 561 Ok(bytes) 562}