this repo has no description
1use crate::api::EmptyResponse; 2use crate::api::error::ApiError; 3use crate::cache::Cache; 4use crate::plc::PlcClient; 5use crate::state::AppState; 6use crate::types::{Handle, PlainPassword}; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::verify; 14use chrono::{Duration, Utc}; 15use cid::Cid; 16use jacquard_repo::commit::Commit; 17use jacquard_repo::storage::BlockStore; 18use k256::ecdsa::SigningKey; 19use serde::{Deserialize, Serialize}; 20use std::str::FromStr; 21use std::sync::Arc; 22use tracing::{error, info, warn}; 23use uuid::Uuid; 24 25#[derive(Serialize)] 26#[serde(rename_all = "camelCase")] 27pub struct CheckAccountStatusOutput { 28 pub activated: bool, 29 pub valid_did: bool, 30 pub repo_commit: String, 31 pub repo_rev: String, 32 pub repo_blocks: i64, 33 pub indexed_records: i64, 34 pub private_state_values: i64, 35 pub expected_blobs: i64, 36 pub imported_blobs: i64, 37} 38 39pub async fn check_account_status( 40 State(state): State<AppState>, 41 headers: axum::http::HeaderMap, 42) -> Response { 43 let extracted = match crate::auth::extract_auth_token_from_header( 44 headers.get("Authorization").and_then(|h| h.to_str().ok()), 45 ) { 46 Some(t) => t, 47 None => return ApiError::AuthenticationRequired.into_response(), 48 }; 49 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 50 let http_uri = format!( 51 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 52 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 53 ); 54 let did = match crate::auth::validate_token_with_dpop( 55 &state.db, 56 &extracted.token, 57 extracted.is_dpop, 58 dpop_proof, 59 "GET", 60 &http_uri, 61 true, 62 false, 63 ) 64 .await 65 { 66 Ok(user) => user.did, 67 Err(e) => return ApiError::from(e).into_response(), 68 }; 69 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 70 .fetch_optional(&state.db) 71 .await 72 { 73 Ok(Some(id)) => id, 74 _ => { 75 return ApiError::InternalError(None).into_response(); 76 } 77 }; 78 let user_status = sqlx::query!( 79 "SELECT deactivated_at FROM users WHERE did = $1", 80 did.as_str() 81 ) 82 .fetch_optional(&state.db) 83 .await; 84 let deactivated_at = match user_status { 85 Ok(Some(row)) => row.deactivated_at, 86 _ => None, 87 }; 88 let repo_result = sqlx::query!( 89 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 90 user_id 91 ) 92 .fetch_optional(&state.db) 93 .await; 94 let (repo_commit, repo_rev_from_db) = match repo_result { 95 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 96 _ => (String::new(), None), 97 }; 98 let block_count: i64 = sqlx::query_scalar!( 99 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 100 user_id 101 ) 102 .fetch_one(&state.db) 103 .await 104 .unwrap_or(Some(0)) 105 .unwrap_or(0); 106 let repo_rev = if let Some(rev) = repo_rev_from_db { 107 rev 108 } else if !repo_commit.is_empty() { 109 if let Ok(cid) = Cid::from_str(&repo_commit) { 110 if let Ok(Some(block)) = state.block_store.get(&cid).await { 111 Commit::from_cbor(&block) 112 .ok() 113 .map(|c| c.rev().to_string()) 114 .unwrap_or_default() 115 } else { 116 String::new() 117 } 118 } else { 119 String::new() 120 } 121 } else { 122 String::new() 123 }; 124 let record_count: i64 = 125 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 126 .fetch_one(&state.db) 127 .await 128 .unwrap_or(Some(0)) 129 .unwrap_or(0); 130 let imported_blobs: i64 = sqlx::query_scalar!( 131 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 132 user_id 133 ) 134 .fetch_one(&state.db) 135 .await 136 .unwrap_or(Some(0)) 137 .unwrap_or(0); 138 let expected_blobs: i64 = sqlx::query_scalar!( 139 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 140 user_id 141 ) 142 .fetch_one(&state.db) 143 .await 144 .unwrap_or(Some(0)) 145 .unwrap_or(0); 146 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await; 147 ( 148 StatusCode::OK, 149 Json(CheckAccountStatusOutput { 150 activated: deactivated_at.is_none(), 151 valid_did, 152 repo_commit: repo_commit.clone(), 153 repo_rev, 154 repo_blocks: block_count as i64, 155 indexed_records: record_count, 156 private_state_values: 0, 157 expected_blobs, 158 imported_blobs, 159 }), 160 ) 161 .into_response() 162} 163 164async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool { 165 assert_valid_did_document_for_service(db, cache, did, false) 166 .await 167 .is_ok() 168} 169 170async fn assert_valid_did_document_for_service( 171 db: &sqlx::PgPool, 172 cache: Arc<dyn Cache>, 173 did: &str, 174 with_retry: bool, 175) -> Result<(), ApiError> { 176 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 177 let expected_endpoint = format!("https://{}", hostname); 178 179 if did.starts_with("did:plc:") { 180 let plc_client = PlcClient::with_cache(None, Some(cache.clone())); 181 182 let max_attempts = if with_retry { 5 } else { 1 }; 183 let mut last_error = None; 184 let mut doc_data = None; 185 for attempt in 0..max_attempts { 186 if attempt > 0 { 187 let delay_ms = 500 * (1 << (attempt - 1)); 188 info!( 189 "Waiting {}ms before retry {} for DID document validation ({})", 190 delay_ms, attempt, did 191 ); 192 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 193 } 194 195 match plc_client.get_document_data(did).await { 196 Ok(data) => { 197 let pds_endpoint = data 198 .get("services") 199 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 200 .and_then(|p| p.get("endpoint")) 201 .and_then(|e| e.as_str()); 202 203 if pds_endpoint == Some(&expected_endpoint) { 204 doc_data = Some(data); 205 break; 206 } else { 207 info!( 208 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 209 attempt + 1, 210 did, 211 pds_endpoint, 212 expected_endpoint 213 ); 214 last_error = Some(format!( 215 "DID document endpoint {:?} does not match expected {}", 216 pds_endpoint, expected_endpoint 217 )); 218 } 219 } 220 Err(e) => { 221 warn!( 222 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 223 attempt + 1, 224 did, 225 e 226 ); 227 last_error = Some(format!("Could not resolve DID document: {}", e)); 228 } 229 } 230 } 231 232 let Some(doc_data) = doc_data else { 233 return Err(ApiError::InvalidRequest( 234 last_error.unwrap_or_else(|| "DID document validation failed".to_string()), 235 )); 236 }; 237 238 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 239 if let Some(ref expected_rotation_key) = server_rotation_key { 240 let rotation_keys = doc_data 241 .get("rotationKeys") 242 .and_then(|v| v.as_array()) 243 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>()) 244 .unwrap_or_default(); 245 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 246 return Err(ApiError::InvalidRequest( 247 "Server rotation key not included in PLC DID data".into(), 248 )); 249 } 250 } 251 252 let doc_signing_key = doc_data 253 .get("verificationMethods") 254 .and_then(|v| v.get("atproto")) 255 .and_then(|k| k.as_str()); 256 257 let user_row = sqlx::query!( 258 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 259 did 260 ) 261 .fetch_optional(db) 262 .await 263 .map_err(|e| { 264 error!("Failed to fetch user key: {:?}", e); 265 ApiError::InternalError(None) 266 })?; 267 268 if let Some(row) = user_row { 269 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 270 .map_err(|e| { 271 error!("Failed to decrypt user key: {}", e); 272 ApiError::InternalError(None) 273 })?; 274 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 275 error!("Failed to create signing key: {:?}", e); 276 ApiError::InternalError(None) 277 })?; 278 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 279 280 if doc_signing_key != Some(&expected_did_key) { 281 warn!( 282 "DID {} has signing key {:?}, expected {}", 283 did, doc_signing_key, expected_did_key 284 ); 285 return Err(ApiError::InvalidRequest( 286 "DID document verification method does not match expected signing key".into(), 287 )); 288 } 289 } 290 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 291 let client = crate::api::proxy_client::did_resolution_client(); 292 let decoded = host_and_path.replace("%3A", ":"); 293 let parts: Vec<&str> = decoded.split(':').collect(); 294 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 295 { 296 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 297 } else { 298 (parts[0].to_string(), parts[1..].to_vec()) 299 }; 300 let scheme = 301 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 302 "http" 303 } else { 304 "https" 305 }; 306 let url = if path_parts.is_empty() { 307 format!("{}://{}/.well-known/did.json", scheme, host) 308 } else { 309 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 310 }; 311 let resp = client.get(&url).send().await.map_err(|e| { 312 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 313 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e)) 314 })?; 315 let doc: serde_json::Value = resp.json().await.map_err(|e| { 316 warn!("Failed to parse did:web document for {}: {:?}", did, e); 317 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e)) 318 })?; 319 320 let pds_endpoint = doc 321 .get("service") 322 .and_then(|s| s.as_array()) 323 .and_then(|arr| { 324 arr.iter().find(|svc| { 325 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 326 || svc.get("type").and_then(|t| t.as_str()) 327 == Some("AtprotoPersonalDataServer") 328 }) 329 }) 330 .and_then(|svc| svc.get("serviceEndpoint")) 331 .and_then(|e| e.as_str()); 332 333 if pds_endpoint != Some(&expected_endpoint) { 334 warn!( 335 "DID {} has endpoint {:?}, expected {}", 336 did, pds_endpoint, expected_endpoint 337 ); 338 return Err(ApiError::InvalidRequest( 339 "DID document atproto_pds service endpoint does not match PDS public url".into(), 340 )); 341 } 342 } 343 344 Ok(()) 345} 346 347pub async fn activate_account( 348 State(state): State<AppState>, 349 headers: axum::http::HeaderMap, 350) -> Response { 351 info!("[MIGRATION] activateAccount called"); 352 let extracted = match crate::auth::extract_auth_token_from_header( 353 headers.get("Authorization").and_then(|h| h.to_str().ok()), 354 ) { 355 Some(t) => t, 356 None => { 357 info!("[MIGRATION] activateAccount: No auth token"); 358 return ApiError::AuthenticationRequired.into_response(); 359 } 360 }; 361 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 362 let http_uri = format!( 363 "https://{}/xrpc/com.atproto.server.activateAccount", 364 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 365 ); 366 let auth_user = match crate::auth::validate_token_with_dpop( 367 &state.db, 368 &extracted.token, 369 extracted.is_dpop, 370 dpop_proof, 371 "POST", 372 &http_uri, 373 true, 374 false, 375 ) 376 .await 377 { 378 Ok(user) => user, 379 Err(e) => { 380 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 381 return ApiError::from(e).into_response(); 382 } 383 }; 384 info!( 385 "[MIGRATION] activateAccount: Authenticated user did={}", 386 auth_user.did 387 ); 388 389 if let Err(e) = crate::auth::scope_check::check_account_scope( 390 auth_user.is_oauth, 391 auth_user.scope.as_deref(), 392 crate::oauth::scopes::AccountAttr::Repo, 393 crate::oauth::scopes::AccountAction::Manage, 394 ) { 395 info!("[MIGRATION] activateAccount: Scope check failed"); 396 return e; 397 } 398 399 let did = auth_user.did; 400 401 info!( 402 "[MIGRATION] activateAccount: Validating DID document for did={}", 403 did 404 ); 405 let did_validation_start = std::time::Instant::now(); 406 if let Err(e) = 407 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true) 408 .await 409 { 410 info!( 411 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 412 did, 413 did_validation_start.elapsed() 414 ); 415 return e.into_response(); 416 } 417 info!( 418 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 419 did, 420 did_validation_start.elapsed() 421 ); 422 423 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 424 .fetch_optional(&state.db) 425 .await 426 .ok() 427 .flatten(); 428 info!( 429 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 430 did, handle 431 ); 432 let result = sqlx::query!( 433 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 434 did.as_str() 435 ) 436 .execute(&state.db) 437 .await; 438 match result { 439 Ok(_) => { 440 info!( 441 "[MIGRATION] activateAccount: DB update success for did={}", 442 did 443 ); 444 if let Some(ref h) = handle { 445 let _ = state.cache.delete(&format!("handle:{}", h)).await; 446 } 447 info!( 448 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 449 did 450 ); 451 if let Err(e) = 452 crate::api::repo::record::sequence_account_event(&state, &did, true, None) 453 .await 454 { 455 warn!( 456 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 457 e 458 ); 459 } else { 460 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 461 } 462 info!( 463 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 464 did, handle 465 ); 466 let handle_typed = handle.as_ref().map(|h| Handle::new_unchecked(h)); 467 if let Err(e) = crate::api::repo::record::sequence_identity_event( 468 &state, 469 &did, 470 handle_typed.as_ref(), 471 ) 472 .await 473 { 474 warn!( 475 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 476 e 477 ); 478 } else { 479 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 480 } 481 let repo_root = sqlx::query_scalar!( 482 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 483 did.as_str() 484 ) 485 .fetch_optional(&state.db) 486 .await 487 .ok() 488 .flatten(); 489 if let Some(root_cid) = repo_root { 490 info!( 491 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 492 did, root_cid 493 ); 494 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 495 if let Ok(Some(block)) = state.block_store.get(&cid).await { 496 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 497 } else { 498 None 499 } 500 } else { 501 None 502 }; 503 if let Err(e) = crate::api::repo::record::sequence_sync_event( 504 &state, 505 &did, 506 &root_cid, 507 rev.as_deref(), 508 ) 509 .await 510 { 511 warn!( 512 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 513 e 514 ); 515 } else { 516 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 517 } 518 } else { 519 warn!( 520 "[MIGRATION] activateAccount: No repo root found for did={}", 521 did 522 ); 523 } 524 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 525 EmptyResponse::ok().into_response() 526 } 527 Err(e) => { 528 error!( 529 "[MIGRATION] activateAccount: DB error activating account: {:?}", 530 e 531 ); 532 ApiError::InternalError(None).into_response() 533 } 534 } 535} 536 537#[derive(Deserialize)] 538#[serde(rename_all = "camelCase")] 539pub struct DeactivateAccountInput { 540 pub delete_after: Option<String>, 541} 542 543pub async fn deactivate_account( 544 State(state): State<AppState>, 545 headers: axum::http::HeaderMap, 546 Json(input): Json<DeactivateAccountInput>, 547) -> Response { 548 let extracted = match crate::auth::extract_auth_token_from_header( 549 headers.get("Authorization").and_then(|h| h.to_str().ok()), 550 ) { 551 Some(t) => t, 552 None => return ApiError::AuthenticationRequired.into_response(), 553 }; 554 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 555 let http_uri = format!( 556 "https://{}/xrpc/com.atproto.server.deactivateAccount", 557 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 558 ); 559 let auth_user = match crate::auth::validate_token_with_dpop( 560 &state.db, 561 &extracted.token, 562 extracted.is_dpop, 563 dpop_proof, 564 "POST", 565 &http_uri, 566 false, 567 false, 568 ) 569 .await 570 { 571 Ok(user) => user, 572 Err(e) => return ApiError::from(e).into_response(), 573 }; 574 575 if let Err(e) = crate::auth::scope_check::check_account_scope( 576 auth_user.is_oauth, 577 auth_user.scope.as_deref(), 578 crate::oauth::scopes::AccountAttr::Repo, 579 crate::oauth::scopes::AccountAction::Manage, 580 ) { 581 return e; 582 } 583 584 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 585 .delete_after 586 .as_ref() 587 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 588 .map(|dt| dt.with_timezone(&chrono::Utc)); 589 590 let did = auth_user.did; 591 592 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 593 .fetch_optional(&state.db) 594 .await 595 .ok() 596 .flatten(); 597 598 let result = sqlx::query!( 599 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 600 did.as_str(), 601 delete_after 602 ) 603 .execute(&state.db) 604 .await; 605 606 match result { 607 Ok(_) => { 608 if let Some(ref h) = handle { 609 let _ = state.cache.delete(&format!("handle:{}", h)).await; 610 } 611 if let Err(e) = crate::api::repo::record::sequence_account_event( 612 &state, 613 &did, 614 false, 615 Some("deactivated"), 616 ) 617 .await 618 { 619 warn!("Failed to sequence account deactivated event: {}", e); 620 } 621 EmptyResponse::ok().into_response() 622 } 623 Err(e) => { 624 error!("DB error deactivating account: {:?}", e); 625 ApiError::InternalError(None).into_response() 626 } 627 } 628} 629 630pub async fn request_account_delete( 631 State(state): State<AppState>, 632 headers: axum::http::HeaderMap, 633) -> Response { 634 let extracted = match crate::auth::extract_auth_token_from_header( 635 headers.get("Authorization").and_then(|h| h.to_str().ok()), 636 ) { 637 Some(t) => t, 638 None => return ApiError::AuthenticationRequired.into_response(), 639 }; 640 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 641 let http_uri = format!( 642 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 643 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 644 ); 645 let validated = match crate::auth::validate_token_with_dpop( 646 &state.db, 647 &extracted.token, 648 extracted.is_dpop, 649 dpop_proof, 650 "POST", 651 &http_uri, 652 true, 653 false, 654 ) 655 .await 656 { 657 Ok(user) => user, 658 Err(e) => return ApiError::from(e).into_response(), 659 }; 660 let did = validated.did.clone(); 661 662 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await { 663 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str()) 664 .await; 665 } 666 667 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 668 .fetch_optional(&state.db) 669 .await 670 { 671 Ok(Some(id)) => id, 672 _ => { 673 return ApiError::InternalError(None).into_response(); 674 } 675 }; 676 let confirmation_token = Uuid::new_v4().to_string(); 677 let expires_at = Utc::now() + Duration::minutes(15); 678 let insert = sqlx::query!( 679 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 680 confirmation_token, 681 did.as_str(), 682 expires_at 683 ) 684 .execute(&state.db) 685 .await; 686 if let Err(e) = insert { 687 error!("DB error creating deletion token: {:?}", e); 688 return ApiError::InternalError(None).into_response(); 689 } 690 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 691 if let Err(e) = 692 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 693 .await 694 { 695 warn!("Failed to enqueue account deletion notification: {:?}", e); 696 } 697 info!("Account deletion requested for user {}", did); 698 EmptyResponse::ok().into_response() 699} 700 701#[derive(Deserialize)] 702pub struct DeleteAccountInput { 703 pub did: crate::types::Did, 704 pub password: PlainPassword, 705 pub token: String, 706} 707 708pub async fn delete_account( 709 State(state): State<AppState>, 710 Json(input): Json<DeleteAccountInput>, 711) -> Response { 712 let did = &input.did; 713 let password = &input.password; 714 let token = input.token.trim(); 715 if password.is_empty() { 716 return ApiError::InvalidRequest("password is required".into()).into_response(); 717 } 718 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 719 if password.len() > OLD_PASSWORD_MAX_LENGTH { 720 return ApiError::InvalidRequest("Invalid password length".into()).into_response(); 721 } 722 if token.is_empty() { 723 return ApiError::InvalidToken(Some("token is required".into())).into_response(); 724 } 725 let user = sqlx::query!( 726 "SELECT id, password_hash, handle FROM users WHERE did = $1", 727 did.as_str() 728 ) 729 .fetch_optional(&state.db) 730 .await; 731 let (user_id, password_hash, handle) = match user { 732 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 733 Ok(None) => { 734 return ApiError::InvalidRequest("account not found".into()).into_response(); 735 } 736 Err(e) => { 737 error!("DB error in delete_account: {:?}", e); 738 return ApiError::InternalError(None).into_response(); 739 } 740 }; 741 let password_valid = if password_hash 742 .as_ref() 743 .map(|h| verify(password, h).unwrap_or(false)) 744 .unwrap_or(false) 745 { 746 true 747 } else { 748 let app_pass_rows = sqlx::query!( 749 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 750 user_id 751 ) 752 .fetch_all(&state.db) 753 .await 754 .unwrap_or_default(); 755 app_pass_rows 756 .iter() 757 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 758 }; 759 if !password_valid { 760 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response(); 761 } 762 let deletion_request = sqlx::query!( 763 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 764 token 765 ) 766 .fetch_optional(&state.db) 767 .await; 768 let (token_did, expires_at) = match deletion_request { 769 Ok(Some(row)) => (row.did, row.expires_at), 770 Ok(None) => { 771 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response(); 772 } 773 Err(e) => { 774 error!("DB error fetching deletion token: {:?}", e); 775 return ApiError::InternalError(None).into_response(); 776 } 777 }; 778 if token_did != did.as_str() { 779 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response(); 780 } 781 if Utc::now() > expires_at { 782 let _ = sqlx::query!( 783 "DELETE FROM account_deletion_requests WHERE token = $1", 784 token 785 ) 786 .execute(&state.db) 787 .await; 788 return ApiError::ExpiredToken(None).into_response(); 789 } 790 let mut tx = match state.db.begin().await { 791 Ok(tx) => tx, 792 Err(e) => { 793 error!("Failed to begin transaction: {:?}", e); 794 return ApiError::InternalError(None).into_response(); 795 } 796 }; 797 let deletion_result: Result<(), sqlx::Error> = async { 798 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 799 .execute(&mut *tx) 800 .await?; 801 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 802 .execute(&mut *tx) 803 .await?; 804 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 805 .execute(&mut *tx) 806 .await?; 807 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 808 .execute(&mut *tx) 809 .await?; 810 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 811 .execute(&mut *tx) 812 .await?; 813 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 814 .execute(&mut *tx) 815 .await?; 816 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 817 .execute(&mut *tx) 818 .await?; 819 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 820 .execute(&mut *tx) 821 .await?; 822 Ok(()) 823 } 824 .await; 825 match deletion_result { 826 Ok(()) => { 827 if let Err(e) = tx.commit().await { 828 error!("Failed to commit account deletion transaction: {:?}", e); 829 return ApiError::InternalError(None).into_response(); 830 } 831 let account_seq = crate::api::repo::record::sequence_account_event( 832 &state, 833 did, 834 false, 835 Some("deleted"), 836 ) 837 .await; 838 match account_seq { 839 Ok(seq) => { 840 if let Err(e) = sqlx::query!( 841 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 842 did, 843 seq 844 ) 845 .execute(&state.db) 846 .await 847 { 848 warn!( 849 "Failed to cleanup sequences for deleted account {}: {}", 850 did, e 851 ); 852 } 853 } 854 Err(e) => { 855 warn!( 856 "Failed to sequence account deletion event for {}: {}", 857 did, e 858 ); 859 } 860 } 861 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 862 info!("Account {} deleted successfully", did); 863 EmptyResponse::ok().into_response() 864 } 865 Err(e) => { 866 error!("DB error deleting account, rolling back: {:?}", e); 867 ApiError::InternalError(None).into_response() 868 } 869 } 870}