this repo has no description
1use crate::api::error::ApiError; 2use crate::api::EmptyResponse; 3use crate::cache::Cache; 4use crate::plc::PlcClient; 5use crate::state::AppState; 6use crate::types::PlainPassword; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::verify; 14use chrono::{Duration, Utc}; 15use cid::Cid; 16use jacquard_repo::commit::Commit; 17use jacquard_repo::storage::BlockStore; 18use k256::ecdsa::SigningKey; 19use serde::{Deserialize, Serialize}; 20use std::str::FromStr; 21use std::sync::Arc; 22use tracing::{error, info, warn}; 23use uuid::Uuid; 24 25#[derive(Serialize)] 26#[serde(rename_all = "camelCase")] 27pub struct CheckAccountStatusOutput { 28 pub activated: bool, 29 pub valid_did: bool, 30 pub repo_commit: String, 31 pub repo_rev: String, 32 pub repo_blocks: i64, 33 pub indexed_records: i64, 34 pub private_state_values: i64, 35 pub expected_blobs: i64, 36 pub imported_blobs: i64, 37} 38 39pub async fn check_account_status( 40 State(state): State<AppState>, 41 headers: axum::http::HeaderMap, 42) -> Response { 43 let extracted = match crate::auth::extract_auth_token_from_header( 44 headers.get("Authorization").and_then(|h| h.to_str().ok()), 45 ) { 46 Some(t) => t, 47 None => return ApiError::AuthenticationRequired.into_response(), 48 }; 49 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 50 let http_uri = format!( 51 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 52 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 53 ); 54 let did = match crate::auth::validate_token_with_dpop( 55 &state.db, 56 &extracted.token, 57 extracted.is_dpop, 58 dpop_proof, 59 "GET", 60 &http_uri, 61 true, 62 ) 63 .await 64 { 65 Ok(user) => user.did, 66 Err(e) => return ApiError::from(e).into_response(), 67 }; 68 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 69 .fetch_optional(&state.db) 70 .await 71 { 72 Ok(Some(id)) => id, 73 _ => { 74 return ApiError::InternalError(None).into_response(); 75 } 76 }; 77 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did.as_str()) 78 .fetch_optional(&state.db) 79 .await; 80 let deactivated_at = match user_status { 81 Ok(Some(row)) => row.deactivated_at, 82 _ => None, 83 }; 84 let repo_result = sqlx::query!( 85 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 86 user_id 87 ) 88 .fetch_optional(&state.db) 89 .await; 90 let (repo_commit, repo_rev_from_db) = match repo_result { 91 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 92 _ => (String::new(), None), 93 }; 94 let block_count: i64 = sqlx::query_scalar!( 95 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 96 user_id 97 ) 98 .fetch_one(&state.db) 99 .await 100 .unwrap_or(Some(0)) 101 .unwrap_or(0); 102 let repo_rev = if let Some(rev) = repo_rev_from_db { 103 rev 104 } else if !repo_commit.is_empty() { 105 if let Ok(cid) = Cid::from_str(&repo_commit) { 106 if let Ok(Some(block)) = state.block_store.get(&cid).await { 107 Commit::from_cbor(&block) 108 .ok() 109 .map(|c| c.rev().to_string()) 110 .unwrap_or_default() 111 } else { 112 String::new() 113 } 114 } else { 115 String::new() 116 } 117 } else { 118 String::new() 119 }; 120 let record_count: i64 = 121 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 122 .fetch_one(&state.db) 123 .await 124 .unwrap_or(Some(0)) 125 .unwrap_or(0); 126 let imported_blobs: i64 = sqlx::query_scalar!( 127 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 128 user_id 129 ) 130 .fetch_one(&state.db) 131 .await 132 .unwrap_or(Some(0)) 133 .unwrap_or(0); 134 let expected_blobs: i64 = sqlx::query_scalar!( 135 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 136 user_id 137 ) 138 .fetch_one(&state.db) 139 .await 140 .unwrap_or(Some(0)) 141 .unwrap_or(0); 142 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await; 143 ( 144 StatusCode::OK, 145 Json(CheckAccountStatusOutput { 146 activated: deactivated_at.is_none(), 147 valid_did, 148 repo_commit: repo_commit.clone(), 149 repo_rev, 150 repo_blocks: block_count as i64, 151 indexed_records: record_count, 152 private_state_values: 0, 153 expected_blobs, 154 imported_blobs, 155 }), 156 ) 157 .into_response() 158} 159 160async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool { 161 assert_valid_did_document_for_service(db, cache, did, false) 162 .await 163 .is_ok() 164} 165 166async fn assert_valid_did_document_for_service( 167 db: &sqlx::PgPool, 168 cache: Arc<dyn Cache>, 169 did: &str, 170 with_retry: bool, 171) -> Result<(), ApiError> { 172 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 173 let expected_endpoint = format!("https://{}", hostname); 174 175 if did.starts_with("did:plc:") { 176 let plc_client = PlcClient::with_cache(None, Some(cache.clone())); 177 178 let max_attempts = if with_retry { 5 } else { 1 }; 179 let mut last_error = None; 180 let mut doc_data = None; 181 for attempt in 0..max_attempts { 182 if attempt > 0 { 183 let delay_ms = 500 * (1 << (attempt - 1)); 184 info!( 185 "Waiting {}ms before retry {} for DID document validation ({})", 186 delay_ms, attempt, did 187 ); 188 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 189 } 190 191 match plc_client.get_document_data(did).await { 192 Ok(data) => { 193 let pds_endpoint = data 194 .get("services") 195 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 196 .and_then(|p| p.get("endpoint")) 197 .and_then(|e| e.as_str()); 198 199 if pds_endpoint == Some(&expected_endpoint) { 200 doc_data = Some(data); 201 break; 202 } else { 203 info!( 204 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 205 attempt + 1, 206 did, 207 pds_endpoint, 208 expected_endpoint 209 ); 210 last_error = Some(format!( 211 "DID document endpoint {:?} does not match expected {}", 212 pds_endpoint, expected_endpoint 213 )); 214 } 215 } 216 Err(e) => { 217 warn!( 218 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 219 attempt + 1, 220 did, 221 e 222 ); 223 last_error = Some(format!("Could not resolve DID document: {}", e)); 224 } 225 } 226 } 227 228 let Some(doc_data) = doc_data else { 229 return Err(ApiError::InvalidRequest( 230 last_error.unwrap_or_else(|| "DID document validation failed".to_string()), 231 )); 232 }; 233 234 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 235 if let Some(ref expected_rotation_key) = server_rotation_key { 236 let rotation_keys = doc_data 237 .get("rotationKeys") 238 .and_then(|v| v.as_array()) 239 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>()) 240 .unwrap_or_default(); 241 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 242 return Err(ApiError::InvalidRequest( 243 "Server rotation key not included in PLC DID data".into(), 244 )); 245 } 246 } 247 248 let doc_signing_key = doc_data 249 .get("verificationMethods") 250 .and_then(|v| v.get("atproto")) 251 .and_then(|k| k.as_str()); 252 253 let user_row = sqlx::query!( 254 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 255 did 256 ) 257 .fetch_optional(db) 258 .await 259 .map_err(|e| { 260 error!("Failed to fetch user key: {:?}", e); 261 ApiError::InternalError(None) 262 })?; 263 264 if let Some(row) = user_row { 265 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 266 .map_err(|e| { 267 error!("Failed to decrypt user key: {}", e); 268 ApiError::InternalError(None) 269 })?; 270 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 271 error!("Failed to create signing key: {:?}", e); 272 ApiError::InternalError(None) 273 })?; 274 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 275 276 if doc_signing_key != Some(&expected_did_key) { 277 warn!( 278 "DID {} has signing key {:?}, expected {}", 279 did, doc_signing_key, expected_did_key 280 ); 281 return Err(ApiError::InvalidRequest( 282 "DID document verification method does not match expected signing key".into(), 283 )); 284 } 285 } 286 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 287 let client = crate::api::proxy_client::did_resolution_client(); 288 let decoded = host_and_path.replace("%3A", ":"); 289 let parts: Vec<&str> = decoded.split(':').collect(); 290 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 291 { 292 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 293 } else { 294 (parts[0].to_string(), parts[1..].to_vec()) 295 }; 296 let scheme = 297 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 298 "http" 299 } else { 300 "https" 301 }; 302 let url = if path_parts.is_empty() { 303 format!("{}://{}/.well-known/did.json", scheme, host) 304 } else { 305 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 306 }; 307 let resp = client.get(&url).send().await.map_err(|e| { 308 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 309 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e)) 310 })?; 311 let doc: serde_json::Value = resp.json().await.map_err(|e| { 312 warn!("Failed to parse did:web document for {}: {:?}", did, e); 313 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e)) 314 })?; 315 316 let pds_endpoint = doc 317 .get("service") 318 .and_then(|s| s.as_array()) 319 .and_then(|arr| { 320 arr.iter().find(|svc| { 321 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 322 || svc.get("type").and_then(|t| t.as_str()) 323 == Some("AtprotoPersonalDataServer") 324 }) 325 }) 326 .and_then(|svc| svc.get("serviceEndpoint")) 327 .and_then(|e| e.as_str()); 328 329 if pds_endpoint != Some(&expected_endpoint) { 330 warn!( 331 "DID {} has endpoint {:?}, expected {}", 332 did, pds_endpoint, expected_endpoint 333 ); 334 return Err(ApiError::InvalidRequest( 335 "DID document atproto_pds service endpoint does not match PDS public url".into(), 336 )); 337 } 338 } 339 340 Ok(()) 341} 342 343pub async fn activate_account( 344 State(state): State<AppState>, 345 headers: axum::http::HeaderMap, 346) -> Response { 347 info!("[MIGRATION] activateAccount called"); 348 let extracted = match crate::auth::extract_auth_token_from_header( 349 headers.get("Authorization").and_then(|h| h.to_str().ok()), 350 ) { 351 Some(t) => t, 352 None => { 353 info!("[MIGRATION] activateAccount: No auth token"); 354 return ApiError::AuthenticationRequired.into_response(); 355 } 356 }; 357 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 358 let http_uri = format!( 359 "https://{}/xrpc/com.atproto.server.activateAccount", 360 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 361 ); 362 let auth_user = match crate::auth::validate_token_with_dpop( 363 &state.db, 364 &extracted.token, 365 extracted.is_dpop, 366 dpop_proof, 367 "POST", 368 &http_uri, 369 true, 370 ) 371 .await 372 { 373 Ok(user) => user, 374 Err(e) => { 375 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 376 return ApiError::from(e).into_response(); 377 } 378 }; 379 info!( 380 "[MIGRATION] activateAccount: Authenticated user did={}", 381 auth_user.did 382 ); 383 384 if let Err(e) = crate::auth::scope_check::check_account_scope( 385 auth_user.is_oauth, 386 auth_user.scope.as_deref(), 387 crate::oauth::scopes::AccountAttr::Repo, 388 crate::oauth::scopes::AccountAction::Manage, 389 ) { 390 info!("[MIGRATION] activateAccount: Scope check failed"); 391 return e; 392 } 393 394 let did = auth_user.did; 395 396 info!( 397 "[MIGRATION] activateAccount: Validating DID document for did={}", 398 did 399 ); 400 let did_validation_start = std::time::Instant::now(); 401 if let Err(e) = 402 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true).await 403 { 404 info!( 405 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 406 did, 407 did_validation_start.elapsed() 408 ); 409 return e.into_response(); 410 } 411 info!( 412 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 413 did, 414 did_validation_start.elapsed() 415 ); 416 417 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 418 .fetch_optional(&state.db) 419 .await 420 .ok() 421 .flatten(); 422 info!( 423 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 424 did, handle 425 ); 426 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did.as_str()) 427 .execute(&state.db) 428 .await; 429 match result { 430 Ok(_) => { 431 info!( 432 "[MIGRATION] activateAccount: DB update success for did={}", 433 did 434 ); 435 if let Some(ref h) = handle { 436 let _ = state.cache.delete(&format!("handle:{}", h)).await; 437 } 438 info!( 439 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 440 did 441 ); 442 if let Err(e) = 443 crate::api::repo::record::sequence_account_event(&state, did.as_str(), true, None).await 444 { 445 warn!( 446 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 447 e 448 ); 449 } else { 450 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 451 } 452 info!( 453 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 454 did, handle 455 ); 456 if let Err(e) = 457 crate::api::repo::record::sequence_identity_event(&state, did.as_str(), handle.as_deref()) 458 .await 459 { 460 warn!( 461 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 462 e 463 ); 464 } else { 465 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 466 } 467 let repo_root = sqlx::query_scalar!( 468 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 469 did.as_str() 470 ) 471 .fetch_optional(&state.db) 472 .await 473 .ok() 474 .flatten(); 475 if let Some(root_cid) = repo_root { 476 info!( 477 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 478 did, root_cid 479 ); 480 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 481 if let Ok(Some(block)) = state.block_store.get(&cid).await { 482 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 483 } else { 484 None 485 } 486 } else { 487 None 488 }; 489 if let Err(e) = crate::api::repo::record::sequence_sync_event( 490 &state, 491 did.as_str(), 492 &root_cid, 493 rev.as_deref(), 494 ) 495 .await 496 { 497 warn!( 498 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 499 e 500 ); 501 } else { 502 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 503 } 504 } else { 505 warn!( 506 "[MIGRATION] activateAccount: No repo root found for did={}", 507 did 508 ); 509 } 510 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 511 EmptyResponse::ok().into_response() 512 } 513 Err(e) => { 514 error!( 515 "[MIGRATION] activateAccount: DB error activating account: {:?}", 516 e 517 ); 518 ApiError::InternalError(None).into_response() 519 } 520 } 521} 522 523#[derive(Deserialize)] 524#[serde(rename_all = "camelCase")] 525pub struct DeactivateAccountInput { 526 pub delete_after: Option<String>, 527} 528 529pub async fn deactivate_account( 530 State(state): State<AppState>, 531 headers: axum::http::HeaderMap, 532 Json(input): Json<DeactivateAccountInput>, 533) -> Response { 534 let extracted = match crate::auth::extract_auth_token_from_header( 535 headers.get("Authorization").and_then(|h| h.to_str().ok()), 536 ) { 537 Some(t) => t, 538 None => return ApiError::AuthenticationRequired.into_response(), 539 }; 540 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 541 let http_uri = format!( 542 "https://{}/xrpc/com.atproto.server.deactivateAccount", 543 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 544 ); 545 let auth_user = match crate::auth::validate_token_with_dpop( 546 &state.db, 547 &extracted.token, 548 extracted.is_dpop, 549 dpop_proof, 550 "POST", 551 &http_uri, 552 false, 553 ) 554 .await 555 { 556 Ok(user) => user, 557 Err(e) => return ApiError::from(e).into_response(), 558 }; 559 560 if let Err(e) = crate::auth::scope_check::check_account_scope( 561 auth_user.is_oauth, 562 auth_user.scope.as_deref(), 563 crate::oauth::scopes::AccountAttr::Repo, 564 crate::oauth::scopes::AccountAction::Manage, 565 ) { 566 return e; 567 } 568 569 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 570 .delete_after 571 .as_ref() 572 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 573 .map(|dt| dt.with_timezone(&chrono::Utc)); 574 575 let did = auth_user.did; 576 577 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 578 .fetch_optional(&state.db) 579 .await 580 .ok() 581 .flatten(); 582 583 let result = sqlx::query!( 584 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 585 did.as_str(), 586 delete_after 587 ) 588 .execute(&state.db) 589 .await; 590 591 match result { 592 Ok(_) => { 593 if let Some(ref h) = handle { 594 let _ = state.cache.delete(&format!("handle:{}", h)).await; 595 } 596 if let Err(e) = crate::api::repo::record::sequence_account_event( 597 &state, 598 did.as_str(), 599 false, 600 Some("deactivated"), 601 ) 602 .await 603 { 604 warn!("Failed to sequence account deactivated event: {}", e); 605 } 606 EmptyResponse::ok().into_response() 607 } 608 Err(e) => { 609 error!("DB error deactivating account: {:?}", e); 610 ApiError::InternalError(None).into_response() 611 } 612 } 613} 614 615pub async fn request_account_delete( 616 State(state): State<AppState>, 617 headers: axum::http::HeaderMap, 618) -> Response { 619 let extracted = match crate::auth::extract_auth_token_from_header( 620 headers.get("Authorization").and_then(|h| h.to_str().ok()), 621 ) { 622 Some(t) => t, 623 None => return ApiError::AuthenticationRequired.into_response(), 624 }; 625 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 626 let http_uri = format!( 627 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 628 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 629 ); 630 let validated = match crate::auth::validate_token_with_dpop( 631 &state.db, 632 &extracted.token, 633 extracted.is_dpop, 634 dpop_proof, 635 "POST", 636 &http_uri, 637 true, 638 ) 639 .await 640 { 641 Ok(user) => user, 642 Err(e) => return ApiError::from(e).into_response(), 643 }; 644 let did = validated.did.clone(); 645 646 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await { 647 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str()).await; 648 } 649 650 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 651 .fetch_optional(&state.db) 652 .await 653 { 654 Ok(Some(id)) => id, 655 _ => { 656 return ApiError::InternalError(None).into_response(); 657 } 658 }; 659 let confirmation_token = Uuid::new_v4().to_string(); 660 let expires_at = Utc::now() + Duration::minutes(15); 661 let insert = sqlx::query!( 662 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 663 confirmation_token, 664 did.as_str(), 665 expires_at 666 ) 667 .execute(&state.db) 668 .await; 669 if let Err(e) = insert { 670 error!("DB error creating deletion token: {:?}", e); 671 return ApiError::InternalError(None).into_response(); 672 } 673 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 674 if let Err(e) = 675 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 676 .await 677 { 678 warn!("Failed to enqueue account deletion notification: {:?}", e); 679 } 680 info!("Account deletion requested for user {}", did); 681 EmptyResponse::ok().into_response() 682} 683 684#[derive(Deserialize)] 685pub struct DeleteAccountInput { 686 pub did: crate::types::Did, 687 pub password: PlainPassword, 688 pub token: String, 689} 690 691pub async fn delete_account( 692 State(state): State<AppState>, 693 Json(input): Json<DeleteAccountInput>, 694) -> Response { 695 let did = &input.did; 696 let password = &input.password; 697 let token = input.token.trim(); 698 if password.is_empty() { 699 return ApiError::InvalidRequest("password is required".into()).into_response(); 700 } 701 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 702 if password.len() > OLD_PASSWORD_MAX_LENGTH { 703 return ApiError::InvalidRequest("Invalid password length".into()).into_response(); 704 } 705 if token.is_empty() { 706 return ApiError::InvalidToken(Some("token is required".into())).into_response(); 707 } 708 let user = sqlx::query!( 709 "SELECT id, password_hash, handle FROM users WHERE did = $1", 710 did.as_str() 711 ) 712 .fetch_optional(&state.db) 713 .await; 714 let (user_id, password_hash, handle) = match user { 715 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 716 Ok(None) => { 717 return ApiError::InvalidRequest("account not found".into()).into_response(); 718 } 719 Err(e) => { 720 error!("DB error in delete_account: {:?}", e); 721 return ApiError::InternalError(None).into_response(); 722 } 723 }; 724 let password_valid = if password_hash 725 .as_ref() 726 .map(|h| verify(password, h).unwrap_or(false)) 727 .unwrap_or(false) 728 { 729 true 730 } else { 731 let app_pass_rows = sqlx::query!( 732 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 733 user_id 734 ) 735 .fetch_all(&state.db) 736 .await 737 .unwrap_or_default(); 738 app_pass_rows 739 .iter() 740 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 741 }; 742 if !password_valid { 743 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response(); 744 } 745 let deletion_request = sqlx::query!( 746 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 747 token 748 ) 749 .fetch_optional(&state.db) 750 .await; 751 let (token_did, expires_at) = match deletion_request { 752 Ok(Some(row)) => (row.did, row.expires_at), 753 Ok(None) => { 754 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response(); 755 } 756 Err(e) => { 757 error!("DB error fetching deletion token: {:?}", e); 758 return ApiError::InternalError(None).into_response(); 759 } 760 }; 761 if token_did != did.as_str() { 762 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response(); 763 } 764 if Utc::now() > expires_at { 765 let _ = sqlx::query!( 766 "DELETE FROM account_deletion_requests WHERE token = $1", 767 token 768 ) 769 .execute(&state.db) 770 .await; 771 return ApiError::ExpiredToken(None).into_response(); 772 } 773 let mut tx = match state.db.begin().await { 774 Ok(tx) => tx, 775 Err(e) => { 776 error!("Failed to begin transaction: {:?}", e); 777 return ApiError::InternalError(None).into_response(); 778 } 779 }; 780 let deletion_result: Result<(), sqlx::Error> = async { 781 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 782 .execute(&mut *tx) 783 .await?; 784 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 785 .execute(&mut *tx) 786 .await?; 787 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 788 .execute(&mut *tx) 789 .await?; 790 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 791 .execute(&mut *tx) 792 .await?; 793 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 794 .execute(&mut *tx) 795 .await?; 796 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 797 .execute(&mut *tx) 798 .await?; 799 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 800 .execute(&mut *tx) 801 .await?; 802 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 803 .execute(&mut *tx) 804 .await?; 805 Ok(()) 806 } 807 .await; 808 match deletion_result { 809 Ok(()) => { 810 if let Err(e) = tx.commit().await { 811 error!("Failed to commit account deletion transaction: {:?}", e); 812 return ApiError::InternalError(None).into_response(); 813 } 814 let account_seq = crate::api::repo::record::sequence_account_event( 815 &state, 816 did, 817 false, 818 Some("deleted"), 819 ) 820 .await; 821 match account_seq { 822 Ok(seq) => { 823 if let Err(e) = sqlx::query!( 824 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 825 did, 826 seq 827 ) 828 .execute(&state.db) 829 .await 830 { 831 warn!( 832 "Failed to cleanup sequences for deleted account {}: {}", 833 did, e 834 ); 835 } 836 } 837 Err(e) => { 838 warn!( 839 "Failed to sequence account deletion event for {}: {}", 840 did, e 841 ); 842 } 843 } 844 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 845 info!("Account {} deleted successfully", did); 846 EmptyResponse::ok().into_response() 847 } 848 Err(e) => { 849 error!("DB error deleting account, rolling back: {:?}", e); 850 ApiError::InternalError(None).into_response() 851 } 852 } 853}