this repo has no description
1use crate::api::ApiError; 2use crate::plc::PlcClient; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::{Duration, Utc}; 12use cid::Cid; 13use jacquard_repo::commit::Commit; 14use jacquard_repo::storage::BlockStore; 15use k256::ecdsa::SigningKey; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::str::FromStr; 19use tracing::{error, info, warn}; 20use uuid::Uuid; 21 22#[derive(Serialize)] 23#[serde(rename_all = "camelCase")] 24pub struct CheckAccountStatusOutput { 25 pub activated: bool, 26 pub valid_did: bool, 27 pub repo_commit: String, 28 pub repo_rev: String, 29 pub repo_blocks: i64, 30 pub indexed_records: i64, 31 pub private_state_values: i64, 32 pub expected_blobs: i64, 33 pub imported_blobs: i64, 34} 35 36pub async fn check_account_status( 37 State(state): State<AppState>, 38 headers: axum::http::HeaderMap, 39) -> Response { 40 let extracted = match crate::auth::extract_auth_token_from_header( 41 headers.get("Authorization").and_then(|h| h.to_str().ok()), 42 ) { 43 Some(t) => t, 44 None => return ApiError::AuthenticationRequired.into_response(), 45 }; 46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 47 let http_uri = format!( 48 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 50 ); 51 let did = match crate::auth::validate_token_with_dpop( 52 &state.db, 53 &extracted.token, 54 extracted.is_dpop, 55 dpop_proof, 56 "GET", 57 &http_uri, 58 true, 59 ) 60 .await 61 { 62 Ok(user) => user.did, 63 Err(e) => return ApiError::from(e).into_response(), 64 }; 65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(id)) => id, 70 _ => { 71 return ( 72 StatusCode::INTERNAL_SERVER_ERROR, 73 Json(json!({"error": "InternalError"})), 74 ) 75 .into_response(); 76 } 77 }; 78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 79 .fetch_optional(&state.db) 80 .await; 81 let deactivated_at = match user_status { 82 Ok(Some(row)) => row.deactivated_at, 83 _ => None, 84 }; 85 let repo_result = sqlx::query!( 86 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 87 user_id 88 ) 89 .fetch_optional(&state.db) 90 .await; 91 let (repo_commit, repo_rev_from_db) = match repo_result { 92 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 93 _ => (String::new(), None), 94 }; 95 let block_count: i64 = 96 sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id) 97 .fetch_one(&state.db) 98 .await 99 .unwrap_or(Some(0)) 100 .unwrap_or(0); 101 let repo_rev = if let Some(rev) = repo_rev_from_db { 102 rev 103 } else if !repo_commit.is_empty() { 104 if let Ok(cid) = Cid::from_str(&repo_commit) { 105 if let Ok(Some(block)) = state.block_store.get(&cid).await { 106 Commit::from_cbor(&block) 107 .ok() 108 .map(|c| c.rev().to_string()) 109 .unwrap_or_default() 110 } else { 111 String::new() 112 } 113 } else { 114 String::new() 115 } 116 } else { 117 String::new() 118 }; 119 let record_count: i64 = 120 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 121 .fetch_one(&state.db) 122 .await 123 .unwrap_or(Some(0)) 124 .unwrap_or(0); 125 let blob_count: i64 = sqlx::query_scalar!( 126 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 127 user_id 128 ) 129 .fetch_one(&state.db) 130 .await 131 .unwrap_or(Some(0)) 132 .unwrap_or(0); 133 let valid_did = is_valid_did_for_service(&state.db, &did).await; 134 ( 135 StatusCode::OK, 136 Json(CheckAccountStatusOutput { 137 activated: deactivated_at.is_none(), 138 valid_did, 139 repo_commit: repo_commit.clone(), 140 repo_rev, 141 repo_blocks: block_count as i64, 142 indexed_records: record_count, 143 private_state_values: 0, 144 expected_blobs: blob_count, 145 imported_blobs: blob_count, 146 }), 147 ) 148 .into_response() 149} 150 151async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool { 152 assert_valid_did_document_for_service(db, did, false) 153 .await 154 .is_ok() 155} 156 157async fn assert_valid_did_document_for_service( 158 db: &sqlx::PgPool, 159 did: &str, 160 with_retry: bool, 161) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 162 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 163 let expected_endpoint = format!("https://{}", hostname); 164 165 if did.starts_with("did:plc:") { 166 let plc_client = PlcClient::new(None); 167 168 let max_attempts = if with_retry { 5 } else { 1 }; 169 let mut last_error = None; 170 let mut doc_data = None; 171 for attempt in 0..max_attempts { 172 if attempt > 0 { 173 let delay_ms = 500 * (1 << (attempt - 1)); 174 info!( 175 "Waiting {}ms before retry {} for DID document validation ({})", 176 delay_ms, attempt, did 177 ); 178 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 179 } 180 181 match plc_client.get_document_data(did).await { 182 Ok(data) => { 183 let pds_endpoint = data 184 .get("services") 185 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 186 .and_then(|p| p.get("endpoint")) 187 .and_then(|e| e.as_str()); 188 189 if pds_endpoint == Some(&expected_endpoint) { 190 doc_data = Some(data); 191 break; 192 } else { 193 info!( 194 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 195 attempt + 1, 196 did, 197 pds_endpoint, 198 expected_endpoint 199 ); 200 last_error = Some(format!( 201 "DID document endpoint {:?} does not match expected {}", 202 pds_endpoint, expected_endpoint 203 )); 204 } 205 } 206 Err(e) => { 207 warn!( 208 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 209 attempt + 1, 210 did, 211 e 212 ); 213 last_error = Some(format!("Could not resolve DID document: {}", e)); 214 } 215 } 216 } 217 218 let doc_data = match doc_data { 219 Some(d) => d, 220 None => { 221 return Err(( 222 StatusCode::BAD_REQUEST, 223 Json(json!({ 224 "error": "InvalidRequest", 225 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string()) 226 })), 227 )); 228 } 229 }; 230 231 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 232 if let Some(ref expected_rotation_key) = server_rotation_key { 233 let rotation_keys = doc_data 234 .get("rotationKeys") 235 .and_then(|v| v.as_array()) 236 .map(|arr| { 237 arr.iter() 238 .filter_map(|k| k.as_str()) 239 .collect::<Vec<_>>() 240 }) 241 .unwrap_or_default(); 242 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 243 return Err(( 244 StatusCode::BAD_REQUEST, 245 Json(json!({ 246 "error": "InvalidRequest", 247 "message": "Server rotation key not included in PLC DID data" 248 })), 249 )); 250 } 251 } 252 253 let doc_signing_key = doc_data 254 .get("verificationMethods") 255 .and_then(|v| v.get("atproto")) 256 .and_then(|k| k.as_str()); 257 258 let user_row = sqlx::query!( 259 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 260 did 261 ) 262 .fetch_optional(db) 263 .await 264 .map_err(|e| { 265 error!("Failed to fetch user key: {:?}", e); 266 ( 267 StatusCode::INTERNAL_SERVER_ERROR, 268 Json(json!({"error": "InternalError"})), 269 ) 270 })?; 271 272 if let Some(row) = user_row { 273 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 274 .map_err(|e| { 275 error!("Failed to decrypt user key: {}", e); 276 ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError"})), 279 ) 280 })?; 281 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 282 error!("Failed to create signing key: {:?}", e); 283 ( 284 StatusCode::INTERNAL_SERVER_ERROR, 285 Json(json!({"error": "InternalError"})), 286 ) 287 })?; 288 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 289 290 if doc_signing_key != Some(&expected_did_key) { 291 warn!( 292 "DID {} has signing key {:?}, expected {}", 293 did, doc_signing_key, expected_did_key 294 ); 295 return Err(( 296 StatusCode::BAD_REQUEST, 297 Json(json!({ 298 "error": "InvalidRequest", 299 "message": "DID document verification method does not match expected signing key" 300 })), 301 )); 302 } 303 } 304 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 305 let client = reqwest::Client::new(); 306 let decoded = host_and_path.replace("%3A", ":"); 307 let parts: Vec<&str> = decoded.split(':').collect(); 308 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 309 { 310 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 311 } else { 312 (parts[0].to_string(), parts[1..].to_vec()) 313 }; 314 let scheme = 315 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 316 "http" 317 } else { 318 "https" 319 }; 320 let url = if path_parts.is_empty() { 321 format!("{}://{}/.well-known/did.json", scheme, host) 322 } else { 323 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 324 }; 325 let resp = client.get(&url).send().await.map_err(|e| { 326 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 327 ( 328 StatusCode::BAD_REQUEST, 329 Json(json!({ 330 "error": "InvalidRequest", 331 "message": format!("Could not resolve DID document: {}", e) 332 })), 333 ) 334 })?; 335 let doc: serde_json::Value = resp.json().await.map_err(|e| { 336 warn!("Failed to parse did:web document for {}: {:?}", did, e); 337 ( 338 StatusCode::BAD_REQUEST, 339 Json(json!({ 340 "error": "InvalidRequest", 341 "message": format!("Could not parse DID document: {}", e) 342 })), 343 ) 344 })?; 345 346 let pds_endpoint = doc 347 .get("service") 348 .and_then(|s| s.as_array()) 349 .and_then(|arr| { 350 arr.iter().find(|svc| { 351 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 352 || svc.get("type").and_then(|t| t.as_str()) 353 == Some("AtprotoPersonalDataServer") 354 }) 355 }) 356 .and_then(|svc| svc.get("serviceEndpoint")) 357 .and_then(|e| e.as_str()); 358 359 if pds_endpoint != Some(&expected_endpoint) { 360 warn!( 361 "DID {} has endpoint {:?}, expected {}", 362 did, pds_endpoint, expected_endpoint 363 ); 364 return Err(( 365 StatusCode::BAD_REQUEST, 366 Json(json!({ 367 "error": "InvalidRequest", 368 "message": "DID document atproto_pds service endpoint does not match PDS public url" 369 })), 370 )); 371 } 372 } 373 374 Ok(()) 375} 376 377pub async fn activate_account( 378 State(state): State<AppState>, 379 headers: axum::http::HeaderMap, 380) -> Response { 381 info!("[MIGRATION] activateAccount called"); 382 let extracted = match crate::auth::extract_auth_token_from_header( 383 headers.get("Authorization").and_then(|h| h.to_str().ok()), 384 ) { 385 Some(t) => t, 386 None => { 387 info!("[MIGRATION] activateAccount: No auth token"); 388 return ApiError::AuthenticationRequired.into_response(); 389 } 390 }; 391 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 392 let http_uri = format!( 393 "https://{}/xrpc/com.atproto.server.activateAccount", 394 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 395 ); 396 let auth_user = match crate::auth::validate_token_with_dpop( 397 &state.db, 398 &extracted.token, 399 extracted.is_dpop, 400 dpop_proof, 401 "POST", 402 &http_uri, 403 true, 404 ) 405 .await 406 { 407 Ok(user) => user, 408 Err(e) => { 409 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 410 return ApiError::from(e).into_response(); 411 } 412 }; 413 info!( 414 "[MIGRATION] activateAccount: Authenticated user did={}", 415 auth_user.did 416 ); 417 418 if let Err(e) = crate::auth::scope_check::check_account_scope( 419 auth_user.is_oauth, 420 auth_user.scope.as_deref(), 421 crate::oauth::scopes::AccountAttr::Repo, 422 crate::oauth::scopes::AccountAction::Manage, 423 ) { 424 info!("[MIGRATION] activateAccount: Scope check failed"); 425 return e; 426 } 427 428 let did = auth_user.did; 429 430 info!( 431 "[MIGRATION] activateAccount: Validating DID document for did={}", 432 did 433 ); 434 let did_validation_start = std::time::Instant::now(); 435 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await { 436 info!( 437 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 438 did, 439 did_validation_start.elapsed() 440 ); 441 return (status, json).into_response(); 442 } 443 info!( 444 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 445 did, 446 did_validation_start.elapsed() 447 ); 448 449 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 450 .fetch_optional(&state.db) 451 .await 452 .ok() 453 .flatten(); 454 info!( 455 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 456 did, handle 457 ); 458 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 459 .execute(&state.db) 460 .await; 461 match result { 462 Ok(_) => { 463 info!( 464 "[MIGRATION] activateAccount: DB update success for did={}", 465 did 466 ); 467 if let Some(ref h) = handle { 468 let _ = state.cache.delete(&format!("handle:{}", h)).await; 469 } 470 info!( 471 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 472 did 473 ); 474 if let Err(e) = 475 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 476 { 477 warn!( 478 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 479 e 480 ); 481 } else { 482 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 483 } 484 info!( 485 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 486 did, handle 487 ); 488 if let Err(e) = 489 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 490 .await 491 { 492 warn!( 493 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 494 e 495 ); 496 } else { 497 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 498 } 499 let repo_root = sqlx::query_scalar!( 500 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 501 did 502 ) 503 .fetch_optional(&state.db) 504 .await 505 .ok() 506 .flatten(); 507 if let Some(root_cid) = repo_root { 508 info!( 509 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 510 did, root_cid 511 ); 512 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 513 if let Ok(Some(block)) = state.block_store.get(&cid).await { 514 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 515 } else { 516 None 517 } 518 } else { 519 None 520 }; 521 if let Err(e) = crate::api::repo::record::sequence_sync_event( 522 &state, 523 &did, 524 &root_cid, 525 rev.as_deref(), 526 ) 527 .await 528 { 529 warn!( 530 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 531 e 532 ); 533 } else { 534 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 535 } 536 } else { 537 warn!( 538 "[MIGRATION] activateAccount: No repo root found for did={}", 539 did 540 ); 541 } 542 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 543 (StatusCode::OK, Json(json!({}))).into_response() 544 } 545 Err(e) => { 546 error!( 547 "[MIGRATION] activateAccount: DB error activating account: {:?}", 548 e 549 ); 550 ( 551 StatusCode::INTERNAL_SERVER_ERROR, 552 Json(json!({"error": "InternalError"})), 553 ) 554 .into_response() 555 } 556 } 557} 558 559#[derive(Deserialize)] 560#[serde(rename_all = "camelCase")] 561pub struct DeactivateAccountInput { 562 pub delete_after: Option<String>, 563} 564 565pub async fn deactivate_account( 566 State(state): State<AppState>, 567 headers: axum::http::HeaderMap, 568 Json(input): Json<DeactivateAccountInput>, 569) -> Response { 570 let extracted = match crate::auth::extract_auth_token_from_header( 571 headers.get("Authorization").and_then(|h| h.to_str().ok()), 572 ) { 573 Some(t) => t, 574 None => return ApiError::AuthenticationRequired.into_response(), 575 }; 576 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 577 let http_uri = format!( 578 "https://{}/xrpc/com.atproto.server.deactivateAccount", 579 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 580 ); 581 let auth_user = match crate::auth::validate_token_with_dpop( 582 &state.db, 583 &extracted.token, 584 extracted.is_dpop, 585 dpop_proof, 586 "POST", 587 &http_uri, 588 false, 589 ) 590 .await 591 { 592 Ok(user) => user, 593 Err(e) => return ApiError::from(e).into_response(), 594 }; 595 596 if let Err(e) = crate::auth::scope_check::check_account_scope( 597 auth_user.is_oauth, 598 auth_user.scope.as_deref(), 599 crate::oauth::scopes::AccountAttr::Repo, 600 crate::oauth::scopes::AccountAction::Manage, 601 ) { 602 return e; 603 } 604 605 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 606 .delete_after 607 .as_ref() 608 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 609 .map(|dt| dt.with_timezone(&chrono::Utc)); 610 611 let did = auth_user.did; 612 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 613 .fetch_optional(&state.db) 614 .await 615 .ok() 616 .flatten(); 617 let result = sqlx::query!( 618 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 619 did, 620 delete_after 621 ) 622 .execute(&state.db) 623 .await; 624 match result { 625 Ok(_) => { 626 if let Some(ref h) = handle { 627 let _ = state.cache.delete(&format!("handle:{}", h)).await; 628 } 629 if let Err(e) = crate::api::repo::record::sequence_account_event( 630 &state, 631 &did, 632 false, 633 Some("deactivated"), 634 ) 635 .await 636 { 637 warn!("Failed to sequence account deactivation event: {}", e); 638 } 639 (StatusCode::OK, Json(json!({}))).into_response() 640 } 641 Err(e) => { 642 error!("DB error deactivating account: {:?}", e); 643 ( 644 StatusCode::INTERNAL_SERVER_ERROR, 645 Json(json!({"error": "InternalError"})), 646 ) 647 .into_response() 648 } 649 } 650} 651 652pub async fn request_account_delete( 653 State(state): State<AppState>, 654 headers: axum::http::HeaderMap, 655) -> Response { 656 let extracted = match crate::auth::extract_auth_token_from_header( 657 headers.get("Authorization").and_then(|h| h.to_str().ok()), 658 ) { 659 Some(t) => t, 660 None => return ApiError::AuthenticationRequired.into_response(), 661 }; 662 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 663 let http_uri = format!( 664 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 665 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 666 ); 667 let validated = match crate::auth::validate_token_with_dpop( 668 &state.db, 669 &extracted.token, 670 extracted.is_dpop, 671 dpop_proof, 672 "POST", 673 &http_uri, 674 true, 675 ) 676 .await 677 { 678 Ok(user) => user, 679 Err(e) => return ApiError::from(e).into_response(), 680 }; 681 let did = validated.did.clone(); 682 683 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await { 684 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await; 685 } 686 687 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 688 .fetch_optional(&state.db) 689 .await 690 { 691 Ok(Some(id)) => id, 692 _ => { 693 return ( 694 StatusCode::INTERNAL_SERVER_ERROR, 695 Json(json!({"error": "InternalError"})), 696 ) 697 .into_response(); 698 } 699 }; 700 let confirmation_token = Uuid::new_v4().to_string(); 701 let expires_at = Utc::now() + Duration::minutes(15); 702 let insert = sqlx::query!( 703 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 704 confirmation_token, 705 did, 706 expires_at 707 ) 708 .execute(&state.db) 709 .await; 710 if let Err(e) = insert { 711 error!("DB error creating deletion token: {:?}", e); 712 return ( 713 StatusCode::INTERNAL_SERVER_ERROR, 714 Json(json!({"error": "InternalError"})), 715 ) 716 .into_response(); 717 } 718 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 719 if let Err(e) = 720 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 721 .await 722 { 723 warn!("Failed to enqueue account deletion notification: {:?}", e); 724 } 725 info!("Account deletion requested for user {}", did); 726 (StatusCode::OK, Json(json!({}))).into_response() 727} 728 729#[derive(Deserialize)] 730pub struct DeleteAccountInput { 731 pub did: String, 732 pub password: String, 733 pub token: String, 734} 735 736pub async fn delete_account( 737 State(state): State<AppState>, 738 Json(input): Json<DeleteAccountInput>, 739) -> Response { 740 let did = input.did.trim(); 741 let password = &input.password; 742 let token = input.token.trim(); 743 if did.is_empty() { 744 return ( 745 StatusCode::BAD_REQUEST, 746 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 747 ) 748 .into_response(); 749 } 750 if password.is_empty() { 751 return ( 752 StatusCode::BAD_REQUEST, 753 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 754 ) 755 .into_response(); 756 } 757 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 758 if password.len() > OLD_PASSWORD_MAX_LENGTH { 759 return ( 760 StatusCode::BAD_REQUEST, 761 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})), 762 ) 763 .into_response(); 764 } 765 if token.is_empty() { 766 return ( 767 StatusCode::BAD_REQUEST, 768 Json(json!({"error": "InvalidToken", "message": "token is required"})), 769 ) 770 .into_response(); 771 } 772 let user = sqlx::query!( 773 "SELECT id, password_hash, handle FROM users WHERE did = $1", 774 did 775 ) 776 .fetch_optional(&state.db) 777 .await; 778 let (user_id, password_hash, handle) = match user { 779 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 780 Ok(None) => { 781 return ( 782 StatusCode::BAD_REQUEST, 783 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 784 ) 785 .into_response(); 786 } 787 Err(e) => { 788 error!("DB error in delete_account: {:?}", e); 789 return ( 790 StatusCode::INTERNAL_SERVER_ERROR, 791 Json(json!({"error": "InternalError"})), 792 ) 793 .into_response(); 794 } 795 }; 796 let password_valid = if password_hash 797 .as_ref() 798 .map(|h| verify(password, h).unwrap_or(false)) 799 .unwrap_or(false) 800 { 801 true 802 } else { 803 let app_pass_rows = sqlx::query!( 804 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 805 user_id 806 ) 807 .fetch_all(&state.db) 808 .await 809 .unwrap_or_default(); 810 app_pass_rows 811 .iter() 812 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 813 }; 814 if !password_valid { 815 return ( 816 StatusCode::UNAUTHORIZED, 817 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 818 ) 819 .into_response(); 820 } 821 let deletion_request = sqlx::query!( 822 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 823 token 824 ) 825 .fetch_optional(&state.db) 826 .await; 827 let (token_did, expires_at) = match deletion_request { 828 Ok(Some(row)) => (row.did, row.expires_at), 829 Ok(None) => { 830 return ( 831 StatusCode::BAD_REQUEST, 832 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 833 ) 834 .into_response(); 835 } 836 Err(e) => { 837 error!("DB error fetching deletion token: {:?}", e); 838 return ( 839 StatusCode::INTERNAL_SERVER_ERROR, 840 Json(json!({"error": "InternalError"})), 841 ) 842 .into_response(); 843 } 844 }; 845 if token_did != did { 846 return ( 847 StatusCode::BAD_REQUEST, 848 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 849 ) 850 .into_response(); 851 } 852 if Utc::now() > expires_at { 853 let _ = sqlx::query!( 854 "DELETE FROM account_deletion_requests WHERE token = $1", 855 token 856 ) 857 .execute(&state.db) 858 .await; 859 return ( 860 StatusCode::BAD_REQUEST, 861 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 862 ) 863 .into_response(); 864 } 865 let mut tx = match state.db.begin().await { 866 Ok(tx) => tx, 867 Err(e) => { 868 error!("Failed to begin transaction: {:?}", e); 869 return ( 870 StatusCode::INTERNAL_SERVER_ERROR, 871 Json(json!({"error": "InternalError"})), 872 ) 873 .into_response(); 874 } 875 }; 876 let deletion_result: Result<(), sqlx::Error> = async { 877 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 878 .execute(&mut *tx) 879 .await?; 880 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 881 .execute(&mut *tx) 882 .await?; 883 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 884 .execute(&mut *tx) 885 .await?; 886 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 887 .execute(&mut *tx) 888 .await?; 889 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 890 .execute(&mut *tx) 891 .await?; 892 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 893 .execute(&mut *tx) 894 .await?; 895 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 896 .execute(&mut *tx) 897 .await?; 898 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 899 .execute(&mut *tx) 900 .await?; 901 Ok(()) 902 } 903 .await; 904 match deletion_result { 905 Ok(()) => { 906 if let Err(e) = tx.commit().await { 907 error!("Failed to commit account deletion transaction: {:?}", e); 908 return ( 909 StatusCode::INTERNAL_SERVER_ERROR, 910 Json(json!({"error": "InternalError"})), 911 ) 912 .into_response(); 913 } 914 let account_seq = crate::api::repo::record::sequence_account_event( 915 &state, 916 did, 917 false, 918 Some("deleted"), 919 ) 920 .await; 921 match account_seq { 922 Ok(seq) => { 923 if let Err(e) = sqlx::query!( 924 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 925 did, 926 seq 927 ) 928 .execute(&state.db) 929 .await 930 { 931 warn!( 932 "Failed to cleanup sequences for deleted account {}: {}", 933 did, e 934 ); 935 } 936 } 937 Err(e) => { 938 warn!( 939 "Failed to sequence account deletion event for {}: {}", 940 did, e 941 ); 942 } 943 } 944 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 945 info!("Account {} deleted successfully", did); 946 (StatusCode::OK, Json(json!({}))).into_response() 947 } 948 Err(e) => { 949 error!("DB error deleting account, rolling back: {:?}", e); 950 ( 951 StatusCode::INTERNAL_SERVER_ERROR, 952 Json(json!({"error": "InternalError"})), 953 ) 954 .into_response() 955 } 956 } 957}