this repo has no description
1use crate::api::ApiError; 2use crate::plc::PlcClient; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::{Duration, Utc}; 12use cid::Cid; 13use jacquard_repo::commit::Commit; 14use jacquard_repo::storage::BlockStore; 15use k256::ecdsa::SigningKey; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::str::FromStr; 19use tracing::{error, info, warn}; 20use uuid::Uuid; 21 22#[derive(Serialize)] 23#[serde(rename_all = "camelCase")] 24pub struct CheckAccountStatusOutput { 25 pub activated: bool, 26 pub valid_did: bool, 27 pub repo_commit: String, 28 pub repo_rev: String, 29 pub repo_blocks: i64, 30 pub indexed_records: i64, 31 pub private_state_values: i64, 32 pub expected_blobs: i64, 33 pub imported_blobs: i64, 34} 35 36pub async fn check_account_status( 37 State(state): State<AppState>, 38 headers: axum::http::HeaderMap, 39) -> Response { 40 let extracted = match crate::auth::extract_auth_token_from_header( 41 headers.get("Authorization").and_then(|h| h.to_str().ok()), 42 ) { 43 Some(t) => t, 44 None => return ApiError::AuthenticationRequired.into_response(), 45 }; 46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 47 let http_uri = format!( 48 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 50 ); 51 let did = match crate::auth::validate_token_with_dpop( 52 &state.db, 53 &extracted.token, 54 extracted.is_dpop, 55 dpop_proof, 56 "GET", 57 &http_uri, 58 true, 59 ) 60 .await 61 { 62 Ok(user) => user.did, 63 Err(e) => return ApiError::from(e).into_response(), 64 }; 65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(id)) => id, 70 _ => { 71 return ( 72 StatusCode::INTERNAL_SERVER_ERROR, 73 Json(json!({"error": "InternalError"})), 74 ) 75 .into_response(); 76 } 77 }; 78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 79 .fetch_optional(&state.db) 80 .await; 81 let deactivated_at = match user_status { 82 Ok(Some(row)) => row.deactivated_at, 83 _ => None, 84 }; 85 let repo_result = sqlx::query!( 86 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 87 user_id 88 ) 89 .fetch_optional(&state.db) 90 .await; 91 let repo_commit = match repo_result { 92 Ok(Some(row)) => row.repo_root_cid, 93 _ => String::new(), 94 }; 95 let record_count: i64 = 96 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 97 .fetch_one(&state.db) 98 .await 99 .unwrap_or(Some(0)) 100 .unwrap_or(0); 101 let blob_count: i64 = sqlx::query_scalar!( 102 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 103 user_id 104 ) 105 .fetch_one(&state.db) 106 .await 107 .unwrap_or(Some(0)) 108 .unwrap_or(0); 109 let valid_did = did.starts_with("did:"); 110 ( 111 StatusCode::OK, 112 Json(CheckAccountStatusOutput { 113 activated: deactivated_at.is_none(), 114 valid_did, 115 repo_commit: repo_commit.clone(), 116 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 117 repo_blocks: 0, 118 indexed_records: record_count, 119 private_state_values: 0, 120 expected_blobs: blob_count, 121 imported_blobs: blob_count, 122 }), 123 ) 124 .into_response() 125} 126 127async fn assert_valid_did_document_for_service( 128 db: &sqlx::PgPool, 129 did: &str, 130) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 131 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 132 let expected_endpoint = format!("https://{}", hostname); 133 134 if did.starts_with("did:plc:") { 135 let plc_client = PlcClient::new(None); 136 137 let mut last_error = None; 138 let mut doc_data = None; 139 for attempt in 0..5 { 140 if attempt > 0 { 141 let delay_ms = 500 * (1 << (attempt - 1)); 142 info!( 143 "Waiting {}ms before retry {} for DID document validation ({})", 144 delay_ms, attempt, did 145 ); 146 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 147 } 148 149 match plc_client.get_document_data(did).await { 150 Ok(data) => { 151 let pds_endpoint = data 152 .get("services") 153 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 154 .and_then(|p| p.get("endpoint")) 155 .and_then(|e| e.as_str()); 156 157 if pds_endpoint == Some(&expected_endpoint) { 158 doc_data = Some(data); 159 break; 160 } else { 161 info!( 162 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 163 attempt + 1, 164 did, 165 pds_endpoint, 166 expected_endpoint 167 ); 168 last_error = Some(format!( 169 "DID document endpoint {:?} does not match expected {}", 170 pds_endpoint, expected_endpoint 171 )); 172 } 173 } 174 Err(e) => { 175 warn!( 176 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 177 attempt + 1, 178 did, 179 e 180 ); 181 last_error = Some(format!("Could not resolve DID document: {}", e)); 182 } 183 } 184 } 185 186 let doc_data = match doc_data { 187 Some(d) => d, 188 None => { 189 return Err(( 190 StatusCode::BAD_REQUEST, 191 Json(json!({ 192 "error": "InvalidRequest", 193 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string()) 194 })), 195 )); 196 } 197 }; 198 199 let doc_signing_key = doc_data 200 .get("verificationMethods") 201 .and_then(|v| v.get("atproto")) 202 .and_then(|k| k.as_str()); 203 204 let user_row = sqlx::query!( 205 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 206 did 207 ) 208 .fetch_optional(db) 209 .await 210 .map_err(|e| { 211 error!("Failed to fetch user key: {:?}", e); 212 ( 213 StatusCode::INTERNAL_SERVER_ERROR, 214 Json(json!({"error": "InternalError"})), 215 ) 216 })?; 217 218 if let Some(row) = user_row { 219 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 220 .map_err(|e| { 221 error!("Failed to decrypt user key: {}", e); 222 ( 223 StatusCode::INTERNAL_SERVER_ERROR, 224 Json(json!({"error": "InternalError"})), 225 ) 226 })?; 227 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 228 error!("Failed to create signing key: {:?}", e); 229 ( 230 StatusCode::INTERNAL_SERVER_ERROR, 231 Json(json!({"error": "InternalError"})), 232 ) 233 })?; 234 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 235 236 if doc_signing_key != Some(&expected_did_key) { 237 warn!( 238 "DID {} has signing key {:?}, expected {}", 239 did, doc_signing_key, expected_did_key 240 ); 241 return Err(( 242 StatusCode::BAD_REQUEST, 243 Json(json!({ 244 "error": "InvalidRequest", 245 "message": "DID document verification method does not match expected signing key" 246 })), 247 )); 248 } 249 } 250 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 251 let client = reqwest::Client::new(); 252 let decoded = host_and_path.replace("%3A", ":"); 253 let parts: Vec<&str> = decoded.split(':').collect(); 254 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 255 { 256 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 257 } else { 258 (parts[0].to_string(), parts[1..].to_vec()) 259 }; 260 let scheme = 261 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 262 "http" 263 } else { 264 "https" 265 }; 266 let url = if path_parts.is_empty() { 267 format!("{}://{}/.well-known/did.json", scheme, host) 268 } else { 269 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 270 }; 271 let resp = client.get(&url).send().await.map_err(|e| { 272 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 273 ( 274 StatusCode::BAD_REQUEST, 275 Json(json!({ 276 "error": "InvalidRequest", 277 "message": format!("Could not resolve DID document: {}", e) 278 })), 279 ) 280 })?; 281 let doc: serde_json::Value = resp.json().await.map_err(|e| { 282 warn!("Failed to parse did:web document for {}: {:?}", did, e); 283 ( 284 StatusCode::BAD_REQUEST, 285 Json(json!({ 286 "error": "InvalidRequest", 287 "message": format!("Could not parse DID document: {}", e) 288 })), 289 ) 290 })?; 291 292 let pds_endpoint = doc 293 .get("service") 294 .and_then(|s| s.as_array()) 295 .and_then(|arr| { 296 arr.iter().find(|svc| { 297 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 298 || svc.get("type").and_then(|t| t.as_str()) 299 == Some("AtprotoPersonalDataServer") 300 }) 301 }) 302 .and_then(|svc| svc.get("serviceEndpoint")) 303 .and_then(|e| e.as_str()); 304 305 if pds_endpoint != Some(&expected_endpoint) { 306 warn!( 307 "DID {} has endpoint {:?}, expected {}", 308 did, pds_endpoint, expected_endpoint 309 ); 310 return Err(( 311 StatusCode::BAD_REQUEST, 312 Json(json!({ 313 "error": "InvalidRequest", 314 "message": "DID document atproto_pds service endpoint does not match PDS public url" 315 })), 316 )); 317 } 318 } 319 320 Ok(()) 321} 322 323pub async fn activate_account( 324 State(state): State<AppState>, 325 headers: axum::http::HeaderMap, 326) -> Response { 327 info!("[MIGRATION] activateAccount called"); 328 let extracted = match crate::auth::extract_auth_token_from_header( 329 headers.get("Authorization").and_then(|h| h.to_str().ok()), 330 ) { 331 Some(t) => t, 332 None => { 333 info!("[MIGRATION] activateAccount: No auth token"); 334 return ApiError::AuthenticationRequired.into_response(); 335 } 336 }; 337 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 338 let http_uri = format!( 339 "https://{}/xrpc/com.atproto.server.activateAccount", 340 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 341 ); 342 let auth_user = match crate::auth::validate_token_with_dpop( 343 &state.db, 344 &extracted.token, 345 extracted.is_dpop, 346 dpop_proof, 347 "POST", 348 &http_uri, 349 true, 350 ) 351 .await 352 { 353 Ok(user) => user, 354 Err(e) => { 355 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 356 return ApiError::from(e).into_response(); 357 } 358 }; 359 info!( 360 "[MIGRATION] activateAccount: Authenticated user did={}", 361 auth_user.did 362 ); 363 364 if let Err(e) = crate::auth::scope_check::check_account_scope( 365 auth_user.is_oauth, 366 auth_user.scope.as_deref(), 367 crate::oauth::scopes::AccountAttr::Repo, 368 crate::oauth::scopes::AccountAction::Manage, 369 ) { 370 info!("[MIGRATION] activateAccount: Scope check failed"); 371 return e; 372 } 373 374 let did = auth_user.did; 375 376 info!( 377 "[MIGRATION] activateAccount: Validating DID document for did={}", 378 did 379 ); 380 let did_validation_start = std::time::Instant::now(); 381 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did).await { 382 info!( 383 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 384 did, 385 did_validation_start.elapsed() 386 ); 387 return (status, json).into_response(); 388 } 389 info!( 390 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 391 did, 392 did_validation_start.elapsed() 393 ); 394 395 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 396 .fetch_optional(&state.db) 397 .await 398 .ok() 399 .flatten(); 400 info!( 401 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 402 did, handle 403 ); 404 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 405 .execute(&state.db) 406 .await; 407 match result { 408 Ok(_) => { 409 info!( 410 "[MIGRATION] activateAccount: DB update success for did={}", 411 did 412 ); 413 if let Some(ref h) = handle { 414 let _ = state.cache.delete(&format!("handle:{}", h)).await; 415 } 416 info!( 417 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 418 did 419 ); 420 if let Err(e) = 421 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 422 { 423 warn!( 424 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 425 e 426 ); 427 } else { 428 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 429 } 430 info!( 431 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 432 did, handle 433 ); 434 if let Err(e) = 435 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 436 .await 437 { 438 warn!( 439 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 440 e 441 ); 442 } else { 443 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 444 } 445 let repo_root = sqlx::query_scalar!( 446 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 447 did 448 ) 449 .fetch_optional(&state.db) 450 .await 451 .ok() 452 .flatten(); 453 if let Some(root_cid) = repo_root { 454 info!( 455 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 456 did, root_cid 457 ); 458 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 459 if let Ok(Some(block)) = state.block_store.get(&cid).await { 460 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 461 } else { 462 None 463 } 464 } else { 465 None 466 }; 467 if let Err(e) = crate::api::repo::record::sequence_sync_event( 468 &state, 469 &did, 470 &root_cid, 471 rev.as_deref(), 472 ) 473 .await 474 { 475 warn!( 476 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 477 e 478 ); 479 } else { 480 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 481 } 482 } else { 483 warn!( 484 "[MIGRATION] activateAccount: No repo root found for did={}", 485 did 486 ); 487 } 488 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 489 (StatusCode::OK, Json(json!({}))).into_response() 490 } 491 Err(e) => { 492 error!( 493 "[MIGRATION] activateAccount: DB error activating account: {:?}", 494 e 495 ); 496 ( 497 StatusCode::INTERNAL_SERVER_ERROR, 498 Json(json!({"error": "InternalError"})), 499 ) 500 .into_response() 501 } 502 } 503} 504 505#[derive(Deserialize)] 506#[serde(rename_all = "camelCase")] 507pub struct DeactivateAccountInput { 508 pub delete_after: Option<String>, 509} 510 511pub async fn deactivate_account( 512 State(state): State<AppState>, 513 headers: axum::http::HeaderMap, 514 Json(_input): Json<DeactivateAccountInput>, 515) -> Response { 516 let extracted = match crate::auth::extract_auth_token_from_header( 517 headers.get("Authorization").and_then(|h| h.to_str().ok()), 518 ) { 519 Some(t) => t, 520 None => return ApiError::AuthenticationRequired.into_response(), 521 }; 522 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 523 let http_uri = format!( 524 "https://{}/xrpc/com.atproto.server.deactivateAccount", 525 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 526 ); 527 let auth_user = match crate::auth::validate_token_with_dpop( 528 &state.db, 529 &extracted.token, 530 extracted.is_dpop, 531 dpop_proof, 532 "POST", 533 &http_uri, 534 false, 535 ) 536 .await 537 { 538 Ok(user) => user, 539 Err(e) => return ApiError::from(e).into_response(), 540 }; 541 542 if let Err(e) = crate::auth::scope_check::check_account_scope( 543 auth_user.is_oauth, 544 auth_user.scope.as_deref(), 545 crate::oauth::scopes::AccountAttr::Repo, 546 crate::oauth::scopes::AccountAction::Manage, 547 ) { 548 return e; 549 } 550 551 let did = auth_user.did; 552 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 553 .fetch_optional(&state.db) 554 .await 555 .ok() 556 .flatten(); 557 let result = sqlx::query!( 558 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 559 did 560 ) 561 .execute(&state.db) 562 .await; 563 match result { 564 Ok(_) => { 565 if let Some(ref h) = handle { 566 let _ = state.cache.delete(&format!("handle:{}", h)).await; 567 } 568 if let Err(e) = crate::api::repo::record::sequence_account_event( 569 &state, 570 &did, 571 false, 572 Some("deactivated"), 573 ) 574 .await 575 { 576 warn!("Failed to sequence account deactivation event: {}", e); 577 } 578 (StatusCode::OK, Json(json!({}))).into_response() 579 } 580 Err(e) => { 581 error!("DB error deactivating account: {:?}", e); 582 ( 583 StatusCode::INTERNAL_SERVER_ERROR, 584 Json(json!({"error": "InternalError"})), 585 ) 586 .into_response() 587 } 588 } 589} 590 591pub async fn request_account_delete( 592 State(state): State<AppState>, 593 headers: axum::http::HeaderMap, 594) -> Response { 595 let extracted = match crate::auth::extract_auth_token_from_header( 596 headers.get("Authorization").and_then(|h| h.to_str().ok()), 597 ) { 598 Some(t) => t, 599 None => return ApiError::AuthenticationRequired.into_response(), 600 }; 601 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 602 let http_uri = format!( 603 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 604 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 605 ); 606 let validated = match crate::auth::validate_token_with_dpop( 607 &state.db, 608 &extracted.token, 609 extracted.is_dpop, 610 dpop_proof, 611 "POST", 612 &http_uri, 613 true, 614 ) 615 .await 616 { 617 Ok(user) => user, 618 Err(e) => return ApiError::from(e).into_response(), 619 }; 620 let did = validated.did.clone(); 621 622 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await { 623 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await; 624 } 625 626 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 627 .fetch_optional(&state.db) 628 .await 629 { 630 Ok(Some(id)) => id, 631 _ => { 632 return ( 633 StatusCode::INTERNAL_SERVER_ERROR, 634 Json(json!({"error": "InternalError"})), 635 ) 636 .into_response(); 637 } 638 }; 639 let confirmation_token = Uuid::new_v4().to_string(); 640 let expires_at = Utc::now() + Duration::minutes(15); 641 let insert = sqlx::query!( 642 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 643 confirmation_token, 644 did, 645 expires_at 646 ) 647 .execute(&state.db) 648 .await; 649 if let Err(e) = insert { 650 error!("DB error creating deletion token: {:?}", e); 651 return ( 652 StatusCode::INTERNAL_SERVER_ERROR, 653 Json(json!({"error": "InternalError"})), 654 ) 655 .into_response(); 656 } 657 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 658 if let Err(e) = 659 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 660 .await 661 { 662 warn!("Failed to enqueue account deletion notification: {:?}", e); 663 } 664 info!("Account deletion requested for user {}", did); 665 (StatusCode::OK, Json(json!({}))).into_response() 666} 667 668#[derive(Deserialize)] 669pub struct DeleteAccountInput { 670 pub did: String, 671 pub password: String, 672 pub token: String, 673} 674 675pub async fn delete_account( 676 State(state): State<AppState>, 677 Json(input): Json<DeleteAccountInput>, 678) -> Response { 679 let did = input.did.trim(); 680 let password = &input.password; 681 let token = input.token.trim(); 682 if did.is_empty() { 683 return ( 684 StatusCode::BAD_REQUEST, 685 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 686 ) 687 .into_response(); 688 } 689 if password.is_empty() { 690 return ( 691 StatusCode::BAD_REQUEST, 692 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 693 ) 694 .into_response(); 695 } 696 if token.is_empty() { 697 return ( 698 StatusCode::BAD_REQUEST, 699 Json(json!({"error": "InvalidToken", "message": "token is required"})), 700 ) 701 .into_response(); 702 } 703 let user = sqlx::query!( 704 "SELECT id, password_hash, handle FROM users WHERE did = $1", 705 did 706 ) 707 .fetch_optional(&state.db) 708 .await; 709 let (user_id, password_hash, handle) = match user { 710 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 711 Ok(None) => { 712 return ( 713 StatusCode::BAD_REQUEST, 714 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 715 ) 716 .into_response(); 717 } 718 Err(e) => { 719 error!("DB error in delete_account: {:?}", e); 720 return ( 721 StatusCode::INTERNAL_SERVER_ERROR, 722 Json(json!({"error": "InternalError"})), 723 ) 724 .into_response(); 725 } 726 }; 727 let password_valid = if password_hash 728 .as_ref() 729 .map(|h| verify(password, h).unwrap_or(false)) 730 .unwrap_or(false) 731 { 732 true 733 } else { 734 let app_pass_rows = sqlx::query!( 735 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 736 user_id 737 ) 738 .fetch_all(&state.db) 739 .await 740 .unwrap_or_default(); 741 app_pass_rows 742 .iter() 743 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 744 }; 745 if !password_valid { 746 return ( 747 StatusCode::UNAUTHORIZED, 748 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 749 ) 750 .into_response(); 751 } 752 let deletion_request = sqlx::query!( 753 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 754 token 755 ) 756 .fetch_optional(&state.db) 757 .await; 758 let (token_did, expires_at) = match deletion_request { 759 Ok(Some(row)) => (row.did, row.expires_at), 760 Ok(None) => { 761 return ( 762 StatusCode::BAD_REQUEST, 763 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 764 ) 765 .into_response(); 766 } 767 Err(e) => { 768 error!("DB error fetching deletion token: {:?}", e); 769 return ( 770 StatusCode::INTERNAL_SERVER_ERROR, 771 Json(json!({"error": "InternalError"})), 772 ) 773 .into_response(); 774 } 775 }; 776 if token_did != did { 777 return ( 778 StatusCode::BAD_REQUEST, 779 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 780 ) 781 .into_response(); 782 } 783 if Utc::now() > expires_at { 784 let _ = sqlx::query!( 785 "DELETE FROM account_deletion_requests WHERE token = $1", 786 token 787 ) 788 .execute(&state.db) 789 .await; 790 return ( 791 StatusCode::BAD_REQUEST, 792 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 793 ) 794 .into_response(); 795 } 796 let mut tx = match state.db.begin().await { 797 Ok(tx) => tx, 798 Err(e) => { 799 error!("Failed to begin transaction: {:?}", e); 800 return ( 801 StatusCode::INTERNAL_SERVER_ERROR, 802 Json(json!({"error": "InternalError"})), 803 ) 804 .into_response(); 805 } 806 }; 807 let deletion_result: Result<(), sqlx::Error> = async { 808 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 809 .execute(&mut *tx) 810 .await?; 811 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 812 .execute(&mut *tx) 813 .await?; 814 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 815 .execute(&mut *tx) 816 .await?; 817 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 818 .execute(&mut *tx) 819 .await?; 820 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 821 .execute(&mut *tx) 822 .await?; 823 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 824 .execute(&mut *tx) 825 .await?; 826 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 827 .execute(&mut *tx) 828 .await?; 829 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 830 .execute(&mut *tx) 831 .await?; 832 Ok(()) 833 } 834 .await; 835 match deletion_result { 836 Ok(()) => { 837 if let Err(e) = tx.commit().await { 838 error!("Failed to commit account deletion transaction: {:?}", e); 839 return ( 840 StatusCode::INTERNAL_SERVER_ERROR, 841 Json(json!({"error": "InternalError"})), 842 ) 843 .into_response(); 844 } 845 if let Err(e) = crate::api::repo::record::sequence_account_event( 846 &state, 847 did, 848 false, 849 Some("deleted"), 850 ) 851 .await 852 { 853 warn!( 854 "Failed to sequence account deletion event for {}: {}", 855 did, e 856 ); 857 } 858 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 859 info!("Account {} deleted successfully", did); 860 (StatusCode::OK, Json(json!({}))).into_response() 861 } 862 Err(e) => { 863 error!("DB error deleting account, rolling back: {:?}", e); 864 ( 865 StatusCode::INTERNAL_SERVER_ERROR, 866 Json(json!({"error": "InternalError"})), 867 ) 868 .into_response() 869 } 870 } 871}