this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::frame::{ 4 AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame, 5 InfoFrame, SyncFrame, 6}; 7use axum::Json; 8use axum::http::StatusCode; 9use axum::response::{IntoResponse, Response}; 10use bytes::Bytes; 11use cid::Cid; 12use iroh_car::{CarHeader, CarWriter}; 13use jacquard_repo::commit::Commit; 14use jacquard_repo::storage::BlockStore; 15use serde::Serialize; 16use serde_json::json; 17use sqlx::PgPool; 18use std::collections::{BTreeMap, HashMap}; 19use std::io::Cursor; 20use std::str::FromStr; 21use tokio::io::AsyncWriteExt; 22 23#[derive(Debug, Clone, PartialEq, Eq, Serialize)] 24#[serde(rename_all = "lowercase")] 25pub enum AccountStatus { 26 Active, 27 Takendown, 28 Suspended, 29 Deactivated, 30 Deleted, 31} 32 33impl AccountStatus { 34 pub fn as_str(&self) -> Option<&'static str> { 35 match self { 36 AccountStatus::Active => None, 37 AccountStatus::Takendown => Some("takendown"), 38 AccountStatus::Suspended => Some("suspended"), 39 AccountStatus::Deactivated => Some("deactivated"), 40 AccountStatus::Deleted => Some("deleted"), 41 } 42 } 43 44 pub fn is_active(&self) -> bool { 45 matches!(self, AccountStatus::Active) 46 } 47} 48 49pub struct RepoAccount { 50 pub did: String, 51 pub user_id: uuid::Uuid, 52 pub status: AccountStatus, 53 pub repo_root_cid: Option<String>, 54} 55 56pub enum RepoAvailabilityError { 57 NotFound(String), 58 Takendown(String), 59 Deactivated(String), 60 Internal(String), 61} 62 63impl IntoResponse for RepoAvailabilityError { 64 fn into_response(self) -> Response { 65 match self { 66 RepoAvailabilityError::NotFound(did) => ( 67 StatusCode::BAD_REQUEST, 68 Json(json!({ 69 "error": "RepoNotFound", 70 "message": format!("Could not find repo for DID: {}", did) 71 })), 72 ) 73 .into_response(), 74 RepoAvailabilityError::Takendown(did) => ( 75 StatusCode::BAD_REQUEST, 76 Json(json!({ 77 "error": "RepoTakendown", 78 "message": format!("Repo has been takendown: {}", did) 79 })), 80 ) 81 .into_response(), 82 RepoAvailabilityError::Deactivated(did) => ( 83 StatusCode::BAD_REQUEST, 84 Json(json!({ 85 "error": "RepoDeactivated", 86 "message": format!("Repo has been deactivated: {}", did) 87 })), 88 ) 89 .into_response(), 90 RepoAvailabilityError::Internal(msg) => ( 91 StatusCode::INTERNAL_SERVER_ERROR, 92 Json(json!({ 93 "error": "InternalError", 94 "message": msg 95 })), 96 ) 97 .into_response(), 98 } 99 } 100} 101 102pub async fn get_account_with_status( 103 db: &PgPool, 104 did: &str, 105) -> Result<Option<RepoAccount>, sqlx::Error> { 106 let row = sqlx::query!( 107 r#" 108 SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid 109 FROM users u 110 LEFT JOIN repos r ON r.user_id = u.id 111 WHERE u.did = $1 112 "#, 113 did 114 ) 115 .fetch_optional(db) 116 .await?; 117 118 Ok(row.map(|r| { 119 let status = if r.takedown_ref.is_some() { 120 AccountStatus::Takendown 121 } else if r.deactivated_at.is_some() { 122 AccountStatus::Deactivated 123 } else { 124 AccountStatus::Active 125 }; 126 127 RepoAccount { 128 did: r.did, 129 user_id: r.id, 130 status, 131 repo_root_cid: Some(r.repo_root_cid), 132 } 133 })) 134} 135 136pub async fn assert_repo_availability( 137 db: &PgPool, 138 did: &str, 139 is_admin_or_self: bool, 140) -> Result<RepoAccount, RepoAvailabilityError> { 141 let account = get_account_with_status(db, did) 142 .await 143 .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?; 144 145 let account = match account { 146 Some(a) => a, 147 None => return Err(RepoAvailabilityError::NotFound(did.to_string())), 148 }; 149 150 if is_admin_or_self { 151 return Ok(account); 152 } 153 154 match account.status { 155 AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())), 156 AccountStatus::Deactivated => { 157 return Err(RepoAvailabilityError::Deactivated(did.to_string())); 158 } 159 _ => {} 160 } 161 162 Ok(account) 163} 164 165fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 166 Commit::from_cbor(commit_bytes) 167 .ok() 168 .map(|c| c.rev().to_string()) 169} 170 171async fn write_car_blocks( 172 commit_cid: Cid, 173 commit_bytes: Option<Bytes>, 174 other_blocks: BTreeMap<Cid, Bytes>, 175) -> Result<Vec<u8>, anyhow::Error> { 176 let mut buffer = Cursor::new(Vec::new()); 177 let header = CarHeader::new_v1(vec![commit_cid]); 178 let mut writer = CarWriter::new(header, &mut buffer); 179 for (cid, data) in other_blocks { 180 if cid != commit_cid { 181 writer 182 .write(cid, data.as_ref()) 183 .await 184 .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 185 } 186 } 187 if let Some(data) = commit_bytes { 188 writer 189 .write(commit_cid, data.as_ref()) 190 .await 191 .map_err(|e| anyhow::anyhow!("writing commit block: {}", e))?; 192 } 193 writer 194 .finish() 195 .await 196 .map_err(|e| anyhow::anyhow!("finalizing CAR: {}", e))?; 197 buffer 198 .flush() 199 .await 200 .map_err(|e| anyhow::anyhow!("flushing CAR buffer: {}", e))?; 201 Ok(buffer.into_inner()) 202} 203 204fn format_atproto_time(dt: chrono::DateTime<chrono::Utc>) -> String { 205 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() 206} 207 208fn format_identity_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 209 let frame = IdentityFrame { 210 did: event.did.clone(), 211 handle: event.handle.clone(), 212 seq: event.seq, 213 time: format_atproto_time(event.created_at), 214 }; 215 let header = FrameHeader { 216 op: 1, 217 t: "#identity".to_string(), 218 }; 219 let mut bytes = Vec::new(); 220 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 221 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 222 Ok(bytes) 223} 224 225fn format_account_event(event: &SequencedEvent) -> Result<Vec<u8>, anyhow::Error> { 226 let frame = AccountFrame { 227 did: event.did.clone(), 228 active: event.active.unwrap_or(true), 229 status: event.status.clone(), 230 seq: event.seq, 231 time: format_atproto_time(event.created_at), 232 }; 233 let header = FrameHeader { 234 op: 1, 235 t: "#account".to_string(), 236 }; 237 let mut bytes = Vec::new(); 238 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 239 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 240 let hex_str: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); 241 tracing::info!( 242 did = %frame.did, 243 active = frame.active, 244 status = ?frame.status, 245 cbor_len = bytes.len(), 246 cbor_hex = %hex_str, 247 "Sending account event to firehose" 248 ); 249 Ok(bytes) 250} 251 252async fn format_sync_event( 253 state: &AppState, 254 event: &SequencedEvent, 255) -> Result<Vec<u8>, anyhow::Error> { 256 let commit_cid_str = event 257 .commit_cid 258 .as_ref() 259 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 260 let commit_cid = Cid::from_str(commit_cid_str)?; 261 let commit_bytes = state 262 .block_store 263 .get(&commit_cid) 264 .await? 265 .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; 266 let rev = if let Some(ref stored_rev) = event.rev { 267 stored_rev.clone() 268 } else { 269 extract_rev_from_commit_bytes(&commit_bytes) 270 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 271 }; 272 let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; 273 let frame = SyncFrame { 274 did: event.did.clone(), 275 rev, 276 blocks: car_bytes, 277 seq: event.seq, 278 time: format_atproto_time(event.created_at), 279 }; 280 let header = FrameHeader { 281 op: 1, 282 t: "#sync".to_string(), 283 }; 284 let mut bytes = Vec::new(); 285 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 286 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 287 Ok(bytes) 288} 289 290pub async fn format_event_for_sending( 291 state: &AppState, 292 event: SequencedEvent, 293) -> Result<Vec<u8>, anyhow::Error> { 294 match event.event_type.as_str() { 295 "identity" => return format_identity_event(&event), 296 "account" => return format_account_event(&event), 297 "sync" => return format_sync_event(state, &event).await, 298 _ => {} 299 } 300 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 301 let prev_cid_str = event.prev_cid.clone(); 302 let prev_data_cid_str = event.prev_data_cid.clone(); 303 let mut frame: CommitFrame = event 304 .try_into() 305 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 306 if let Some(ref pdc) = prev_data_cid_str 307 && let Ok(cid) = Cid::from_str(pdc) 308 { 309 frame.prev_data = Some(cid); 310 } 311 let commit_cid = frame.commit; 312 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 313 let mut all_cids: Vec<Cid> = block_cids_str 314 .iter() 315 .filter_map(|s| Cid::from_str(s).ok()) 316 .filter(|c| Some(*c) != prev_cid) 317 .collect(); 318 if !all_cids.contains(&commit_cid) { 319 all_cids.push(commit_cid); 320 } 321 if let Some(ref pc) = prev_cid 322 && let Ok(Some(prev_bytes)) = state.block_store.get(pc).await 323 && let Some(rev) = extract_rev_from_commit_bytes(&prev_bytes) 324 { 325 frame.since = Some(rev); 326 } 327 let car_bytes = if !all_cids.is_empty() { 328 let fetched = state.block_store.get_many(&all_cids).await?; 329 let mut blocks = std::collections::BTreeMap::new(); 330 let mut commit_bytes: Option<Bytes> = None; 331 for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 332 if let Some(data) = data_opt { 333 if *cid == commit_cid { 334 commit_bytes = Some(data.clone()); 335 if let Some(rev) = extract_rev_from_commit_bytes(data) { 336 frame.rev = rev; 337 } 338 } else { 339 blocks.insert(*cid, data.clone()); 340 } 341 } 342 } 343 write_car_blocks(commit_cid, commit_bytes, blocks).await? 344 } else { 345 Vec::new() 346 }; 347 frame.blocks = car_bytes; 348 let header = FrameHeader { 349 op: 1, 350 t: "#commit".to_string(), 351 }; 352 let mut bytes = Vec::new(); 353 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 354 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 355 Ok(bytes) 356} 357 358pub async fn prefetch_blocks_for_events( 359 state: &AppState, 360 events: &[SequencedEvent], 361) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 362 let mut all_cids: Vec<Cid> = Vec::new(); 363 for event in events { 364 if let Some(ref commit_cid_str) = event.commit_cid 365 && let Ok(cid) = Cid::from_str(commit_cid_str) 366 { 367 all_cids.push(cid); 368 } 369 if let Some(ref prev_cid_str) = event.prev_cid 370 && let Ok(cid) = Cid::from_str(prev_cid_str) 371 { 372 all_cids.push(cid); 373 } 374 if let Some(ref block_cids_str) = event.blocks_cids { 375 for s in block_cids_str { 376 if let Ok(cid) = Cid::from_str(s) { 377 all_cids.push(cid); 378 } 379 } 380 } 381 } 382 all_cids.sort(); 383 all_cids.dedup(); 384 if all_cids.is_empty() { 385 return Ok(HashMap::new()); 386 } 387 let fetched = state.block_store.get_many(&all_cids).await?; 388 let mut blocks_map = HashMap::new(); 389 for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 390 if let Some(data) = data_opt { 391 blocks_map.insert(cid, data); 392 } 393 } 394 Ok(blocks_map) 395} 396 397fn format_sync_event_with_prefetched( 398 event: &SequencedEvent, 399 prefetched: &HashMap<Cid, Bytes>, 400) -> Result<Vec<u8>, anyhow::Error> { 401 let commit_cid_str = event 402 .commit_cid 403 .as_ref() 404 .ok_or_else(|| anyhow::anyhow!("Sync event missing commit_cid"))?; 405 let commit_cid = Cid::from_str(commit_cid_str)?; 406 let commit_bytes = prefetched 407 .get(&commit_cid) 408 .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; 409 let rev = if let Some(ref stored_rev) = event.rev { 410 stored_rev.clone() 411 } else { 412 extract_rev_from_commit_bytes(commit_bytes) 413 .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? 414 }; 415 let car_bytes = futures::executor::block_on(write_car_blocks( 416 commit_cid, 417 Some(commit_bytes.clone()), 418 BTreeMap::new(), 419 ))?; 420 let frame = SyncFrame { 421 did: event.did.clone(), 422 rev, 423 blocks: car_bytes, 424 seq: event.seq, 425 time: format_atproto_time(event.created_at), 426 }; 427 let header = FrameHeader { 428 op: 1, 429 t: "#sync".to_string(), 430 }; 431 let mut bytes = Vec::new(); 432 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 433 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 434 Ok(bytes) 435} 436 437pub async fn format_event_with_prefetched_blocks( 438 event: SequencedEvent, 439 prefetched: &HashMap<Cid, Bytes>, 440) -> Result<Vec<u8>, anyhow::Error> { 441 match event.event_type.as_str() { 442 "identity" => return format_identity_event(&event), 443 "account" => return format_account_event(&event), 444 "sync" => return format_sync_event_with_prefetched(&event, prefetched), 445 _ => {} 446 } 447 let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 448 let prev_cid_str = event.prev_cid.clone(); 449 let prev_data_cid_str = event.prev_data_cid.clone(); 450 let mut frame: CommitFrame = event 451 .try_into() 452 .map_err(|e| anyhow::anyhow!("Invalid event: {}", e))?; 453 if let Some(ref pdc) = prev_data_cid_str 454 && let Ok(cid) = Cid::from_str(pdc) 455 { 456 frame.prev_data = Some(cid); 457 } 458 let commit_cid = frame.commit; 459 let prev_cid = prev_cid_str.as_ref().and_then(|s| Cid::from_str(s).ok()); 460 let mut all_cids: Vec<Cid> = block_cids_str 461 .iter() 462 .filter_map(|s| Cid::from_str(s).ok()) 463 .filter(|c| Some(*c) != prev_cid) 464 .collect(); 465 if !all_cids.contains(&commit_cid) { 466 all_cids.push(commit_cid); 467 } 468 if let Some(commit_bytes) = prefetched.get(&commit_cid) 469 && let Some(rev) = extract_rev_from_commit_bytes(commit_bytes) 470 { 471 frame.rev = rev; 472 } 473 if let Some(ref pc) = prev_cid 474 && let Some(prev_bytes) = prefetched.get(pc) 475 && let Some(rev) = extract_rev_from_commit_bytes(prev_bytes) 476 { 477 frame.since = Some(rev); 478 } 479 let car_bytes = if !all_cids.is_empty() { 480 let mut blocks = BTreeMap::new(); 481 let mut commit_bytes_for_car: Option<Bytes> = None; 482 for cid in all_cids { 483 if let Some(data) = prefetched.get(&cid) { 484 if cid == commit_cid { 485 commit_bytes_for_car = Some(data.clone()); 486 } else { 487 blocks.insert(cid, data.clone()); 488 } 489 } 490 } 491 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 492 } else { 493 Vec::new() 494 }; 495 frame.blocks = car_bytes; 496 let header = FrameHeader { 497 op: 1, 498 t: "#commit".to_string(), 499 }; 500 let mut bytes = Vec::new(); 501 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 502 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 503 Ok(bytes) 504} 505 506pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 507 let header = FrameHeader { 508 op: 1, 509 t: "#info".to_string(), 510 }; 511 let frame = InfoFrame { 512 name: name.to_string(), 513 message: message.map(String::from), 514 }; 515 let mut bytes = Vec::new(); 516 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 517 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 518 Ok(bytes) 519} 520 521pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 522 let header = ErrorFrameHeader { op: -1 }; 523 let frame = ErrorFrameBody { 524 error: error.to_string(), 525 message: message.map(String::from), 526 }; 527 let mut bytes = Vec::new(); 528 serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 529 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 530 Ok(bytes) 531}