this repo has no description
1use crate::api::ApiError; 2use crate::cache::Cache; 3use crate::plc::PlcClient; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::verify; 12use chrono::{Duration, Utc}; 13use cid::Cid; 14use jacquard_repo::commit::Commit; 15use jacquard_repo::storage::BlockStore; 16use k256::ecdsa::SigningKey; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::{error, info, warn}; 22use uuid::Uuid; 23 24#[derive(Serialize)] 25#[serde(rename_all = "camelCase")] 26pub struct CheckAccountStatusOutput { 27 pub activated: bool, 28 pub valid_did: bool, 29 pub repo_commit: String, 30 pub repo_rev: String, 31 pub repo_blocks: i64, 32 pub indexed_records: i64, 33 pub private_state_values: i64, 34 pub expected_blobs: i64, 35 pub imported_blobs: i64, 36} 37 38pub async fn check_account_status( 39 State(state): State<AppState>, 40 headers: axum::http::HeaderMap, 41) -> Response { 42 let extracted = match crate::auth::extract_auth_token_from_header( 43 headers.get("Authorization").and_then(|h| h.to_str().ok()), 44 ) { 45 Some(t) => t, 46 None => return ApiError::AuthenticationRequired.into_response(), 47 }; 48 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 49 let http_uri = format!( 50 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 51 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 52 ); 53 let did = match crate::auth::validate_token_with_dpop( 54 &state.db, 55 &extracted.token, 56 extracted.is_dpop, 57 dpop_proof, 58 "GET", 59 &http_uri, 60 true, 61 ) 62 .await 63 { 64 Ok(user) => user.did, 65 Err(e) => return ApiError::from(e).into_response(), 66 }; 67 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 68 .fetch_optional(&state.db) 69 .await 70 { 71 Ok(Some(id)) => id, 72 _ => { 73 return ( 74 StatusCode::INTERNAL_SERVER_ERROR, 75 Json(json!({"error": "InternalError"})), 76 ) 77 .into_response(); 78 } 79 }; 80 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 81 .fetch_optional(&state.db) 82 .await; 83 let deactivated_at = match user_status { 84 Ok(Some(row)) => row.deactivated_at, 85 _ => None, 86 }; 87 let repo_result = sqlx::query!( 88 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 89 user_id 90 ) 91 .fetch_optional(&state.db) 92 .await; 93 let (repo_commit, repo_rev_from_db) = match repo_result { 94 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 95 _ => (String::new(), None), 96 }; 97 let block_count: i64 = sqlx::query_scalar!( 98 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 99 user_id 100 ) 101 .fetch_one(&state.db) 102 .await 103 .unwrap_or(Some(0)) 104 .unwrap_or(0); 105 let repo_rev = if let Some(rev) = repo_rev_from_db { 106 rev 107 } else if !repo_commit.is_empty() { 108 if let Ok(cid) = Cid::from_str(&repo_commit) { 109 if let Ok(Some(block)) = state.block_store.get(&cid).await { 110 Commit::from_cbor(&block) 111 .ok() 112 .map(|c| c.rev().to_string()) 113 .unwrap_or_default() 114 } else { 115 String::new() 116 } 117 } else { 118 String::new() 119 } 120 } else { 121 String::new() 122 }; 123 let record_count: i64 = 124 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 125 .fetch_one(&state.db) 126 .await 127 .unwrap_or(Some(0)) 128 .unwrap_or(0); 129 let imported_blobs: i64 = sqlx::query_scalar!( 130 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 131 user_id 132 ) 133 .fetch_one(&state.db) 134 .await 135 .unwrap_or(Some(0)) 136 .unwrap_or(0); 137 let expected_blobs: i64 = sqlx::query_scalar!( 138 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 139 user_id 140 ) 141 .fetch_one(&state.db) 142 .await 143 .unwrap_or(Some(0)) 144 .unwrap_or(0); 145 let valid_did = is_valid_did_for_service(&state.db, &state.cache, &did).await; 146 ( 147 StatusCode::OK, 148 Json(CheckAccountStatusOutput { 149 activated: deactivated_at.is_none(), 150 valid_did, 151 repo_commit: repo_commit.clone(), 152 repo_rev, 153 repo_blocks: block_count as i64, 154 indexed_records: record_count, 155 private_state_values: 0, 156 expected_blobs, 157 imported_blobs, 158 }), 159 ) 160 .into_response() 161} 162 163async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: &Arc<dyn Cache>, did: &str) -> bool { 164 assert_valid_did_document_for_service(db, cache, did, false) 165 .await 166 .is_ok() 167} 168 169async fn assert_valid_did_document_for_service( 170 db: &sqlx::PgPool, 171 cache: &Arc<dyn Cache>, 172 did: &str, 173 with_retry: bool, 174) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 175 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 176 let expected_endpoint = format!("https://{}", hostname); 177 178 if did.starts_with("did:plc:") { 179 let plc_client = PlcClient::with_cache(None, Some(cache.clone())); 180 181 let max_attempts = if with_retry { 5 } else { 1 }; 182 let mut last_error = None; 183 let mut doc_data = None; 184 for attempt in 0..max_attempts { 185 if attempt > 0 { 186 let delay_ms = 500 * (1 << (attempt - 1)); 187 info!( 188 "Waiting {}ms before retry {} for DID document validation ({})", 189 delay_ms, attempt, did 190 ); 191 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 192 } 193 194 match plc_client.get_document_data(did).await { 195 Ok(data) => { 196 let pds_endpoint = data 197 .get("services") 198 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 199 .and_then(|p| p.get("endpoint")) 200 .and_then(|e| e.as_str()); 201 202 if pds_endpoint == Some(&expected_endpoint) { 203 doc_data = Some(data); 204 break; 205 } else { 206 info!( 207 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 208 attempt + 1, 209 did, 210 pds_endpoint, 211 expected_endpoint 212 ); 213 last_error = Some(format!( 214 "DID document endpoint {:?} does not match expected {}", 215 pds_endpoint, expected_endpoint 216 )); 217 } 218 } 219 Err(e) => { 220 warn!( 221 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 222 attempt + 1, 223 did, 224 e 225 ); 226 last_error = Some(format!("Could not resolve DID document: {}", e)); 227 } 228 } 229 } 230 231 let doc_data = match doc_data { 232 Some(d) => d, 233 None => { 234 return Err(( 235 StatusCode::BAD_REQUEST, 236 Json(json!({ 237 "error": "InvalidRequest", 238 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string()) 239 })), 240 )); 241 } 242 }; 243 244 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 245 if let Some(ref expected_rotation_key) = server_rotation_key { 246 let rotation_keys = doc_data 247 .get("rotationKeys") 248 .and_then(|v| v.as_array()) 249 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>()) 250 .unwrap_or_default(); 251 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 252 return Err(( 253 StatusCode::BAD_REQUEST, 254 Json(json!({ 255 "error": "InvalidRequest", 256 "message": "Server rotation key not included in PLC DID data" 257 })), 258 )); 259 } 260 } 261 262 let doc_signing_key = doc_data 263 .get("verificationMethods") 264 .and_then(|v| v.get("atproto")) 265 .and_then(|k| k.as_str()); 266 267 let user_row = sqlx::query!( 268 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 269 did 270 ) 271 .fetch_optional(db) 272 .await 273 .map_err(|e| { 274 error!("Failed to fetch user key: {:?}", e); 275 ( 276 StatusCode::INTERNAL_SERVER_ERROR, 277 Json(json!({"error": "InternalError"})), 278 ) 279 })?; 280 281 if let Some(row) = user_row { 282 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 283 .map_err(|e| { 284 error!("Failed to decrypt user key: {}", e); 285 ( 286 StatusCode::INTERNAL_SERVER_ERROR, 287 Json(json!({"error": "InternalError"})), 288 ) 289 })?; 290 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 291 error!("Failed to create signing key: {:?}", e); 292 ( 293 StatusCode::INTERNAL_SERVER_ERROR, 294 Json(json!({"error": "InternalError"})), 295 ) 296 })?; 297 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 298 299 if doc_signing_key != Some(&expected_did_key) { 300 warn!( 301 "DID {} has signing key {:?}, expected {}", 302 did, doc_signing_key, expected_did_key 303 ); 304 return Err(( 305 StatusCode::BAD_REQUEST, 306 Json(json!({ 307 "error": "InvalidRequest", 308 "message": "DID document verification method does not match expected signing key" 309 })), 310 )); 311 } 312 } 313 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 314 let client = crate::api::proxy_client::did_resolution_client(); 315 let decoded = host_and_path.replace("%3A", ":"); 316 let parts: Vec<&str> = decoded.split(':').collect(); 317 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 318 { 319 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 320 } else { 321 (parts[0].to_string(), parts[1..].to_vec()) 322 }; 323 let scheme = 324 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 325 "http" 326 } else { 327 "https" 328 }; 329 let url = if path_parts.is_empty() { 330 format!("{}://{}/.well-known/did.json", scheme, host) 331 } else { 332 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 333 }; 334 let resp = client.get(&url).send().await.map_err(|e| { 335 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 336 ( 337 StatusCode::BAD_REQUEST, 338 Json(json!({ 339 "error": "InvalidRequest", 340 "message": format!("Could not resolve DID document: {}", e) 341 })), 342 ) 343 })?; 344 let doc: serde_json::Value = resp.json().await.map_err(|e| { 345 warn!("Failed to parse did:web document for {}: {:?}", did, e); 346 ( 347 StatusCode::BAD_REQUEST, 348 Json(json!({ 349 "error": "InvalidRequest", 350 "message": format!("Could not parse DID document: {}", e) 351 })), 352 ) 353 })?; 354 355 let pds_endpoint = doc 356 .get("service") 357 .and_then(|s| s.as_array()) 358 .and_then(|arr| { 359 arr.iter().find(|svc| { 360 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 361 || svc.get("type").and_then(|t| t.as_str()) 362 == Some("AtprotoPersonalDataServer") 363 }) 364 }) 365 .and_then(|svc| svc.get("serviceEndpoint")) 366 .and_then(|e| e.as_str()); 367 368 if pds_endpoint != Some(&expected_endpoint) { 369 warn!( 370 "DID {} has endpoint {:?}, expected {}", 371 did, pds_endpoint, expected_endpoint 372 ); 373 return Err(( 374 StatusCode::BAD_REQUEST, 375 Json(json!({ 376 "error": "InvalidRequest", 377 "message": "DID document atproto_pds service endpoint does not match PDS public url" 378 })), 379 )); 380 } 381 } 382 383 Ok(()) 384} 385 386pub async fn activate_account( 387 State(state): State<AppState>, 388 headers: axum::http::HeaderMap, 389) -> Response { 390 info!("[MIGRATION] activateAccount called"); 391 let extracted = match crate::auth::extract_auth_token_from_header( 392 headers.get("Authorization").and_then(|h| h.to_str().ok()), 393 ) { 394 Some(t) => t, 395 None => { 396 info!("[MIGRATION] activateAccount: No auth token"); 397 return ApiError::AuthenticationRequired.into_response(); 398 } 399 }; 400 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 401 let http_uri = format!( 402 "https://{}/xrpc/com.atproto.server.activateAccount", 403 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 404 ); 405 let auth_user = match crate::auth::validate_token_with_dpop( 406 &state.db, 407 &extracted.token, 408 extracted.is_dpop, 409 dpop_proof, 410 "POST", 411 &http_uri, 412 true, 413 ) 414 .await 415 { 416 Ok(user) => user, 417 Err(e) => { 418 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 419 return ApiError::from(e).into_response(); 420 } 421 }; 422 info!( 423 "[MIGRATION] activateAccount: Authenticated user did={}", 424 auth_user.did 425 ); 426 427 if let Err(e) = crate::auth::scope_check::check_account_scope( 428 auth_user.is_oauth, 429 auth_user.scope.as_deref(), 430 crate::oauth::scopes::AccountAttr::Repo, 431 crate::oauth::scopes::AccountAction::Manage, 432 ) { 433 info!("[MIGRATION] activateAccount: Scope check failed"); 434 return e; 435 } 436 437 let did = auth_user.did; 438 439 info!( 440 "[MIGRATION] activateAccount: Validating DID document for did={}", 441 did 442 ); 443 let did_validation_start = std::time::Instant::now(); 444 if let Err((status, json)) = 445 assert_valid_did_document_for_service(&state.db, &state.cache, &did, true).await 446 { 447 info!( 448 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 449 did, 450 did_validation_start.elapsed() 451 ); 452 return (status, json).into_response(); 453 } 454 info!( 455 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 456 did, 457 did_validation_start.elapsed() 458 ); 459 460 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 461 .fetch_optional(&state.db) 462 .await 463 .ok() 464 .flatten(); 465 info!( 466 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 467 did, handle 468 ); 469 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 470 .execute(&state.db) 471 .await; 472 match result { 473 Ok(_) => { 474 info!( 475 "[MIGRATION] activateAccount: DB update success for did={}", 476 did 477 ); 478 if let Some(ref h) = handle { 479 let _ = state.cache.delete(&format!("handle:{}", h)).await; 480 } 481 info!( 482 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 483 did 484 ); 485 if let Err(e) = 486 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 487 { 488 warn!( 489 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 490 e 491 ); 492 } else { 493 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 494 } 495 info!( 496 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 497 did, handle 498 ); 499 if let Err(e) = 500 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 501 .await 502 { 503 warn!( 504 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 505 e 506 ); 507 } else { 508 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 509 } 510 let repo_root = sqlx::query_scalar!( 511 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 512 did 513 ) 514 .fetch_optional(&state.db) 515 .await 516 .ok() 517 .flatten(); 518 if let Some(root_cid) = repo_root { 519 info!( 520 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 521 did, root_cid 522 ); 523 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 524 if let Ok(Some(block)) = state.block_store.get(&cid).await { 525 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 526 } else { 527 None 528 } 529 } else { 530 None 531 }; 532 if let Err(e) = crate::api::repo::record::sequence_sync_event( 533 &state, 534 &did, 535 &root_cid, 536 rev.as_deref(), 537 ) 538 .await 539 { 540 warn!( 541 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 542 e 543 ); 544 } else { 545 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 546 } 547 } else { 548 warn!( 549 "[MIGRATION] activateAccount: No repo root found for did={}", 550 did 551 ); 552 } 553 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 554 (StatusCode::OK, Json(json!({}))).into_response() 555 } 556 Err(e) => { 557 error!( 558 "[MIGRATION] activateAccount: DB error activating account: {:?}", 559 e 560 ); 561 ( 562 StatusCode::INTERNAL_SERVER_ERROR, 563 Json(json!({"error": "InternalError"})), 564 ) 565 .into_response() 566 } 567 } 568} 569 570#[derive(Deserialize)] 571#[serde(rename_all = "camelCase")] 572pub struct DeactivateAccountInput { 573 pub delete_after: Option<String>, 574} 575 576pub async fn deactivate_account( 577 State(state): State<AppState>, 578 headers: axum::http::HeaderMap, 579 Json(input): Json<DeactivateAccountInput>, 580) -> Response { 581 let extracted = match crate::auth::extract_auth_token_from_header( 582 headers.get("Authorization").and_then(|h| h.to_str().ok()), 583 ) { 584 Some(t) => t, 585 None => return ApiError::AuthenticationRequired.into_response(), 586 }; 587 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 588 let http_uri = format!( 589 "https://{}/xrpc/com.atproto.server.deactivateAccount", 590 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 591 ); 592 let auth_user = match crate::auth::validate_token_with_dpop( 593 &state.db, 594 &extracted.token, 595 extracted.is_dpop, 596 dpop_proof, 597 "POST", 598 &http_uri, 599 false, 600 ) 601 .await 602 { 603 Ok(user) => user, 604 Err(e) => return ApiError::from(e).into_response(), 605 }; 606 607 if let Err(e) = crate::auth::scope_check::check_account_scope( 608 auth_user.is_oauth, 609 auth_user.scope.as_deref(), 610 crate::oauth::scopes::AccountAttr::Repo, 611 crate::oauth::scopes::AccountAction::Manage, 612 ) { 613 return e; 614 } 615 616 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 617 .delete_after 618 .as_ref() 619 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 620 .map(|dt| dt.with_timezone(&chrono::Utc)); 621 622 let did = auth_user.did; 623 624 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 625 .fetch_optional(&state.db) 626 .await 627 .ok() 628 .flatten(); 629 630 let result = sqlx::query!( 631 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 632 did, 633 delete_after 634 ) 635 .execute(&state.db) 636 .await; 637 638 match result { 639 Ok(_) => { 640 if let Some(ref h) = handle { 641 let _ = state.cache.delete(&format!("handle:{}", h)).await; 642 } 643 if let Err(e) = crate::api::repo::record::sequence_account_event( 644 &state, 645 &did, 646 false, 647 Some("deactivated"), 648 ) 649 .await 650 { 651 warn!("Failed to sequence account deactivated event: {}", e); 652 } 653 (StatusCode::OK, Json(json!({}))).into_response() 654 } 655 Err(e) => { 656 error!("DB error deactivating account: {:?}", e); 657 ( 658 StatusCode::INTERNAL_SERVER_ERROR, 659 Json(json!({"error": "InternalError"})), 660 ) 661 .into_response() 662 } 663 } 664} 665 666pub async fn request_account_delete( 667 State(state): State<AppState>, 668 headers: axum::http::HeaderMap, 669) -> Response { 670 let extracted = match crate::auth::extract_auth_token_from_header( 671 headers.get("Authorization").and_then(|h| h.to_str().ok()), 672 ) { 673 Some(t) => t, 674 None => return ApiError::AuthenticationRequired.into_response(), 675 }; 676 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 677 let http_uri = format!( 678 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 679 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 680 ); 681 let validated = match crate::auth::validate_token_with_dpop( 682 &state.db, 683 &extracted.token, 684 extracted.is_dpop, 685 dpop_proof, 686 "POST", 687 &http_uri, 688 true, 689 ) 690 .await 691 { 692 Ok(user) => user, 693 Err(e) => return ApiError::from(e).into_response(), 694 }; 695 let did = validated.did.clone(); 696 697 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await { 698 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await; 699 } 700 701 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 702 .fetch_optional(&state.db) 703 .await 704 { 705 Ok(Some(id)) => id, 706 _ => { 707 return ( 708 StatusCode::INTERNAL_SERVER_ERROR, 709 Json(json!({"error": "InternalError"})), 710 ) 711 .into_response(); 712 } 713 }; 714 let confirmation_token = Uuid::new_v4().to_string(); 715 let expires_at = Utc::now() + Duration::minutes(15); 716 let insert = sqlx::query!( 717 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 718 confirmation_token, 719 did, 720 expires_at 721 ) 722 .execute(&state.db) 723 .await; 724 if let Err(e) = insert { 725 error!("DB error creating deletion token: {:?}", e); 726 return ( 727 StatusCode::INTERNAL_SERVER_ERROR, 728 Json(json!({"error": "InternalError"})), 729 ) 730 .into_response(); 731 } 732 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 733 if let Err(e) = 734 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 735 .await 736 { 737 warn!("Failed to enqueue account deletion notification: {:?}", e); 738 } 739 info!("Account deletion requested for user {}", did); 740 (StatusCode::OK, Json(json!({}))).into_response() 741} 742 743#[derive(Deserialize)] 744pub struct DeleteAccountInput { 745 pub did: String, 746 pub password: String, 747 pub token: String, 748} 749 750pub async fn delete_account( 751 State(state): State<AppState>, 752 Json(input): Json<DeleteAccountInput>, 753) -> Response { 754 let did = input.did.trim(); 755 let password = &input.password; 756 let token = input.token.trim(); 757 if did.is_empty() { 758 return ( 759 StatusCode::BAD_REQUEST, 760 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 761 ) 762 .into_response(); 763 } 764 if password.is_empty() { 765 return ( 766 StatusCode::BAD_REQUEST, 767 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 768 ) 769 .into_response(); 770 } 771 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 772 if password.len() > OLD_PASSWORD_MAX_LENGTH { 773 return ( 774 StatusCode::BAD_REQUEST, 775 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})), 776 ) 777 .into_response(); 778 } 779 if token.is_empty() { 780 return ( 781 StatusCode::BAD_REQUEST, 782 Json(json!({"error": "InvalidToken", "message": "token is required"})), 783 ) 784 .into_response(); 785 } 786 let user = sqlx::query!( 787 "SELECT id, password_hash, handle FROM users WHERE did = $1", 788 did 789 ) 790 .fetch_optional(&state.db) 791 .await; 792 let (user_id, password_hash, handle) = match user { 793 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 794 Ok(None) => { 795 return ( 796 StatusCode::BAD_REQUEST, 797 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 798 ) 799 .into_response(); 800 } 801 Err(e) => { 802 error!("DB error in delete_account: {:?}", e); 803 return ( 804 StatusCode::INTERNAL_SERVER_ERROR, 805 Json(json!({"error": "InternalError"})), 806 ) 807 .into_response(); 808 } 809 }; 810 let password_valid = if password_hash 811 .as_ref() 812 .map(|h| verify(password, h).unwrap_or(false)) 813 .unwrap_or(false) 814 { 815 true 816 } else { 817 let app_pass_rows = sqlx::query!( 818 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 819 user_id 820 ) 821 .fetch_all(&state.db) 822 .await 823 .unwrap_or_default(); 824 app_pass_rows 825 .iter() 826 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 827 }; 828 if !password_valid { 829 return ( 830 StatusCode::UNAUTHORIZED, 831 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 832 ) 833 .into_response(); 834 } 835 let deletion_request = sqlx::query!( 836 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 837 token 838 ) 839 .fetch_optional(&state.db) 840 .await; 841 let (token_did, expires_at) = match deletion_request { 842 Ok(Some(row)) => (row.did, row.expires_at), 843 Ok(None) => { 844 return ( 845 StatusCode::BAD_REQUEST, 846 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 847 ) 848 .into_response(); 849 } 850 Err(e) => { 851 error!("DB error fetching deletion token: {:?}", e); 852 return ( 853 StatusCode::INTERNAL_SERVER_ERROR, 854 Json(json!({"error": "InternalError"})), 855 ) 856 .into_response(); 857 } 858 }; 859 if token_did != did { 860 return ( 861 StatusCode::BAD_REQUEST, 862 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 863 ) 864 .into_response(); 865 } 866 if Utc::now() > expires_at { 867 let _ = sqlx::query!( 868 "DELETE FROM account_deletion_requests WHERE token = $1", 869 token 870 ) 871 .execute(&state.db) 872 .await; 873 return ( 874 StatusCode::BAD_REQUEST, 875 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 876 ) 877 .into_response(); 878 } 879 let mut tx = match state.db.begin().await { 880 Ok(tx) => tx, 881 Err(e) => { 882 error!("Failed to begin transaction: {:?}", e); 883 return ( 884 StatusCode::INTERNAL_SERVER_ERROR, 885 Json(json!({"error": "InternalError"})), 886 ) 887 .into_response(); 888 } 889 }; 890 let deletion_result: Result<(), sqlx::Error> = async { 891 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 892 .execute(&mut *tx) 893 .await?; 894 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 895 .execute(&mut *tx) 896 .await?; 897 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 898 .execute(&mut *tx) 899 .await?; 900 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 901 .execute(&mut *tx) 902 .await?; 903 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 904 .execute(&mut *tx) 905 .await?; 906 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 907 .execute(&mut *tx) 908 .await?; 909 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 910 .execute(&mut *tx) 911 .await?; 912 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 913 .execute(&mut *tx) 914 .await?; 915 Ok(()) 916 } 917 .await; 918 match deletion_result { 919 Ok(()) => { 920 if let Err(e) = tx.commit().await { 921 error!("Failed to commit account deletion transaction: {:?}", e); 922 return ( 923 StatusCode::INTERNAL_SERVER_ERROR, 924 Json(json!({"error": "InternalError"})), 925 ) 926 .into_response(); 927 } 928 let account_seq = crate::api::repo::record::sequence_account_event( 929 &state, 930 did, 931 false, 932 Some("deleted"), 933 ) 934 .await; 935 match account_seq { 936 Ok(seq) => { 937 if let Err(e) = sqlx::query!( 938 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 939 did, 940 seq 941 ) 942 .execute(&state.db) 943 .await 944 { 945 warn!( 946 "Failed to cleanup sequences for deleted account {}: {}", 947 did, e 948 ); 949 } 950 } 951 Err(e) => { 952 warn!( 953 "Failed to sequence account deletion event for {}: {}", 954 did, e 955 ); 956 } 957 } 958 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 959 info!("Account {} deleted successfully", did); 960 (StatusCode::OK, Json(json!({}))).into_response() 961 } 962 Err(e) => { 963 error!("DB error deleting account, rolling back: {:?}", e); 964 ( 965 StatusCode::INTERNAL_SERVER_ERROR, 966 Json(json!({"error": "InternalError"})), 967 ) 968 .into_response() 969 } 970 } 971}