this repo has no description
at main 30 kB view raw
1use crate::api::EmptyResponse; 2use crate::api::error::ApiError; 3use crate::cache::Cache; 4use crate::plc::PlcClient; 5use crate::state::AppState; 6use crate::types::{Handle, PlainPassword}; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use backon::{ExponentialBuilder, Retryable}; 14use bcrypt::verify; 15use chrono::{Duration, Utc}; 16use cid::Cid; 17use jacquard_repo::commit::Commit; 18use jacquard_repo::storage::BlockStore; 19use k256::ecdsa::SigningKey; 20use serde::{Deserialize, Serialize}; 21use std::str::FromStr; 22use std::sync::Arc; 23use std::sync::atomic::{AtomicUsize, Ordering}; 24use tracing::{error, info, warn}; 25use uuid::Uuid; 26 27#[derive(Serialize)] 28#[serde(rename_all = "camelCase")] 29pub struct CheckAccountStatusOutput { 30 pub activated: bool, 31 pub valid_did: bool, 32 pub repo_commit: String, 33 pub repo_rev: String, 34 pub repo_blocks: i64, 35 pub indexed_records: i64, 36 pub private_state_values: i64, 37 pub expected_blobs: i64, 38 pub imported_blobs: i64, 39} 40 41pub async fn check_account_status( 42 State(state): State<AppState>, 43 headers: axum::http::HeaderMap, 44) -> Response { 45 let extracted = match crate::auth::extract_auth_token_from_header( 46 headers.get("Authorization").and_then(|h| h.to_str().ok()), 47 ) { 48 Some(t) => t, 49 None => return ApiError::AuthenticationRequired.into_response(), 50 }; 51 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 52 let http_uri = format!( 53 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 54 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 55 ); 56 let did = match crate::auth::validate_token_with_dpop( 57 &state.db, 58 &extracted.token, 59 extracted.is_dpop, 60 dpop_proof, 61 "GET", 62 &http_uri, 63 true, 64 false, 65 ) 66 .await 67 { 68 Ok(user) => user.did, 69 Err(e) => return ApiError::from(e).into_response(), 70 }; 71 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 72 .fetch_optional(&state.db) 73 .await 74 { 75 Ok(Some(id)) => id, 76 _ => { 77 return ApiError::InternalError(None).into_response(); 78 } 79 }; 80 let user_status = sqlx::query!( 81 "SELECT deactivated_at FROM users WHERE did = $1", 82 did.as_str() 83 ) 84 .fetch_optional(&state.db) 85 .await; 86 let deactivated_at = match user_status { 87 Ok(Some(row)) => row.deactivated_at, 88 _ => None, 89 }; 90 let repo_result = sqlx::query!( 91 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 92 user_id 93 ) 94 .fetch_optional(&state.db) 95 .await; 96 let (repo_commit, repo_rev_from_db) = match repo_result { 97 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 98 _ => (String::new(), None), 99 }; 100 let block_count: i64 = sqlx::query_scalar!( 101 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 102 user_id 103 ) 104 .fetch_one(&state.db) 105 .await 106 .unwrap_or(Some(0)) 107 .unwrap_or(0); 108 let repo_rev = if let Some(rev) = repo_rev_from_db { 109 rev 110 } else if !repo_commit.is_empty() { 111 if let Ok(cid) = Cid::from_str(&repo_commit) { 112 if let Ok(Some(block)) = state.block_store.get(&cid).await { 113 Commit::from_cbor(&block) 114 .ok() 115 .map(|c| c.rev().to_string()) 116 .unwrap_or_default() 117 } else { 118 String::new() 119 } 120 } else { 121 String::new() 122 } 123 } else { 124 String::new() 125 }; 126 let record_count: i64 = 127 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 128 .fetch_one(&state.db) 129 .await 130 .unwrap_or(Some(0)) 131 .unwrap_or(0); 132 let imported_blobs: i64 = sqlx::query_scalar!( 133 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 134 user_id 135 ) 136 .fetch_one(&state.db) 137 .await 138 .unwrap_or(Some(0)) 139 .unwrap_or(0); 140 let expected_blobs: i64 = sqlx::query_scalar!( 141 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 142 user_id 143 ) 144 .fetch_one(&state.db) 145 .await 146 .unwrap_or(Some(0)) 147 .unwrap_or(0); 148 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await; 149 ( 150 StatusCode::OK, 151 Json(CheckAccountStatusOutput { 152 activated: deactivated_at.is_none(), 153 valid_did, 154 repo_commit: repo_commit.clone(), 155 repo_rev, 156 repo_blocks: block_count as i64, 157 indexed_records: record_count, 158 private_state_values: 0, 159 expected_blobs, 160 imported_blobs, 161 }), 162 ) 163 .into_response() 164} 165 166async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool { 167 assert_valid_did_document_for_service(db, cache, did, false) 168 .await 169 .is_ok() 170} 171 172async fn assert_valid_did_document_for_service( 173 db: &sqlx::PgPool, 174 cache: Arc<dyn Cache>, 175 did: &str, 176 with_retry: bool, 177) -> Result<(), ApiError> { 178 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 179 let expected_endpoint = format!("https://{}", hostname); 180 181 if did.starts_with("did:plc:") { 182 let max_attempts = if with_retry { 5 } else { 1 }; 183 let cache_for_retry = cache.clone(); 184 let did_owned = did.to_string(); 185 let expected_owned = expected_endpoint.clone(); 186 let attempt_counter = Arc::new(AtomicUsize::new(0)); 187 188 let doc_data: serde_json::Value = (|| { 189 let cache_ref = cache_for_retry.clone(); 190 let did_ref = did_owned.clone(); 191 let expected_ref = expected_owned.clone(); 192 let counter = attempt_counter.clone(); 193 async move { 194 let attempt = counter.fetch_add(1, Ordering::SeqCst); 195 if attempt > 0 { 196 info!( 197 "Retry {} for DID document validation ({})", 198 attempt, did_ref 199 ); 200 } 201 let plc_client = PlcClient::with_cache(None, Some(cache_ref)); 202 match plc_client.get_document_data(&did_ref).await { 203 Ok(data) => { 204 let pds_endpoint = data 205 .get("services") 206 .and_then(|s: &serde_json::Value| { 207 s.get("atproto_pds").or_else(|| s.get("atprotoPds")) 208 }) 209 .and_then(|p: &serde_json::Value| p.get("endpoint")) 210 .and_then(|e: &serde_json::Value| e.as_str()); 211 212 if pds_endpoint == Some(expected_ref.as_str()) { 213 Ok(data) 214 } else { 215 info!( 216 "Attempt {}: DID {} has endpoint {:?}, expected {}", 217 attempt + 1, 218 did_ref, 219 pds_endpoint, 220 expected_ref 221 ); 222 Err(format!( 223 "DID document endpoint {:?} does not match expected {}", 224 pds_endpoint, expected_ref 225 )) 226 } 227 } 228 Err(e) => { 229 warn!( 230 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 231 attempt + 1, 232 did_ref, 233 e 234 ); 235 Err(format!("Could not resolve DID document: {}", e)) 236 } 237 } 238 } 239 }) 240 .retry( 241 ExponentialBuilder::default() 242 .with_min_delay(std::time::Duration::from_millis(500)) 243 .with_max_times(max_attempts), 244 ) 245 .await 246 .map_err(ApiError::InvalidRequest)?; 247 248 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 249 if let Some(ref expected_rotation_key) = server_rotation_key { 250 let rotation_keys = doc_data 251 .get("rotationKeys") 252 .and_then(|v| v.as_array()) 253 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>()) 254 .unwrap_or_default(); 255 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 256 return Err(ApiError::InvalidRequest( 257 "Server rotation key not included in PLC DID data".into(), 258 )); 259 } 260 } 261 262 let doc_signing_key = doc_data 263 .get("verificationMethods") 264 .and_then(|v| v.get("atproto")) 265 .and_then(|k| k.as_str()); 266 267 let user_row = sqlx::query!( 268 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 269 did 270 ) 271 .fetch_optional(db) 272 .await 273 .map_err(|e| { 274 error!("Failed to fetch user key: {:?}", e); 275 ApiError::InternalError(None) 276 })?; 277 278 if let Some(row) = user_row { 279 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 280 .map_err(|e| { 281 error!("Failed to decrypt user key: {}", e); 282 ApiError::InternalError(None) 283 })?; 284 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 285 error!("Failed to create signing key: {:?}", e); 286 ApiError::InternalError(None) 287 })?; 288 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 289 290 if doc_signing_key != Some(&expected_did_key) { 291 warn!( 292 "DID {} has signing key {:?}, expected {}", 293 did, doc_signing_key, expected_did_key 294 ); 295 return Err(ApiError::InvalidRequest( 296 "DID document verification method does not match expected signing key".into(), 297 )); 298 } 299 } 300 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 301 let client = crate::api::proxy_client::did_resolution_client(); 302 let decoded = host_and_path.replace("%3A", ":"); 303 let parts: Vec<&str> = decoded.split(':').collect(); 304 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 305 { 306 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 307 } else { 308 (parts[0].to_string(), parts[1..].to_vec()) 309 }; 310 let scheme = 311 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 312 "http" 313 } else { 314 "https" 315 }; 316 let url = if path_parts.is_empty() { 317 format!("{}://{}/.well-known/did.json", scheme, host) 318 } else { 319 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 320 }; 321 let resp = client.get(&url).send().await.map_err(|e| { 322 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 323 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e)) 324 })?; 325 let doc: serde_json::Value = resp.json().await.map_err(|e| { 326 warn!("Failed to parse did:web document for {}: {:?}", did, e); 327 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e)) 328 })?; 329 330 let pds_endpoint = doc 331 .get("service") 332 .and_then(|s| s.as_array()) 333 .and_then(|arr| { 334 arr.iter().find(|svc| { 335 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 336 || svc.get("type").and_then(|t| t.as_str()) 337 == Some("AtprotoPersonalDataServer") 338 }) 339 }) 340 .and_then(|svc| svc.get("serviceEndpoint")) 341 .and_then(|e| e.as_str()); 342 343 if pds_endpoint != Some(&expected_endpoint) { 344 warn!( 345 "DID {} has endpoint {:?}, expected {}", 346 did, pds_endpoint, expected_endpoint 347 ); 348 return Err(ApiError::InvalidRequest( 349 "DID document atproto_pds service endpoint does not match PDS public url".into(), 350 )); 351 } 352 } 353 354 Ok(()) 355} 356 357pub async fn activate_account( 358 State(state): State<AppState>, 359 headers: axum::http::HeaderMap, 360) -> Response { 361 info!("[MIGRATION] activateAccount called"); 362 let extracted = match crate::auth::extract_auth_token_from_header( 363 headers.get("Authorization").and_then(|h| h.to_str().ok()), 364 ) { 365 Some(t) => t, 366 None => { 367 info!("[MIGRATION] activateAccount: No auth token"); 368 return ApiError::AuthenticationRequired.into_response(); 369 } 370 }; 371 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 372 let http_uri = format!( 373 "https://{}/xrpc/com.atproto.server.activateAccount", 374 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 375 ); 376 let auth_user = match crate::auth::validate_token_with_dpop( 377 &state.db, 378 &extracted.token, 379 extracted.is_dpop, 380 dpop_proof, 381 "POST", 382 &http_uri, 383 true, 384 false, 385 ) 386 .await 387 { 388 Ok(user) => user, 389 Err(e) => { 390 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 391 return ApiError::from(e).into_response(); 392 } 393 }; 394 info!( 395 "[MIGRATION] activateAccount: Authenticated user did={}", 396 auth_user.did 397 ); 398 399 if let Err(e) = crate::auth::scope_check::check_account_scope( 400 auth_user.is_oauth, 401 auth_user.scope.as_deref(), 402 crate::oauth::scopes::AccountAttr::Repo, 403 crate::oauth::scopes::AccountAction::Manage, 404 ) { 405 info!("[MIGRATION] activateAccount: Scope check failed"); 406 return e; 407 } 408 409 let did = auth_user.did; 410 411 info!( 412 "[MIGRATION] activateAccount: Validating DID document for did={}", 413 did 414 ); 415 let did_validation_start = std::time::Instant::now(); 416 if let Err(e) = 417 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true) 418 .await 419 { 420 info!( 421 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 422 did, 423 did_validation_start.elapsed() 424 ); 425 return e.into_response(); 426 } 427 info!( 428 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 429 did, 430 did_validation_start.elapsed() 431 ); 432 433 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 434 .fetch_optional(&state.db) 435 .await 436 .ok() 437 .flatten(); 438 info!( 439 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 440 did, handle 441 ); 442 let result = sqlx::query!( 443 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 444 did.as_str() 445 ) 446 .execute(&state.db) 447 .await; 448 match result { 449 Ok(_) => { 450 info!( 451 "[MIGRATION] activateAccount: DB update success for did={}", 452 did 453 ); 454 if let Some(ref h) = handle { 455 let _ = state.cache.delete(&format!("handle:{}", h)).await; 456 } 457 info!( 458 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 459 did 460 ); 461 if let Err(e) = 462 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 463 { 464 warn!( 465 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 466 e 467 ); 468 } else { 469 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 470 } 471 info!( 472 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 473 did, handle 474 ); 475 let handle_typed = handle.as_ref().map(Handle::new_unchecked); 476 if let Err(e) = crate::api::repo::record::sequence_identity_event( 477 &state, 478 &did, 479 handle_typed.as_ref(), 480 ) 481 .await 482 { 483 warn!( 484 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 485 e 486 ); 487 } else { 488 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 489 } 490 let repo_root = sqlx::query_scalar!( 491 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 492 did.as_str() 493 ) 494 .fetch_optional(&state.db) 495 .await 496 .ok() 497 .flatten(); 498 if let Some(root_cid) = repo_root { 499 info!( 500 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 501 did, root_cid 502 ); 503 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 504 if let Ok(Some(block)) = state.block_store.get(&cid).await { 505 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 506 } else { 507 None 508 } 509 } else { 510 None 511 }; 512 if let Err(e) = crate::api::repo::record::sequence_sync_event( 513 &state, 514 &did, 515 &root_cid, 516 rev.as_deref(), 517 ) 518 .await 519 { 520 warn!( 521 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 522 e 523 ); 524 } else { 525 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 526 } 527 } else { 528 warn!( 529 "[MIGRATION] activateAccount: No repo root found for did={}", 530 did 531 ); 532 } 533 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 534 EmptyResponse::ok().into_response() 535 } 536 Err(e) => { 537 error!( 538 "[MIGRATION] activateAccount: DB error activating account: {:?}", 539 e 540 ); 541 ApiError::InternalError(None).into_response() 542 } 543 } 544} 545 546#[derive(Deserialize)] 547#[serde(rename_all = "camelCase")] 548pub struct DeactivateAccountInput { 549 pub delete_after: Option<String>, 550} 551 552pub async fn deactivate_account( 553 State(state): State<AppState>, 554 headers: axum::http::HeaderMap, 555 Json(input): Json<DeactivateAccountInput>, 556) -> Response { 557 let extracted = match crate::auth::extract_auth_token_from_header( 558 headers.get("Authorization").and_then(|h| h.to_str().ok()), 559 ) { 560 Some(t) => t, 561 None => return ApiError::AuthenticationRequired.into_response(), 562 }; 563 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 564 let http_uri = format!( 565 "https://{}/xrpc/com.atproto.server.deactivateAccount", 566 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 567 ); 568 let auth_user = match crate::auth::validate_token_with_dpop( 569 &state.db, 570 &extracted.token, 571 extracted.is_dpop, 572 dpop_proof, 573 "POST", 574 &http_uri, 575 false, 576 false, 577 ) 578 .await 579 { 580 Ok(user) => user, 581 Err(e) => return ApiError::from(e).into_response(), 582 }; 583 584 if let Err(e) = crate::auth::scope_check::check_account_scope( 585 auth_user.is_oauth, 586 auth_user.scope.as_deref(), 587 crate::oauth::scopes::AccountAttr::Repo, 588 crate::oauth::scopes::AccountAction::Manage, 589 ) { 590 return e; 591 } 592 593 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 594 .delete_after 595 .as_ref() 596 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 597 .map(|dt| dt.with_timezone(&chrono::Utc)); 598 599 let did = auth_user.did; 600 601 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 602 .fetch_optional(&state.db) 603 .await 604 .ok() 605 .flatten(); 606 607 let result = sqlx::query!( 608 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 609 did.as_str(), 610 delete_after 611 ) 612 .execute(&state.db) 613 .await; 614 615 match result { 616 Ok(_) => { 617 if let Some(ref h) = handle { 618 let _ = state.cache.delete(&format!("handle:{}", h)).await; 619 } 620 if let Err(e) = crate::api::repo::record::sequence_account_event( 621 &state, 622 &did, 623 false, 624 Some("deactivated"), 625 ) 626 .await 627 { 628 warn!("Failed to sequence account deactivated event: {}", e); 629 } 630 EmptyResponse::ok().into_response() 631 } 632 Err(e) => { 633 error!("DB error deactivating account: {:?}", e); 634 ApiError::InternalError(None).into_response() 635 } 636 } 637} 638 639pub async fn request_account_delete( 640 State(state): State<AppState>, 641 headers: axum::http::HeaderMap, 642) -> Response { 643 let extracted = match crate::auth::extract_auth_token_from_header( 644 headers.get("Authorization").and_then(|h| h.to_str().ok()), 645 ) { 646 Some(t) => t, 647 None => return ApiError::AuthenticationRequired.into_response(), 648 }; 649 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 650 let http_uri = format!( 651 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 652 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 653 ); 654 let validated = match crate::auth::validate_token_with_dpop( 655 &state.db, 656 &extracted.token, 657 extracted.is_dpop, 658 dpop_proof, 659 "POST", 660 &http_uri, 661 true, 662 false, 663 ) 664 .await 665 { 666 Ok(user) => user, 667 Err(e) => return ApiError::from(e).into_response(), 668 }; 669 let did = validated.did.clone(); 670 671 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await { 672 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str()) 673 .await; 674 } 675 676 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 677 .fetch_optional(&state.db) 678 .await 679 { 680 Ok(Some(id)) => id, 681 _ => { 682 return ApiError::InternalError(None).into_response(); 683 } 684 }; 685 let confirmation_token = Uuid::new_v4().to_string(); 686 let expires_at = Utc::now() + Duration::minutes(15); 687 let insert = sqlx::query!( 688 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 689 confirmation_token, 690 did.as_str(), 691 expires_at 692 ) 693 .execute(&state.db) 694 .await; 695 if let Err(e) = insert { 696 error!("DB error creating deletion token: {:?}", e); 697 return ApiError::InternalError(None).into_response(); 698 } 699 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 700 if let Err(e) = 701 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 702 .await 703 { 704 warn!("Failed to enqueue account deletion notification: {:?}", e); 705 } 706 info!("Account deletion requested for user {}", did); 707 EmptyResponse::ok().into_response() 708} 709 710#[derive(Deserialize)] 711pub struct DeleteAccountInput { 712 pub did: crate::types::Did, 713 pub password: PlainPassword, 714 pub token: String, 715} 716 717pub async fn delete_account( 718 State(state): State<AppState>, 719 Json(input): Json<DeleteAccountInput>, 720) -> Response { 721 let did = &input.did; 722 let password = &input.password; 723 let token = input.token.trim(); 724 if password.is_empty() { 725 return ApiError::InvalidRequest("password is required".into()).into_response(); 726 } 727 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 728 if password.len() > OLD_PASSWORD_MAX_LENGTH { 729 return ApiError::InvalidRequest("Invalid password length".into()).into_response(); 730 } 731 if token.is_empty() { 732 return ApiError::InvalidToken(Some("token is required".into())).into_response(); 733 } 734 let user = sqlx::query!( 735 "SELECT id, password_hash, handle FROM users WHERE did = $1", 736 did.as_str() 737 ) 738 .fetch_optional(&state.db) 739 .await; 740 let (user_id, password_hash, handle) = match user { 741 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 742 Ok(None) => { 743 return ApiError::InvalidRequest("account not found".into()).into_response(); 744 } 745 Err(e) => { 746 error!("DB error in delete_account: {:?}", e); 747 return ApiError::InternalError(None).into_response(); 748 } 749 }; 750 let password_valid = if password_hash 751 .as_ref() 752 .map(|h| verify(password, h).unwrap_or(false)) 753 .unwrap_or(false) 754 { 755 true 756 } else { 757 let app_pass_rows = sqlx::query!( 758 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 759 user_id 760 ) 761 .fetch_all(&state.db) 762 .await 763 .unwrap_or_default(); 764 app_pass_rows 765 .iter() 766 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 767 }; 768 if !password_valid { 769 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response(); 770 } 771 let deletion_request = sqlx::query!( 772 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 773 token 774 ) 775 .fetch_optional(&state.db) 776 .await; 777 let (token_did, expires_at) = match deletion_request { 778 Ok(Some(row)) => (row.did, row.expires_at), 779 Ok(None) => { 780 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response(); 781 } 782 Err(e) => { 783 error!("DB error fetching deletion token: {:?}", e); 784 return ApiError::InternalError(None).into_response(); 785 } 786 }; 787 if token_did != did.as_str() { 788 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response(); 789 } 790 if Utc::now() > expires_at { 791 let _ = sqlx::query!( 792 "DELETE FROM account_deletion_requests WHERE token = $1", 793 token 794 ) 795 .execute(&state.db) 796 .await; 797 return ApiError::ExpiredToken(None).into_response(); 798 } 799 let mut tx = match state.db.begin().await { 800 Ok(tx) => tx, 801 Err(e) => { 802 error!("Failed to begin transaction: {:?}", e); 803 return ApiError::InternalError(None).into_response(); 804 } 805 }; 806 let deletion_result: Result<(), sqlx::Error> = async { 807 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 808 .execute(&mut *tx) 809 .await?; 810 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 811 .execute(&mut *tx) 812 .await?; 813 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 814 .execute(&mut *tx) 815 .await?; 816 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 817 .execute(&mut *tx) 818 .await?; 819 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 820 .execute(&mut *tx) 821 .await?; 822 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 823 .execute(&mut *tx) 824 .await?; 825 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 826 .execute(&mut *tx) 827 .await?; 828 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 829 .execute(&mut *tx) 830 .await?; 831 Ok(()) 832 } 833 .await; 834 match deletion_result { 835 Ok(()) => { 836 if let Err(e) = tx.commit().await { 837 error!("Failed to commit account deletion transaction: {:?}", e); 838 return ApiError::InternalError(None).into_response(); 839 } 840 let account_seq = crate::api::repo::record::sequence_account_event( 841 &state, 842 did, 843 false, 844 Some("deleted"), 845 ) 846 .await; 847 match account_seq { 848 Ok(seq) => { 849 if let Err(e) = sqlx::query!( 850 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 851 did, 852 seq 853 ) 854 .execute(&state.db) 855 .await 856 { 857 warn!( 858 "Failed to cleanup sequences for deleted account {}: {}", 859 did, e 860 ); 861 } 862 } 863 Err(e) => { 864 warn!( 865 "Failed to sequence account deletion event for {}: {}", 866 did, e 867 ); 868 } 869 } 870 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 871 info!("Account {} deleted successfully", did); 872 EmptyResponse::ok().into_response() 873 } 874 Err(e) => { 875 error!("DB error deleting account, rolling back: {:?}", e); 876 ApiError::InternalError(None).into_response() 877 } 878 } 879}