this repo has no description
at main 18 kB view raw
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::sync::firehose::SequencedEvent; 4use crate::sync::frame::{ 5 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame, 6 InfoFrame, SyncFrame, 7}; 8use axum::response::{IntoResponse, Response}; 9use bytes::Bytes; 10use cid::Cid; 11use iroh_car::{CarHeader, CarWriter}; 12use jacquard_repo::commit::Commit; 13use jacquard_repo::storage::BlockStore; 14use serde::Serialize; 15use sqlx::PgPool; 16use std::collections::{BTreeMap, HashMap}; 17use std::io::Cursor; 18use std::str::FromStr; 19use tokio::io::AsyncWriteExt; 20 21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] 22#[serde(rename_all = "lowercase")] 23pub enum AccountStatus { 24 Active, 25 Takendown, 26 Suspended, 27 Deactivated, 28 Deleted, 29} 30 31impl AccountStatus { 32 pub fn as_str(&self) -> Option<&'static str> { 33 match self { 34 Self::Active => None, 35 Self::Takendown => Some("takendown"), 36 Self::Suspended => Some("suspended"), 37 Self::Deactivated => Some("deactivated"), 38 Self::Deleted => Some("deleted"), 39 } 40 } 41 42 pub fn is_active(&self) -> bool { 43 matches!(self, Self::Active) 44 } 45 46 pub fn is_takendown(&self) -> bool { 47 matches!(self, Self::Takendown) 48 } 49 50 pub fn is_suspended(&self) -> bool { 51 matches!(self, Self::Suspended) 52 } 53 54 pub fn is_deactivated(&self) -> bool { 55 matches!(self, Self::Deactivated) 56 } 57 58 pub fn is_deleted(&self) -> bool { 59 matches!(self, Self::Deleted) 60 } 61 62 pub fn allows_read(&self) -> bool { 63 matches!(self, Self::Active | Self::Deactivated) 64 } 65 66 pub fn allows_write(&self) -> bool { 67 matches!(self, Self::Active) 68 } 69 70 pub fn from_db_fields( 71 takedown_ref: Option<&str>, 72 deactivated_at: Option<chrono::DateTime<chrono::Utc>>, 73 ) -> Self { 74 if takedown_ref.is_some() { 75 Self::Takendown 76 } else if deactivated_at.is_some() { 77 Self::Deactivated 78 } else { 79 Self::Active 80 } 81 } 82} 83 84impl From<crate::types::AccountState> for AccountStatus { 85 fn from(state: crate::types::AccountState) -> Self { 86 match state { 87 crate::types::AccountState::Active => AccountStatus::Active, 88 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated, 89 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown, 90 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated, 91 } 92 } 93} 94 95impl From<&crate::types::AccountState> for AccountStatus { 96 fn from(state: &crate::types::AccountState) -> Self { 97 match state { 98 crate::types::AccountState::Active => AccountStatus::Active, 99 crate::types::AccountState::Deactivated { .. } => AccountStatus::Deactivated, 100 crate::types::AccountState::TakenDown { .. } => AccountStatus::Takendown, 101 crate::types::AccountState::Migrated { .. } => AccountStatus::Deactivated, 102 } 103 } 104} 105 106pub struct RepoAccount { 107 pub did: String, 108 pub user_id: uuid::Uuid, 109 pub status: AccountStatus, 110 pub repo_root_cid: Option<String>, 111} 112 113pub enum RepoAvailabilityError { 114 NotFound(String), 115 Takendown(String), 116 Deactivated(String), 117 Internal(String), 118} 119 120impl IntoResponse for RepoAvailabilityError { 121 fn into_response(self) -> Response { 122 match self { 123 RepoAvailabilityError::NotFound(did) => { 124 ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did))) 125 .into_response() 126 } 127 RepoAvailabilityError::Takendown(_) => ApiError::RepoTakendown.into_response(), 128 RepoAvailabilityError::Deactivated(_) => ApiError::RepoDeactivated.into_response(), 129 RepoAvailabilityError::Internal(msg) => { 130 ApiError::InternalError(Some(msg)).into_response() 131 } 132 } 133 } 134} 135 136pub async fn get_account_with_status( 137 db: &PgPool, 138 did: &str, 139) -> Result<Option<RepoAccount>, sqlx::Error> { 140 let row = sqlx::query!( 141 r#" 142 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid 143 FROM users u 144 LEFT JOIN repos r ON r.user_id = u.id 145 WHERE u.did = $1 146 "#, 147 did 148 ) 149 .fetch_optional(db) 150 .await?; 151 152 Ok(row.map(|r| { 153 let status = if r.takedown_ref.is_some() { 154 AccountStatus::Takendown 155 } else if r.deactivated_at.is_some() { 156 AccountStatus::Deactivated 157 } else { 158 AccountStatus::Active 159 }; 160 161 RepoAccount { 162 did: r.did, 163 user_id: r.id, 164 status, 165 repo_root_cid: Some(r.repo_root_cid), 166 } 167 })) 168} 169 170pub async fn assert_repo_availability( 171 db: &PgPool, 172 did: &str, 173 is_admin_or_self: bool, 174) -> Result<RepoAccount, RepoAvailabilityError> { 175 let account = get_account_with_status(db, did) 176 .await 177 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?; 178 179 let account = match account { 180 Some(a) => a, 181 None => return Err(RepoAvailabilityError::NotFound(did.to_string())), 182 }; 183 184 if is_admin_or_self { 185 return Ok(account); 186 } 187 188 match account.status { 189 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())), 190 AccountStatus::Deactivated => { 191 return Err(RepoAvailabilityError::Deactivated(did.to_string())); 192 } 193 _ => {} 194 } 195 196 Ok(account) 197} 198 199fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 200 Commit::from_cbor(commit_bytes) 201 .ok() 202 .map(|c| c.rev().to_string()) 203} 204 205async fn write_car_blocks( 206 commit_cid: Cid, 207 commit_bytes: Option<Bytes>, 208 other_blocks: BTreeMap<Cid, Bytes>, 209) -> Result<Vec<u8>, anyhow::Error> { 210 let mut buffer = Cursor::new(Vec::new()); 211 let header = CarHeader::new_v1(vec![commit_cid]); 212 let mut writer = CarWriter::new(header, &mut buffer); 213 for (cid, data) in other_blocks.iter().filter(|(c, _)| **c != commit_cid) { 214 writer 215 .write(*cid, data.as_ref()) 216 .await 217 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 218 } 219 if let Some(data) = commit_bytes { 220 writer 221 .write(commit_cid, data.as_ref()) 222 .await 223 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 224 } 225 writer 226 .finish() 227 .await 228 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 229 buffer 230 .flush() 231 .await 232 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 233 Ok(buffer.into_inner()) 234} 235 236fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 237 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 238} 239 240fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 241 let frame = IdentityFrame { 242 did: event.did.clone(), 243 handle: event.handle.clone(), 244 seq: event.seq, 245 time: format_atproto_time(event.created_at), 246 }; 247 let header = FrameHeader { 248 op: 1, 249 t: "#identity".to_string(), 250 }; 251 let mut bytes = Vec::with_capacity(256); 252 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 253 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 254 Ok(bytes) 255} 256 257fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 258 let frame = AccountFrame { 259 did: event.did.clone(), 260 active: event.active.unwrap_or(true), 261 status: event.status.clone(), 262 seq: event.seq, 263 time: format_atproto_time(event.created_at), 264 }; 265 let header = FrameHeader { 266 op: 1, 267 t: "#account".to_string(), 268 }; 269 let mut bytes = Vec::with_capacity(256); 270 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 271 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 272 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); 273 tracing::info!( 274 did = %frame.did, 275 active = frame.active, 276 status = ?frame.status, 277 cbor_len = bytes.len(), 278 cbor_hex = %hex_str, 279 "Sending account event to firehose" 280 ); 281 Ok(bytes) 282} 283 284async fn format_sync_event( 285 state: &AppState, 286 event: &SequencedEvent, 287) -> Result<Vec<u8>, anyhow::Error> { 288 let commit_cid_str = event 289 .commit_cid 290 .as_ref() 291 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 292 let commit_cid = Cid::from_str(commit_cid_str)?; 293 let commit_bytes = state 294 .block_store 295 .get(&commit_cid) 296 .await? 297 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 298 let rev = if let Some(ref stored_rev) = event.rev { 299 stored_rev.clone() 300 } else { 301 extract_rev_from_commit_bytes(&commit_bytes) 302 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 303 }; 304 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 305 let frame = SyncFrame { 306 did: event.did.clone(), 307 rev, 308 blocks: car_bytes, 309 seq: event.seq, 310 time: format_atproto_time(event.created_at), 311 }; 312 let header = FrameHeader { 313 op: 1, 314 t: "#sync".to_string(), 315 }; 316 let mut bytes = Vec::with_capacity(512); 317 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 318 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 319 Ok(bytes) 320} 321 322pub async fn format_event_for_sending( 323 state: &AppState, 324 event: SequencedEvent, 325) -> Result<Vec<u8>, anyhow::Error> { 326 match event.event_type.as_str() { 327 "identity" => return format_identity_event(&event), 328 "account" => return format_account_event(&event), 329 "sync" => return format_sync_event(state, &event).await, 330 _ => {} 331 } 332 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 333 let prev_cid_str = event.prev_cid.clone(); 334 let prev_data_cid_str = event.prev_data_cid.clone(); 335 let mut frame: CommitFrame = event 336 .try_into() 337 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 338 if let Some(ref pdc) = prev_data_cid_str 339 && let Ok(cid) = Cid::from_str(pdc) 340 { 341 frame.prev_data = Some(cid); 342 } 343 let commit_cid = frame.commit; 344 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 345 let mut all_cids: Vec<Cid> = block_cids_str 346 .iter() 347 .filter_map(|s| Cid::from_str(s).ok()) 348 .filter(|c| Some(*c) != prev_cid) 349 .collect(); 350 if !all_cids.contains(&commit_cid) { 351 all_cids.push(commit_cid); 352 } 353 if let Some(ref pc) = prev_cid 354 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 355 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 356 { 357 frame.since = Some(rev); 358 } 359 let car_bytes = if !all_cids.is_empty() { 360 let fetched = state.block_store.get_many(&all_cids).await?; 361 let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids 362 .iter() 363 .zip(fetched.iter()) 364 .filter_map(|(cid, data_opt)| data_opt.as_ref().map(|data| (*cid, data.clone()))) 365 .partition(|(cid, _)| *cid == commit_cid); 366 let commit_bytes = commit_data.into_iter().next().map(|(_, data)| data); 367 if let Some(ref cb) = commit_bytes 368 && let Some(rev) = extract_rev_from_commit_bytes(cb) 369 { 370 frame.rev = rev; 371 } 372 let blocks: std::collections::BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect(); 373 write_car_blocks(commit_cid, commit_bytes, blocks).await? 374 } else { 375 Vec::new() 376 }; 377 frame.blocks = car_bytes; 378 let header = FrameHeader { 379 op: 1, 380 t: "#commit".to_string(), 381 }; 382 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512); 383 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 384 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 385 Ok(bytes) 386} 387 388pub async fn prefetch_blocks_for_events( 389 state: &AppState, 390 events: &[SequencedEvent], 391) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 392 let mut all_cids: Vec<Cid> = events 393 .iter() 394 .flat_map(|event| { 395 let commit_cid = event 396 .commit_cid 397 .as_ref() 398 .and_then(|s| Cid::from_str(s).ok()); 399 let prev_cid = event.prev_cid.as_ref().and_then(|s| Cid::from_str(s).ok()); 400 let block_cids = event 401 .blocks_cids 402 .as_ref() 403 .map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect()) 404 .unwrap_or_else(Vec::new); 405 commit_cid.into_iter().chain(prev_cid).chain(block_cids) 406 }) 407 .collect(); 408 all_cids.sort(); 409 all_cids.dedup(); 410 if all_cids.is_empty() { 411 return Ok(HashMap::new()); 412 } 413 let fetched = state.block_store.get_many(&all_cids).await?; 414 let blocks_map: HashMap<Cid, Bytes> = all_cids 415 .into_iter() 416 .zip(fetched) 417 .filter_map(|(cid, data_opt)| data_opt.map(|data| (cid, data))) 418 .collect(); 419 Ok(blocks_map) 420} 421 422fn format_sync_event_with_prefetched( 423 event: &SequencedEvent, 424 prefetched: &HashMap<Cid, Bytes>, 425) -> Result<Vec<u8>, anyhow::Error> { 426 let commit_cid_str = event 427 .commit_cid 428 .as_ref() 429 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 430 let commit_cid = Cid::from_str(commit_cid_str)?; 431 let commit_bytes = prefetched 432 .get(&commit_cid) 433 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 434 let rev = if let Some(ref stored_rev) = event.rev { 435 stored_rev.clone() 436 } else { 437 extract_rev_from_commit_bytes(commit_bytes) 438 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 439 }; 440 let car_bytes = futures::executor::block_on(write_car_blocks( 441 commit_cid, 442 Some(commit_bytes.clone()), 443 BTreeMap::new(), 444 ))?; 445 let frame = SyncFrame { 446 did: event.did.clone(), 447 rev, 448 blocks: car_bytes, 449 seq: event.seq, 450 time: format_atproto_time(event.created_at), 451 }; 452 let header = FrameHeader { 453 op: 1, 454 t: "#sync".to_string(), 455 }; 456 let mut bytes = Vec::new(); 457 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 458 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 459 Ok(bytes) 460} 461 462pub async fn format_event_with_prefetched_blocks( 463 event: SequencedEvent, 464 prefetched: &HashMap<Cid, Bytes>, 465) -> Result<Vec<u8>, anyhow::Error> { 466 match event.event_type.as_str() { 467 "identity" => return format_identity_event(&event), 468 "account" => return format_account_event(&event), 469 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 470 _ => {} 471 } 472 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 473 let prev_cid_str = event.prev_cid.clone(); 474 let prev_data_cid_str = event.prev_data_cid.clone(); 475 let mut frame: CommitFrame = event 476 .try_into() 477 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 478 if let Some(ref pdc) = prev_data_cid_str 479 && let Ok(cid) = Cid::from_str(pdc) 480 { 481 frame.prev_data = Some(cid); 482 } 483 let commit_cid = frame.commit; 484 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 485 let mut all_cids: Vec<Cid> = block_cids_str 486 .iter() 487 .filter_map(|s| Cid::from_str(s).ok()) 488 .filter(|c| Some(*c) != prev_cid) 489 .collect(); 490 if !all_cids.contains(&commit_cid) { 491 all_cids.push(commit_cid); 492 } 493 if let Some(commit_bytes) = prefetched.get(&commit_cid) 494 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 495 { 496 frame.rev = rev; 497 } 498 if let Some(ref pc) = prev_cid 499 && let Some(prev_bytes) = prefetched.get(pc) 500 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 501 { 502 frame.since = Some(rev); 503 } 504 let car_bytes = if !all_cids.is_empty() { 505 let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids 506 .into_iter() 507 .filter_map(|cid| prefetched.get(&cid).map(|data| (cid, data.clone()))) 508 .partition(|(cid, _)| *cid == commit_cid); 509 let commit_bytes_for_car = commit_data.into_iter().next().map(|(_, data)| data); 510 let blocks: BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect(); 511 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 512 } else { 513 Vec::new() 514 }; 515 frame.blocks = car_bytes; 516 let header = FrameHeader { 517 op: 1, 518 t: "#commit".to_string(), 519 }; 520 let mut bytes = Vec::with_capacity(frame.blocks.len() + 512); 521 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 522 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 523 Ok(bytes) 524} 525 526pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 527 let header = FrameHeader { 528 op: 1, 529 t: "#info".to_string(), 530 }; 531 let frame = InfoFrame { 532 name: name.to_string(), 533 message: message.map(String::from), 534 }; 535 let mut bytes = Vec::with_capacity(128); 536 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 537 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 538 Ok(bytes) 539} 540 541pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 542 let header = ErrorFrameHeader { op: -1 }; 543 let frame = ErrorFrameBody { 544 error: error.to_string(), 545 message: message.map(String::from), 546 }; 547 let mut bytes = Vec::with_capacity(128); 548 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 549 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 550 Ok(bytes) 551}