this repo has no description
1use crate::api::EmptyResponse; 2use crate::api::error::ApiError; 3use crate::cache::Cache; 4use crate::plc::PlcClient; 5use crate::state::AppState; 6use crate::types::PlainPassword; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::verify; 14use chrono::{Duration, Utc}; 15use cid::Cid; 16use jacquard_repo::commit::Commit; 17use jacquard_repo::storage::BlockStore; 18use k256::ecdsa::SigningKey; 19use serde::{Deserialize, Serialize}; 20use std::str::FromStr; 21use std::sync::Arc; 22use tracing::{error, info, warn}; 23use uuid::Uuid; 24 25#[derive(Serialize)] 26#[serde(rename_all = "camelCase")] 27pub struct CheckAccountStatusOutput { 28 pub activated: bool, 29 pub valid_did: bool, 30 pub repo_commit: String, 31 pub repo_rev: String, 32 pub repo_blocks: i64, 33 pub indexed_records: i64, 34 pub private_state_values: i64, 35 pub expected_blobs: i64, 36 pub imported_blobs: i64, 37} 38 39pub async fn check_account_status( 40 State(state): State<AppState>, 41 headers: axum::http::HeaderMap, 42) -> Response { 43 let extracted = match crate::auth::extract_auth_token_from_header( 44 headers.get("Authorization").and_then(|h| h.to_str().ok()), 45 ) { 46 Some(t) => t, 47 None => return ApiError::AuthenticationRequired.into_response(), 48 }; 49 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 50 let http_uri = format!( 51 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 52 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 53 ); 54 let did = match crate::auth::validate_token_with_dpop( 55 &state.db, 56 &extracted.token, 57 extracted.is_dpop, 58 dpop_proof, 59 "GET", 60 &http_uri, 61 true, 62 ) 63 .await 64 { 65 Ok(user) => user.did, 66 Err(e) => return ApiError::from(e).into_response(), 67 }; 68 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 69 .fetch_optional(&state.db) 70 .await 71 { 72 Ok(Some(id)) => id, 73 _ => { 74 return ApiError::InternalError(None).into_response(); 75 } 76 }; 77 let user_status = sqlx::query!( 78 "SELECT deactivated_at FROM users WHERE did = $1", 79 did.as_str() 80 ) 81 .fetch_optional(&state.db) 82 .await; 83 let deactivated_at = match user_status { 84 Ok(Some(row)) => row.deactivated_at, 85 _ => None, 86 }; 87 let repo_result = sqlx::query!( 88 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 89 user_id 90 ) 91 .fetch_optional(&state.db) 92 .await; 93 let (repo_commit, repo_rev_from_db) = match repo_result { 94 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 95 _ => (String::new(), None), 96 }; 97 let block_count: i64 = sqlx::query_scalar!( 98 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 99 user_id 100 ) 101 .fetch_one(&state.db) 102 .await 103 .unwrap_or(Some(0)) 104 .unwrap_or(0); 105 let repo_rev = if let Some(rev) = repo_rev_from_db { 106 rev 107 } else if !repo_commit.is_empty() { 108 if let Ok(cid) = Cid::from_str(&repo_commit) { 109 if let Ok(Some(block)) = state.block_store.get(&cid).await { 110 Commit::from_cbor(&block) 111 .ok() 112 .map(|c| c.rev().to_string()) 113 .unwrap_or_default() 114 } else { 115 String::new() 116 } 117 } else { 118 String::new() 119 } 120 } else { 121 String::new() 122 }; 123 let record_count: i64 = 124 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 125 .fetch_one(&state.db) 126 .await 127 .unwrap_or(Some(0)) 128 .unwrap_or(0); 129 let imported_blobs: i64 = sqlx::query_scalar!( 130 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 131 user_id 132 ) 133 .fetch_one(&state.db) 134 .await 135 .unwrap_or(Some(0)) 136 .unwrap_or(0); 137 let expected_blobs: i64 = sqlx::query_scalar!( 138 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 139 user_id 140 ) 141 .fetch_one(&state.db) 142 .await 143 .unwrap_or(Some(0)) 144 .unwrap_or(0); 145 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await; 146 ( 147 StatusCode::OK, 148 Json(CheckAccountStatusOutput { 149 activated: deactivated_at.is_none(), 150 valid_did, 151 repo_commit: repo_commit.clone(), 152 repo_rev, 153 repo_blocks: block_count as i64, 154 indexed_records: record_count, 155 private_state_values: 0, 156 expected_blobs, 157 imported_blobs, 158 }), 159 ) 160 .into_response() 161} 162 163async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool { 164 assert_valid_did_document_for_service(db, cache, did, false) 165 .await 166 .is_ok() 167} 168 169async fn assert_valid_did_document_for_service( 170 db: &sqlx::PgPool, 171 cache: Arc<dyn Cache>, 172 did: &str, 173 with_retry: bool, 174) -> Result<(), ApiError> { 175 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 176 let expected_endpoint = format!("https://{}", hostname); 177 178 if did.starts_with("did:plc:") { 179 let plc_client = PlcClient::with_cache(None, Some(cache.clone())); 180 181 let max_attempts = if with_retry { 5 } else { 1 }; 182 let mut last_error = None; 183 let mut doc_data = None; 184 for attempt in 0..max_attempts { 185 if attempt > 0 { 186 let delay_ms = 500 * (1 << (attempt - 1)); 187 info!( 188 "Waiting {}ms before retry {} for DID document validation ({})", 189 delay_ms, attempt, did 190 ); 191 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 192 } 193 194 match plc_client.get_document_data(did).await { 195 Ok(data) => { 196 let pds_endpoint = data 197 .get("services") 198 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 199 .and_then(|p| p.get("endpoint")) 200 .and_then(|e| e.as_str()); 201 202 if pds_endpoint == Some(&expected_endpoint) { 203 doc_data = Some(data); 204 break; 205 } else { 206 info!( 207 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 208 attempt + 1, 209 did, 210 pds_endpoint, 211 expected_endpoint 212 ); 213 last_error = Some(format!( 214 "DID document endpoint {:?} does not match expected {}", 215 pds_endpoint, expected_endpoint 216 )); 217 } 218 } 219 Err(e) => { 220 warn!( 221 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 222 attempt + 1, 223 did, 224 e 225 ); 226 last_error = Some(format!("Could not resolve DID document: {}", e)); 227 } 228 } 229 } 230 231 let Some(doc_data) = doc_data else { 232 return Err(ApiError::InvalidRequest( 233 last_error.unwrap_or_else(|| "DID document validation failed".to_string()), 234 )); 235 }; 236 237 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 238 if let Some(ref expected_rotation_key) = server_rotation_key { 239 let rotation_keys = doc_data 240 .get("rotationKeys") 241 .and_then(|v| v.as_array()) 242 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>()) 243 .unwrap_or_default(); 244 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 245 return Err(ApiError::InvalidRequest( 246 "Server rotation key not included in PLC DID data".into(), 247 )); 248 } 249 } 250 251 let doc_signing_key = doc_data 252 .get("verificationMethods") 253 .and_then(|v| v.get("atproto")) 254 .and_then(|k| k.as_str()); 255 256 let user_row = sqlx::query!( 257 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 258 did 259 ) 260 .fetch_optional(db) 261 .await 262 .map_err(|e| { 263 error!("Failed to fetch user key: {:?}", e); 264 ApiError::InternalError(None) 265 })?; 266 267 if let Some(row) = user_row { 268 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 269 .map_err(|e| { 270 error!("Failed to decrypt user key: {}", e); 271 ApiError::InternalError(None) 272 })?; 273 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 274 error!("Failed to create signing key: {:?}", e); 275 ApiError::InternalError(None) 276 })?; 277 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 278 279 if doc_signing_key != Some(&expected_did_key) { 280 warn!( 281 "DID {} has signing key {:?}, expected {}", 282 did, doc_signing_key, expected_did_key 283 ); 284 return Err(ApiError::InvalidRequest( 285 "DID document verification method does not match expected signing key".into(), 286 )); 287 } 288 } 289 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 290 let client = crate::api::proxy_client::did_resolution_client(); 291 let decoded = host_and_path.replace("%3A", ":"); 292 let parts: Vec<&str> = decoded.split(':').collect(); 293 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 294 { 295 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 296 } else { 297 (parts[0].to_string(), parts[1..].to_vec()) 298 }; 299 let scheme = 300 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 301 "http" 302 } else { 303 "https" 304 }; 305 let url = if path_parts.is_empty() { 306 format!("{}://{}/.well-known/did.json", scheme, host) 307 } else { 308 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 309 }; 310 let resp = client.get(&url).send().await.map_err(|e| { 311 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 312 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e)) 313 })?; 314 let doc: serde_json::Value = resp.json().await.map_err(|e| { 315 warn!("Failed to parse did:web document for {}: {:?}", did, e); 316 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e)) 317 })?; 318 319 let pds_endpoint = doc 320 .get("service") 321 .and_then(|s| s.as_array()) 322 .and_then(|arr| { 323 arr.iter().find(|svc| { 324 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 325 || svc.get("type").and_then(|t| t.as_str()) 326 == Some("AtprotoPersonalDataServer") 327 }) 328 }) 329 .and_then(|svc| svc.get("serviceEndpoint")) 330 .and_then(|e| e.as_str()); 331 332 if pds_endpoint != Some(&expected_endpoint) { 333 warn!( 334 "DID {} has endpoint {:?}, expected {}", 335 did, pds_endpoint, expected_endpoint 336 ); 337 return Err(ApiError::InvalidRequest( 338 "DID document atproto_pds service endpoint does not match PDS public url".into(), 339 )); 340 } 341 } 342 343 Ok(()) 344} 345 346pub async fn activate_account( 347 State(state): State<AppState>, 348 headers: axum::http::HeaderMap, 349) -> Response { 350 info!("[MIGRATION] activateAccount called"); 351 let extracted = match crate::auth::extract_auth_token_from_header( 352 headers.get("Authorization").and_then(|h| h.to_str().ok()), 353 ) { 354 Some(t) => t, 355 None => { 356 info!("[MIGRATION] activateAccount: No auth token"); 357 return ApiError::AuthenticationRequired.into_response(); 358 } 359 }; 360 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 361 let http_uri = format!( 362 "https://{}/xrpc/com.atproto.server.activateAccount", 363 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 364 ); 365 let auth_user = match crate::auth::validate_token_with_dpop( 366 &state.db, 367 &extracted.token, 368 extracted.is_dpop, 369 dpop_proof, 370 "POST", 371 &http_uri, 372 true, 373 ) 374 .await 375 { 376 Ok(user) => user, 377 Err(e) => { 378 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 379 return ApiError::from(e).into_response(); 380 } 381 }; 382 info!( 383 "[MIGRATION] activateAccount: Authenticated user did={}", 384 auth_user.did 385 ); 386 387 if let Err(e) = crate::auth::scope_check::check_account_scope( 388 auth_user.is_oauth, 389 auth_user.scope.as_deref(), 390 crate::oauth::scopes::AccountAttr::Repo, 391 crate::oauth::scopes::AccountAction::Manage, 392 ) { 393 info!("[MIGRATION] activateAccount: Scope check failed"); 394 return e; 395 } 396 397 let did = auth_user.did; 398 399 info!( 400 "[MIGRATION] activateAccount: Validating DID document for did={}", 401 did 402 ); 403 let did_validation_start = std::time::Instant::now(); 404 if let Err(e) = 405 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true) 406 .await 407 { 408 info!( 409 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 410 did, 411 did_validation_start.elapsed() 412 ); 413 return e.into_response(); 414 } 415 info!( 416 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 417 did, 418 did_validation_start.elapsed() 419 ); 420 421 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 422 .fetch_optional(&state.db) 423 .await 424 .ok() 425 .flatten(); 426 info!( 427 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 428 did, handle 429 ); 430 let result = sqlx::query!( 431 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 432 did.as_str() 433 ) 434 .execute(&state.db) 435 .await; 436 match result { 437 Ok(_) => { 438 info!( 439 "[MIGRATION] activateAccount: DB update success for did={}", 440 did 441 ); 442 if let Some(ref h) = handle { 443 let _ = state.cache.delete(&format!("handle:{}", h)).await; 444 } 445 info!( 446 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 447 did 448 ); 449 if let Err(e) = 450 crate::api::repo::record::sequence_account_event(&state, did.as_str(), true, None) 451 .await 452 { 453 warn!( 454 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 455 e 456 ); 457 } else { 458 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 459 } 460 info!( 461 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 462 did, handle 463 ); 464 if let Err(e) = crate::api::repo::record::sequence_identity_event( 465 &state, 466 did.as_str(), 467 handle.as_deref(), 468 ) 469 .await 470 { 471 warn!( 472 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 473 e 474 ); 475 } else { 476 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 477 } 478 let repo_root = sqlx::query_scalar!( 479 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 480 did.as_str() 481 ) 482 .fetch_optional(&state.db) 483 .await 484 .ok() 485 .flatten(); 486 if let Some(root_cid) = repo_root { 487 info!( 488 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 489 did, root_cid 490 ); 491 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 492 if let Ok(Some(block)) = state.block_store.get(&cid).await { 493 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 494 } else { 495 None 496 } 497 } else { 498 None 499 }; 500 if let Err(e) = crate::api::repo::record::sequence_sync_event( 501 &state, 502 did.as_str(), 503 &root_cid, 504 rev.as_deref(), 505 ) 506 .await 507 { 508 warn!( 509 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 510 e 511 ); 512 } else { 513 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 514 } 515 } else { 516 warn!( 517 "[MIGRATION] activateAccount: No repo root found for did={}", 518 did 519 ); 520 } 521 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 522 EmptyResponse::ok().into_response() 523 } 524 Err(e) => { 525 error!( 526 "[MIGRATION] activateAccount: DB error activating account: {:?}", 527 e 528 ); 529 ApiError::InternalError(None).into_response() 530 } 531 } 532} 533 534#[derive(Deserialize)] 535#[serde(rename_all = "camelCase")] 536pub struct DeactivateAccountInput { 537 pub delete_after: Option<String>, 538} 539 540pub async fn deactivate_account( 541 State(state): State<AppState>, 542 headers: axum::http::HeaderMap, 543 Json(input): Json<DeactivateAccountInput>, 544) -> Response { 545 let extracted = match crate::auth::extract_auth_token_from_header( 546 headers.get("Authorization").and_then(|h| h.to_str().ok()), 547 ) { 548 Some(t) => t, 549 None => return ApiError::AuthenticationRequired.into_response(), 550 }; 551 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 552 let http_uri = format!( 553 "https://{}/xrpc/com.atproto.server.deactivateAccount", 554 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 555 ); 556 let auth_user = match crate::auth::validate_token_with_dpop( 557 &state.db, 558 &extracted.token, 559 extracted.is_dpop, 560 dpop_proof, 561 "POST", 562 &http_uri, 563 false, 564 ) 565 .await 566 { 567 Ok(user) => user, 568 Err(e) => return ApiError::from(e).into_response(), 569 }; 570 571 if let Err(e) = crate::auth::scope_check::check_account_scope( 572 auth_user.is_oauth, 573 auth_user.scope.as_deref(), 574 crate::oauth::scopes::AccountAttr::Repo, 575 crate::oauth::scopes::AccountAction::Manage, 576 ) { 577 return e; 578 } 579 580 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 581 .delete_after 582 .as_ref() 583 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 584 .map(|dt| dt.with_timezone(&chrono::Utc)); 585 586 let did = auth_user.did; 587 588 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 589 .fetch_optional(&state.db) 590 .await 591 .ok() 592 .flatten(); 593 594 let result = sqlx::query!( 595 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 596 did.as_str(), 597 delete_after 598 ) 599 .execute(&state.db) 600 .await; 601 602 match result { 603 Ok(_) => { 604 if let Some(ref h) = handle { 605 let _ = state.cache.delete(&format!("handle:{}", h)).await; 606 } 607 if let Err(e) = crate::api::repo::record::sequence_account_event( 608 &state, 609 did.as_str(), 610 false, 611 Some("deactivated"), 612 ) 613 .await 614 { 615 warn!("Failed to sequence account deactivated event: {}", e); 616 } 617 EmptyResponse::ok().into_response() 618 } 619 Err(e) => { 620 error!("DB error deactivating account: {:?}", e); 621 ApiError::InternalError(None).into_response() 622 } 623 } 624} 625 626pub async fn request_account_delete( 627 State(state): State<AppState>, 628 headers: axum::http::HeaderMap, 629) -> Response { 630 let extracted = match crate::auth::extract_auth_token_from_header( 631 headers.get("Authorization").and_then(|h| h.to_str().ok()), 632 ) { 633 Some(t) => t, 634 None => return ApiError::AuthenticationRequired.into_response(), 635 }; 636 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 637 let http_uri = format!( 638 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 639 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 640 ); 641 let validated = match crate::auth::validate_token_with_dpop( 642 &state.db, 643 &extracted.token, 644 extracted.is_dpop, 645 dpop_proof, 646 "POST", 647 &http_uri, 648 true, 649 ) 650 .await 651 { 652 Ok(user) => user, 653 Err(e) => return ApiError::from(e).into_response(), 654 }; 655 let did = validated.did.clone(); 656 657 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await { 658 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str()) 659 .await; 660 } 661 662 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 663 .fetch_optional(&state.db) 664 .await 665 { 666 Ok(Some(id)) => id, 667 _ => { 668 return ApiError::InternalError(None).into_response(); 669 } 670 }; 671 let confirmation_token = Uuid::new_v4().to_string(); 672 let expires_at = Utc::now() + Duration::minutes(15); 673 let insert = sqlx::query!( 674 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 675 confirmation_token, 676 did.as_str(), 677 expires_at 678 ) 679 .execute(&state.db) 680 .await; 681 if let Err(e) = insert { 682 error!("DB error creating deletion token: {:?}", e); 683 return ApiError::InternalError(None).into_response(); 684 } 685 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 686 if let Err(e) = 687 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 688 .await 689 { 690 warn!("Failed to enqueue account deletion notification: {:?}", e); 691 } 692 info!("Account deletion requested for user {}", did); 693 EmptyResponse::ok().into_response() 694} 695 696#[derive(Deserialize)] 697pub struct DeleteAccountInput { 698 pub did: crate::types::Did, 699 pub password: PlainPassword, 700 pub token: String, 701} 702 703pub async fn delete_account( 704 State(state): State<AppState>, 705 Json(input): Json<DeleteAccountInput>, 706) -> Response { 707 let did = &input.did; 708 let password = &input.password; 709 let token = input.token.trim(); 710 if password.is_empty() { 711 return ApiError::InvalidRequest("password is required".into()).into_response(); 712 } 713 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 714 if password.len() > OLD_PASSWORD_MAX_LENGTH { 715 return ApiError::InvalidRequest("Invalid password length".into()).into_response(); 716 } 717 if token.is_empty() { 718 return ApiError::InvalidToken(Some("token is required".into())).into_response(); 719 } 720 let user = sqlx::query!( 721 "SELECT id, password_hash, handle FROM users WHERE did = $1", 722 did.as_str() 723 ) 724 .fetch_optional(&state.db) 725 .await; 726 let (user_id, password_hash, handle) = match user { 727 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 728 Ok(None) => { 729 return ApiError::InvalidRequest("account not found".into()).into_response(); 730 } 731 Err(e) => { 732 error!("DB error in delete_account: {:?}", e); 733 return ApiError::InternalError(None).into_response(); 734 } 735 }; 736 let password_valid = if password_hash 737 .as_ref() 738 .map(|h| verify(password, h).unwrap_or(false)) 739 .unwrap_or(false) 740 { 741 true 742 } else { 743 let app_pass_rows = sqlx::query!( 744 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 745 user_id 746 ) 747 .fetch_all(&state.db) 748 .await 749 .unwrap_or_default(); 750 app_pass_rows 751 .iter() 752 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 753 }; 754 if !password_valid { 755 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response(); 756 } 757 let deletion_request = sqlx::query!( 758 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 759 token 760 ) 761 .fetch_optional(&state.db) 762 .await; 763 let (token_did, expires_at) = match deletion_request { 764 Ok(Some(row)) => (row.did, row.expires_at), 765 Ok(None) => { 766 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response(); 767 } 768 Err(e) => { 769 error!("DB error fetching deletion token: {:?}", e); 770 return ApiError::InternalError(None).into_response(); 771 } 772 }; 773 if token_did != did.as_str() { 774 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response(); 775 } 776 if Utc::now() > expires_at { 777 let _ = sqlx::query!( 778 "DELETE FROM account_deletion_requests WHERE token = $1", 779 token 780 ) 781 .execute(&state.db) 782 .await; 783 return ApiError::ExpiredToken(None).into_response(); 784 } 785 let mut tx = match state.db.begin().await { 786 Ok(tx) => tx, 787 Err(e) => { 788 error!("Failed to begin transaction: {:?}", e); 789 return ApiError::InternalError(None).into_response(); 790 } 791 }; 792 let deletion_result: Result<(), sqlx::Error> = async { 793 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 794 .execute(&mut *tx) 795 .await?; 796 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 797 .execute(&mut *tx) 798 .await?; 799 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 800 .execute(&mut *tx) 801 .await?; 802 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 803 .execute(&mut *tx) 804 .await?; 805 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 806 .execute(&mut *tx) 807 .await?; 808 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 809 .execute(&mut *tx) 810 .await?; 811 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 812 .execute(&mut *tx) 813 .await?; 814 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 815 .execute(&mut *tx) 816 .await?; 817 Ok(()) 818 } 819 .await; 820 match deletion_result { 821 Ok(()) => { 822 if let Err(e) = tx.commit().await { 823 error!("Failed to commit account deletion transaction: {:?}", e); 824 return ApiError::InternalError(None).into_response(); 825 } 826 let account_seq = crate::api::repo::record::sequence_account_event( 827 &state, 828 did, 829 false, 830 Some("deleted"), 831 ) 832 .await; 833 match account_seq { 834 Ok(seq) => { 835 if let Err(e) = sqlx::query!( 836 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 837 did, 838 seq 839 ) 840 .execute(&state.db) 841 .await 842 { 843 warn!( 844 "Failed to cleanup sequences for deleted account {}: {}", 845 did, e 846 ); 847 } 848 } 849 Err(e) => { 850 warn!( 851 "Failed to sequence account deletion event for {}: {}", 852 did, e 853 ); 854 } 855 } 856 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 857 info!("Account {} deleted successfully", did); 858 EmptyResponse::ok().into_response() 859 } 860 Err(e) => { 861 error!("DB error deleting account, rolling back: {:?}", e); 862 ApiError::InternalError(None).into_response() 863 } 864 } 865}