this repo has no description
1use crate::api::ApiError; 2use crate::plc::PlcClient; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::{Duration, Utc}; 12use cid::Cid; 13use jacquard_repo::commit::Commit; 14use jacquard_repo::storage::BlockStore; 15use k256::ecdsa::SigningKey; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::str::FromStr; 19use tracing::{error, info, warn}; 20use uuid::Uuid; 21 22#[derive(Serialize)] 23#[serde(rename_all = "camelCase")] 24pub struct CheckAccountStatusOutput { 25 pub activated: bool, 26 pub valid_did: bool, 27 pub repo_commit: String, 28 pub repo_rev: String, 29 pub repo_blocks: i64, 30 pub indexed_records: i64, 31 pub private_state_values: i64, 32 pub expected_blobs: i64, 33 pub imported_blobs: i64, 34} 35 36pub async fn check_account_status( 37 State(state): State<AppState>, 38 headers: axum::http::HeaderMap, 39) -> Response { 40 let extracted = match crate::auth::extract_auth_token_from_header( 41 headers.get("Authorization").and_then(|h| h.to_str().ok()), 42 ) { 43 Some(t) => t, 44 None => return ApiError::AuthenticationRequired.into_response(), 45 }; 46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 47 let http_uri = format!( 48 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 50 ); 51 let did = match crate::auth::validate_token_with_dpop( 52 &state.db, 53 &extracted.token, 54 extracted.is_dpop, 55 dpop_proof, 56 "GET", 57 &http_uri, 58 true, 59 ) 60 .await 61 { 62 Ok(user) => user.did, 63 Err(e) => return ApiError::from(e).into_response(), 64 }; 65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(id)) => id, 70 _ => { 71 return ( 72 StatusCode::INTERNAL_SERVER_ERROR, 73 Json(json!({"error": "InternalError"})), 74 ) 75 .into_response(); 76 } 77 }; 78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 79 .fetch_optional(&state.db) 80 .await; 81 let deactivated_at = match user_status { 82 Ok(Some(row)) => row.deactivated_at, 83 _ => None, 84 }; 85 let repo_result = sqlx::query!( 86 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 87 user_id 88 ) 89 .fetch_optional(&state.db) 90 .await; 91 let (repo_commit, repo_rev_from_db) = match repo_result { 92 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 93 _ => (String::new(), None), 94 }; 95 let block_count: i64 = sqlx::query_scalar!( 96 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 97 user_id 98 ) 99 .fetch_one(&state.db) 100 .await 101 .unwrap_or(Some(0)) 102 .unwrap_or(0); 103 let repo_rev = if let Some(rev) = repo_rev_from_db { 104 rev 105 } else if !repo_commit.is_empty() { 106 if let Ok(cid) = Cid::from_str(&repo_commit) { 107 if let Ok(Some(block)) = state.block_store.get(&cid).await { 108 Commit::from_cbor(&block) 109 .ok() 110 .map(|c| c.rev().to_string()) 111 .unwrap_or_default() 112 } else { 113 String::new() 114 } 115 } else { 116 String::new() 117 } 118 } else { 119 String::new() 120 }; 121 let record_count: i64 = 122 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 123 .fetch_one(&state.db) 124 .await 125 .unwrap_or(Some(0)) 126 .unwrap_or(0); 127 let imported_blobs: i64 = sqlx::query_scalar!( 128 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 129 user_id 130 ) 131 .fetch_one(&state.db) 132 .await 133 .unwrap_or(Some(0)) 134 .unwrap_or(0); 135 let expected_blobs: i64 = sqlx::query_scalar!( 136 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 137 user_id 138 ) 139 .fetch_one(&state.db) 140 .await 141 .unwrap_or(Some(0)) 142 .unwrap_or(0); 143 let valid_did = is_valid_did_for_service(&state.db, &did).await; 144 ( 145 StatusCode::OK, 146 Json(CheckAccountStatusOutput { 147 activated: deactivated_at.is_none(), 148 valid_did, 149 repo_commit: repo_commit.clone(), 150 repo_rev, 151 repo_blocks: block_count as i64, 152 indexed_records: record_count, 153 private_state_values: 0, 154 expected_blobs, 155 imported_blobs, 156 }), 157 ) 158 .into_response() 159} 160 161async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool { 162 assert_valid_did_document_for_service(db, did, false) 163 .await 164 .is_ok() 165} 166 167async fn assert_valid_did_document_for_service( 168 db: &sqlx::PgPool, 169 did: &str, 170 with_retry: bool, 171) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 172 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 173 let expected_endpoint = format!("https://{}", hostname); 174 175 if did.starts_with("did:plc:") { 176 let plc_client = PlcClient::new(None); 177 178 let max_attempts = if with_retry { 5 } else { 1 }; 179 let mut last_error = None; 180 let mut doc_data = None; 181 for attempt in 0..max_attempts { 182 if attempt > 0 { 183 let delay_ms = 500 * (1 << (attempt - 1)); 184 info!( 185 "Waiting {}ms before retry {} for DID document validation ({})", 186 delay_ms, attempt, did 187 ); 188 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 189 } 190 191 match plc_client.get_document_data(did).await { 192 Ok(data) => { 193 let pds_endpoint = data 194 .get("services") 195 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 196 .and_then(|p| p.get("endpoint")) 197 .and_then(|e| e.as_str()); 198 199 if pds_endpoint == Some(&expected_endpoint) { 200 doc_data = Some(data); 201 break; 202 } else { 203 info!( 204 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 205 attempt + 1, 206 did, 207 pds_endpoint, 208 expected_endpoint 209 ); 210 last_error = Some(format!( 211 "DID document endpoint {:?} does not match expected {}", 212 pds_endpoint, expected_endpoint 213 )); 214 } 215 } 216 Err(e) => { 217 warn!( 218 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 219 attempt + 1, 220 did, 221 e 222 ); 223 last_error = Some(format!("Could not resolve DID document: {}", e)); 224 } 225 } 226 } 227 228 let doc_data = match doc_data { 229 Some(d) => d, 230 None => { 231 return Err(( 232 StatusCode::BAD_REQUEST, 233 Json(json!({ 234 "error": "InvalidRequest", 235 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string()) 236 })), 237 )); 238 } 239 }; 240 241 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 242 if let Some(ref expected_rotation_key) = server_rotation_key { 243 let rotation_keys = doc_data 244 .get("rotationKeys") 245 .and_then(|v| v.as_array()) 246 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>()) 247 .unwrap_or_default(); 248 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 249 return Err(( 250 StatusCode::BAD_REQUEST, 251 Json(json!({ 252 "error": "InvalidRequest", 253 "message": "Server rotation key not included in PLC DID data" 254 })), 255 )); 256 } 257 } 258 259 let doc_signing_key = doc_data 260 .get("verificationMethods") 261 .and_then(|v| v.get("atproto")) 262 .and_then(|k| k.as_str()); 263 264 let user_row = sqlx::query!( 265 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 266 did 267 ) 268 .fetch_optional(db) 269 .await 270 .map_err(|e| { 271 error!("Failed to fetch user key: {:?}", e); 272 ( 273 StatusCode::INTERNAL_SERVER_ERROR, 274 Json(json!({"error": "InternalError"})), 275 ) 276 })?; 277 278 if let Some(row) = user_row { 279 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 280 .map_err(|e| { 281 error!("Failed to decrypt user key: {}", e); 282 ( 283 StatusCode::INTERNAL_SERVER_ERROR, 284 Json(json!({"error": "InternalError"})), 285 ) 286 })?; 287 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 288 error!("Failed to create signing key: {:?}", e); 289 ( 290 StatusCode::INTERNAL_SERVER_ERROR, 291 Json(json!({"error": "InternalError"})), 292 ) 293 })?; 294 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 295 296 if doc_signing_key != Some(&expected_did_key) { 297 warn!( 298 "DID {} has signing key {:?}, expected {}", 299 did, doc_signing_key, expected_did_key 300 ); 301 return Err(( 302 StatusCode::BAD_REQUEST, 303 Json(json!({ 304 "error": "InvalidRequest", 305 "message": "DID document verification method does not match expected signing key" 306 })), 307 )); 308 } 309 } 310 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 311 let client = reqwest::Client::new(); 312 let decoded = host_and_path.replace("%3A", ":"); 313 let parts: Vec<&str> = decoded.split(':').collect(); 314 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 315 { 316 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 317 } else { 318 (parts[0].to_string(), parts[1..].to_vec()) 319 }; 320 let scheme = 321 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 322 "http" 323 } else { 324 "https" 325 }; 326 let url = if path_parts.is_empty() { 327 format!("{}://{}/.well-known/did.json", scheme, host) 328 } else { 329 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 330 }; 331 let resp = client.get(&url).send().await.map_err(|e| { 332 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 333 ( 334 StatusCode::BAD_REQUEST, 335 Json(json!({ 336 "error": "InvalidRequest", 337 "message": format!("Could not resolve DID document: {}", e) 338 })), 339 ) 340 })?; 341 let doc: serde_json::Value = resp.json().await.map_err(|e| { 342 warn!("Failed to parse did:web document for {}: {:?}", did, e); 343 ( 344 StatusCode::BAD_REQUEST, 345 Json(json!({ 346 "error": "InvalidRequest", 347 "message": format!("Could not parse DID document: {}", e) 348 })), 349 ) 350 })?; 351 352 let pds_endpoint = doc 353 .get("service") 354 .and_then(|s| s.as_array()) 355 .and_then(|arr| { 356 arr.iter().find(|svc| { 357 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 358 || svc.get("type").and_then(|t| t.as_str()) 359 == Some("AtprotoPersonalDataServer") 360 }) 361 }) 362 .and_then(|svc| svc.get("serviceEndpoint")) 363 .and_then(|e| e.as_str()); 364 365 if pds_endpoint != Some(&expected_endpoint) { 366 warn!( 367 "DID {} has endpoint {:?}, expected {}", 368 did, pds_endpoint, expected_endpoint 369 ); 370 return Err(( 371 StatusCode::BAD_REQUEST, 372 Json(json!({ 373 "error": "InvalidRequest", 374 "message": "DID document atproto_pds service endpoint does not match PDS public url" 375 })), 376 )); 377 } 378 } 379 380 Ok(()) 381} 382 383pub async fn activate_account( 384 State(state): State<AppState>, 385 headers: axum::http::HeaderMap, 386) -> Response { 387 info!("[MIGRATION] activateAccount called"); 388 let extracted = match crate::auth::extract_auth_token_from_header( 389 headers.get("Authorization").and_then(|h| h.to_str().ok()), 390 ) { 391 Some(t) => t, 392 None => { 393 info!("[MIGRATION] activateAccount: No auth token"); 394 return ApiError::AuthenticationRequired.into_response(); 395 } 396 }; 397 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 398 let http_uri = format!( 399 "https://{}/xrpc/com.atproto.server.activateAccount", 400 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 401 ); 402 let auth_user = match crate::auth::validate_token_with_dpop( 403 &state.db, 404 &extracted.token, 405 extracted.is_dpop, 406 dpop_proof, 407 "POST", 408 &http_uri, 409 true, 410 ) 411 .await 412 { 413 Ok(user) => user, 414 Err(e) => { 415 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 416 return ApiError::from(e).into_response(); 417 } 418 }; 419 info!( 420 "[MIGRATION] activateAccount: Authenticated user did={}", 421 auth_user.did 422 ); 423 424 if let Err(e) = crate::auth::scope_check::check_account_scope( 425 auth_user.is_oauth, 426 auth_user.scope.as_deref(), 427 crate::oauth::scopes::AccountAttr::Repo, 428 crate::oauth::scopes::AccountAction::Manage, 429 ) { 430 info!("[MIGRATION] activateAccount: Scope check failed"); 431 return e; 432 } 433 434 let did = auth_user.did; 435 436 info!( 437 "[MIGRATION] activateAccount: Validating DID document for did={}", 438 did 439 ); 440 let did_validation_start = std::time::Instant::now(); 441 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await 442 { 443 info!( 444 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 445 did, 446 did_validation_start.elapsed() 447 ); 448 return (status, json).into_response(); 449 } 450 info!( 451 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 452 did, 453 did_validation_start.elapsed() 454 ); 455 456 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 457 .fetch_optional(&state.db) 458 .await 459 .ok() 460 .flatten(); 461 info!( 462 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 463 did, handle 464 ); 465 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 466 .execute(&state.db) 467 .await; 468 match result { 469 Ok(_) => { 470 info!( 471 "[MIGRATION] activateAccount: DB update success for did={}", 472 did 473 ); 474 if let Some(ref h) = handle { 475 let _ = state.cache.delete(&format!("handle:{}", h)).await; 476 } 477 info!( 478 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 479 did 480 ); 481 if let Err(e) = 482 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 483 { 484 warn!( 485 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 486 e 487 ); 488 } else { 489 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 490 } 491 info!( 492 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 493 did, handle 494 ); 495 if let Err(e) = 496 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 497 .await 498 { 499 warn!( 500 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 501 e 502 ); 503 } else { 504 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 505 } 506 let repo_root = sqlx::query_scalar!( 507 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 508 did 509 ) 510 .fetch_optional(&state.db) 511 .await 512 .ok() 513 .flatten(); 514 if let Some(root_cid) = repo_root { 515 info!( 516 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 517 did, root_cid 518 ); 519 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 520 if let Ok(Some(block)) = state.block_store.get(&cid).await { 521 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 522 } else { 523 None 524 } 525 } else { 526 None 527 }; 528 if let Err(e) = crate::api::repo::record::sequence_sync_event( 529 &state, 530 &did, 531 &root_cid, 532 rev.as_deref(), 533 ) 534 .await 535 { 536 warn!( 537 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 538 e 539 ); 540 } else { 541 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 542 } 543 } else { 544 warn!( 545 "[MIGRATION] activateAccount: No repo root found for did={}", 546 did 547 ); 548 } 549 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 550 (StatusCode::OK, Json(json!({}))).into_response() 551 } 552 Err(e) => { 553 error!( 554 "[MIGRATION] activateAccount: DB error activating account: {:?}", 555 e 556 ); 557 ( 558 StatusCode::INTERNAL_SERVER_ERROR, 559 Json(json!({"error": "InternalError"})), 560 ) 561 .into_response() 562 } 563 } 564} 565 566#[derive(Deserialize)] 567#[serde(rename_all = "camelCase")] 568pub struct DeactivateAccountInput { 569 pub delete_after: Option<String>, 570 pub migrating_to: Option<String>, 571} 572 573pub async fn deactivate_account( 574 State(state): State<AppState>, 575 headers: axum::http::HeaderMap, 576 Json(input): Json<DeactivateAccountInput>, 577) -> Response { 578 let extracted = match crate::auth::extract_auth_token_from_header( 579 headers.get("Authorization").and_then(|h| h.to_str().ok()), 580 ) { 581 Some(t) => t, 582 None => return ApiError::AuthenticationRequired.into_response(), 583 }; 584 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 585 let http_uri = format!( 586 "https://{}/xrpc/com.atproto.server.deactivateAccount", 587 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 588 ); 589 let auth_user = match crate::auth::validate_token_with_dpop( 590 &state.db, 591 &extracted.token, 592 extracted.is_dpop, 593 dpop_proof, 594 "POST", 595 &http_uri, 596 false, 597 ) 598 .await 599 { 600 Ok(user) => user, 601 Err(e) => return ApiError::from(e).into_response(), 602 }; 603 604 if let Err(e) = crate::auth::scope_check::check_account_scope( 605 auth_user.is_oauth, 606 auth_user.scope.as_deref(), 607 crate::oauth::scopes::AccountAttr::Repo, 608 crate::oauth::scopes::AccountAction::Manage, 609 ) { 610 return e; 611 } 612 613 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 614 .delete_after 615 .as_ref() 616 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 617 .map(|dt| dt.with_timezone(&chrono::Utc)); 618 619 let did = auth_user.did; 620 621 let migrating_to = if let Some(ref url) = input.migrating_to { 622 let url = url.trim().trim_end_matches('/'); 623 if url.is_empty() || !did.starts_with("did:web:") { 624 None 625 } else { 626 if !url.starts_with("https://") { 627 return ApiError::InvalidRequest("migratingTo must start with https://".into()) 628 .into_response(); 629 } 630 Some(url.to_string()) 631 } 632 } else { 633 None 634 }; 635 636 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 637 .fetch_optional(&state.db) 638 .await 639 .ok() 640 .flatten(); 641 642 let result = if let Some(ref pds_url) = migrating_to { 643 sqlx::query!( 644 "UPDATE users SET deactivated_at = NOW(), delete_after = $2, migrated_to_pds = $3, migrated_at = NOW() WHERE did = $1", 645 did, 646 delete_after, 647 pds_url 648 ) 649 .execute(&state.db) 650 .await 651 } else { 652 sqlx::query!( 653 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 654 did, 655 delete_after 656 ) 657 .execute(&state.db) 658 .await 659 }; 660 661 let status = if migrating_to.is_some() { 662 "migrated" 663 } else { 664 "deactivated" 665 }; 666 667 match result { 668 Ok(_) => { 669 if let Some(ref h) = handle { 670 let _ = state.cache.delete(&format!("handle:{}", h)).await; 671 } 672 if let Err(e) = 673 crate::api::repo::record::sequence_account_event(&state, &did, false, Some(status)) 674 .await 675 { 676 warn!("Failed to sequence account {} event: {}", status, e); 677 } 678 (StatusCode::OK, Json(json!({}))).into_response() 679 } 680 Err(e) => { 681 error!("DB error deactivating account: {:?}", e); 682 ( 683 StatusCode::INTERNAL_SERVER_ERROR, 684 Json(json!({"error": "InternalError"})), 685 ) 686 .into_response() 687 } 688 } 689} 690 691pub async fn request_account_delete( 692 State(state): State<AppState>, 693 headers: axum::http::HeaderMap, 694) -> Response { 695 let extracted = match crate::auth::extract_auth_token_from_header( 696 headers.get("Authorization").and_then(|h| h.to_str().ok()), 697 ) { 698 Some(t) => t, 699 None => return ApiError::AuthenticationRequired.into_response(), 700 }; 701 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 702 let http_uri = format!( 703 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 704 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 705 ); 706 let validated = match crate::auth::validate_token_with_dpop( 707 &state.db, 708 &extracted.token, 709 extracted.is_dpop, 710 dpop_proof, 711 "POST", 712 &http_uri, 713 true, 714 ) 715 .await 716 { 717 Ok(user) => user, 718 Err(e) => return ApiError::from(e).into_response(), 719 }; 720 let did = validated.did.clone(); 721 722 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await { 723 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await; 724 } 725 726 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 727 .fetch_optional(&state.db) 728 .await 729 { 730 Ok(Some(id)) => id, 731 _ => { 732 return ( 733 StatusCode::INTERNAL_SERVER_ERROR, 734 Json(json!({"error": "InternalError"})), 735 ) 736 .into_response(); 737 } 738 }; 739 let confirmation_token = Uuid::new_v4().to_string(); 740 let expires_at = Utc::now() + Duration::minutes(15); 741 let insert = sqlx::query!( 742 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 743 confirmation_token, 744 did, 745 expires_at 746 ) 747 .execute(&state.db) 748 .await; 749 if let Err(e) = insert { 750 error!("DB error creating deletion token: {:?}", e); 751 return ( 752 StatusCode::INTERNAL_SERVER_ERROR, 753 Json(json!({"error": "InternalError"})), 754 ) 755 .into_response(); 756 } 757 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 758 if let Err(e) = 759 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 760 .await 761 { 762 warn!("Failed to enqueue account deletion notification: {:?}", e); 763 } 764 info!("Account deletion requested for user {}", did); 765 (StatusCode::OK, Json(json!({}))).into_response() 766} 767 768#[derive(Deserialize)] 769pub struct DeleteAccountInput { 770 pub did: String, 771 pub password: String, 772 pub token: String, 773} 774 775pub async fn delete_account( 776 State(state): State<AppState>, 777 Json(input): Json<DeleteAccountInput>, 778) -> Response { 779 let did = input.did.trim(); 780 let password = &input.password; 781 let token = input.token.trim(); 782 if did.is_empty() { 783 return ( 784 StatusCode::BAD_REQUEST, 785 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 786 ) 787 .into_response(); 788 } 789 if password.is_empty() { 790 return ( 791 StatusCode::BAD_REQUEST, 792 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 793 ) 794 .into_response(); 795 } 796 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 797 if password.len() > OLD_PASSWORD_MAX_LENGTH { 798 return ( 799 StatusCode::BAD_REQUEST, 800 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})), 801 ) 802 .into_response(); 803 } 804 if token.is_empty() { 805 return ( 806 StatusCode::BAD_REQUEST, 807 Json(json!({"error": "InvalidToken", "message": "token is required"})), 808 ) 809 .into_response(); 810 } 811 let user = sqlx::query!( 812 "SELECT id, password_hash, handle FROM users WHERE did = $1", 813 did 814 ) 815 .fetch_optional(&state.db) 816 .await; 817 let (user_id, password_hash, handle) = match user { 818 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 819 Ok(None) => { 820 return ( 821 StatusCode::BAD_REQUEST, 822 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 823 ) 824 .into_response(); 825 } 826 Err(e) => { 827 error!("DB error in delete_account: {:?}", e); 828 return ( 829 StatusCode::INTERNAL_SERVER_ERROR, 830 Json(json!({"error": "InternalError"})), 831 ) 832 .into_response(); 833 } 834 }; 835 let password_valid = if password_hash 836 .as_ref() 837 .map(|h| verify(password, h).unwrap_or(false)) 838 .unwrap_or(false) 839 { 840 true 841 } else { 842 let app_pass_rows = sqlx::query!( 843 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 844 user_id 845 ) 846 .fetch_all(&state.db) 847 .await 848 .unwrap_or_default(); 849 app_pass_rows 850 .iter() 851 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 852 }; 853 if !password_valid { 854 return ( 855 StatusCode::UNAUTHORIZED, 856 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 857 ) 858 .into_response(); 859 } 860 let deletion_request = sqlx::query!( 861 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 862 token 863 ) 864 .fetch_optional(&state.db) 865 .await; 866 let (token_did, expires_at) = match deletion_request { 867 Ok(Some(row)) => (row.did, row.expires_at), 868 Ok(None) => { 869 return ( 870 StatusCode::BAD_REQUEST, 871 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 872 ) 873 .into_response(); 874 } 875 Err(e) => { 876 error!("DB error fetching deletion token: {:?}", e); 877 return ( 878 StatusCode::INTERNAL_SERVER_ERROR, 879 Json(json!({"error": "InternalError"})), 880 ) 881 .into_response(); 882 } 883 }; 884 if token_did != did { 885 return ( 886 StatusCode::BAD_REQUEST, 887 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 888 ) 889 .into_response(); 890 } 891 if Utc::now() > expires_at { 892 let _ = sqlx::query!( 893 "DELETE FROM account_deletion_requests WHERE token = $1", 894 token 895 ) 896 .execute(&state.db) 897 .await; 898 return ( 899 StatusCode::BAD_REQUEST, 900 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 901 ) 902 .into_response(); 903 } 904 let mut tx = match state.db.begin().await { 905 Ok(tx) => tx, 906 Err(e) => { 907 error!("Failed to begin transaction: {:?}", e); 908 return ( 909 StatusCode::INTERNAL_SERVER_ERROR, 910 Json(json!({"error": "InternalError"})), 911 ) 912 .into_response(); 913 } 914 }; 915 let deletion_result: Result<(), sqlx::Error> = async { 916 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 917 .execute(&mut *tx) 918 .await?; 919 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 920 .execute(&mut *tx) 921 .await?; 922 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 923 .execute(&mut *tx) 924 .await?; 925 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 926 .execute(&mut *tx) 927 .await?; 928 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 929 .execute(&mut *tx) 930 .await?; 931 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 932 .execute(&mut *tx) 933 .await?; 934 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 935 .execute(&mut *tx) 936 .await?; 937 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 938 .execute(&mut *tx) 939 .await?; 940 Ok(()) 941 } 942 .await; 943 match deletion_result { 944 Ok(()) => { 945 if let Err(e) = tx.commit().await { 946 error!("Failed to commit account deletion transaction: {:?}", e); 947 return ( 948 StatusCode::INTERNAL_SERVER_ERROR, 949 Json(json!({"error": "InternalError"})), 950 ) 951 .into_response(); 952 } 953 let account_seq = crate::api::repo::record::sequence_account_event( 954 &state, 955 did, 956 false, 957 Some("deleted"), 958 ) 959 .await; 960 match account_seq { 961 Ok(seq) => { 962 if let Err(e) = sqlx::query!( 963 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 964 did, 965 seq 966 ) 967 .execute(&state.db) 968 .await 969 { 970 warn!( 971 "Failed to cleanup sequences for deleted account {}: {}", 972 did, e 973 ); 974 } 975 } 976 Err(e) => { 977 warn!( 978 "Failed to sequence account deletion event for {}: {}", 979 did, e 980 ); 981 } 982 } 983 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 984 info!("Account {} deleted successfully", did); 985 (StatusCode::OK, Json(json!({}))).into_response() 986 } 987 Err(e) => { 988 error!("DB error deleting account, rolling back: {:?}", e); 989 ( 990 StatusCode::INTERNAL_SERVER_ERROR, 991 Json(json!({"error": "InternalError"})), 992 ) 993 .into_response() 994 } 995 } 996}