this repo has no description
1use crate::api::ApiError; 2use crate::plc::PlcClient; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::{Duration, Utc}; 12use cid::Cid; 13use jacquard_repo::commit::Commit; 14use jacquard_repo::storage::BlockStore; 15use k256::ecdsa::SigningKey; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::str::FromStr; 19use tracing::{error, info, warn}; 20use uuid::Uuid; 21 22#[derive(Serialize)] 23#[serde(rename_all = "camelCase")] 24pub struct CheckAccountStatusOutput { 25 pub activated: bool, 26 pub valid_did: bool, 27 pub repo_commit: String, 28 pub repo_rev: String, 29 pub repo_blocks: i64, 30 pub indexed_records: i64, 31 pub private_state_values: i64, 32 pub expected_blobs: i64, 33 pub imported_blobs: i64, 34} 35 36pub async fn check_account_status( 37 State(state): State<AppState>, 38 headers: axum::http::HeaderMap, 39) -> Response { 40 let extracted = match crate::auth::extract_auth_token_from_header( 41 headers.get("Authorization").and_then(|h| h.to_str().ok()), 42 ) { 43 Some(t) => t, 44 None => return ApiError::AuthenticationRequired.into_response(), 45 }; 46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 47 let http_uri = format!( 48 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 50 ); 51 let did = match crate::auth::validate_token_with_dpop( 52 &state.db, 53 &extracted.token, 54 extracted.is_dpop, 55 dpop_proof, 56 "GET", 57 &http_uri, 58 true, 59 ) 60 .await 61 { 62 Ok(user) => user.did, 63 Err(e) => return ApiError::from(e).into_response(), 64 }; 65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(id)) => id, 70 _ => { 71 return ( 72 StatusCode::INTERNAL_SERVER_ERROR, 73 Json(json!({"error": "InternalError"})), 74 ) 75 .into_response(); 76 } 77 }; 78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 79 .fetch_optional(&state.db) 80 .await; 81 let deactivated_at = match user_status { 82 Ok(Some(row)) => row.deactivated_at, 83 _ => None, 84 }; 85 let repo_result = sqlx::query!( 86 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 87 user_id 88 ) 89 .fetch_optional(&state.db) 90 .await; 91 let (repo_commit, repo_rev_from_db) = match repo_result { 92 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 93 _ => (String::new(), None), 94 }; 95 let block_count: i64 = 96 sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id) 97 .fetch_one(&state.db) 98 .await 99 .unwrap_or(Some(0)) 100 .unwrap_or(0); 101 let repo_rev = if let Some(rev) = repo_rev_from_db { 102 rev 103 } else if !repo_commit.is_empty() { 104 if let Ok(cid) = Cid::from_str(&repo_commit) { 105 if let Ok(Some(block)) = state.block_store.get(&cid).await { 106 Commit::from_cbor(&block) 107 .ok() 108 .map(|c| c.rev().to_string()) 109 .unwrap_or_default() 110 } else { 111 String::new() 112 } 113 } else { 114 String::new() 115 } 116 } else { 117 String::new() 118 }; 119 let record_count: i64 = 120 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 121 .fetch_one(&state.db) 122 .await 123 .unwrap_or(Some(0)) 124 .unwrap_or(0); 125 let imported_blobs: i64 = sqlx::query_scalar!( 126 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 127 user_id 128 ) 129 .fetch_one(&state.db) 130 .await 131 .unwrap_or(Some(0)) 132 .unwrap_or(0); 133 let expected_blobs: i64 = sqlx::query_scalar!( 134 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1", 135 user_id 136 ) 137 .fetch_one(&state.db) 138 .await 139 .unwrap_or(Some(0)) 140 .unwrap_or(0); 141 let valid_did = is_valid_did_for_service(&state.db, &did).await; 142 ( 143 StatusCode::OK, 144 Json(CheckAccountStatusOutput { 145 activated: deactivated_at.is_none(), 146 valid_did, 147 repo_commit: repo_commit.clone(), 148 repo_rev, 149 repo_blocks: block_count as i64, 150 indexed_records: record_count, 151 private_state_values: 0, 152 expected_blobs, 153 imported_blobs, 154 }), 155 ) 156 .into_response() 157} 158 159async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool { 160 assert_valid_did_document_for_service(db, did, false) 161 .await 162 .is_ok() 163} 164 165async fn assert_valid_did_document_for_service( 166 db: &sqlx::PgPool, 167 did: &str, 168 with_retry: bool, 169) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 170 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 171 let expected_endpoint = format!("https://{}", hostname); 172 173 if did.starts_with("did:plc:") { 174 let plc_client = PlcClient::new(None); 175 176 let max_attempts = if with_retry { 5 } else { 1 }; 177 let mut last_error = None; 178 let mut doc_data = None; 179 for attempt in 0..max_attempts { 180 if attempt > 0 { 181 let delay_ms = 500 * (1 << (attempt - 1)); 182 info!( 183 "Waiting {}ms before retry {} for DID document validation ({})", 184 delay_ms, attempt, did 185 ); 186 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 187 } 188 189 match plc_client.get_document_data(did).await { 190 Ok(data) => { 191 let pds_endpoint = data 192 .get("services") 193 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 194 .and_then(|p| p.get("endpoint")) 195 .and_then(|e| e.as_str()); 196 197 if pds_endpoint == Some(&expected_endpoint) { 198 doc_data = Some(data); 199 break; 200 } else { 201 info!( 202 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 203 attempt + 1, 204 did, 205 pds_endpoint, 206 expected_endpoint 207 ); 208 last_error = Some(format!( 209 "DID document endpoint {:?} does not match expected {}", 210 pds_endpoint, expected_endpoint 211 )); 212 } 213 } 214 Err(e) => { 215 warn!( 216 "Attempt {}: Failed to fetch PLC document for {}: {:?}", 217 attempt + 1, 218 did, 219 e 220 ); 221 last_error = Some(format!("Could not resolve DID document: {}", e)); 222 } 223 } 224 } 225 226 let doc_data = match doc_data { 227 Some(d) => d, 228 None => { 229 return Err(( 230 StatusCode::BAD_REQUEST, 231 Json(json!({ 232 "error": "InvalidRequest", 233 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string()) 234 })), 235 )); 236 } 237 }; 238 239 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 240 if let Some(ref expected_rotation_key) = server_rotation_key { 241 let rotation_keys = doc_data 242 .get("rotationKeys") 243 .and_then(|v| v.as_array()) 244 .map(|arr| { 245 arr.iter() 246 .filter_map(|k| k.as_str()) 247 .collect::<Vec<_>>() 248 }) 249 .unwrap_or_default(); 250 if !rotation_keys.contains(&expected_rotation_key.as_str()) { 251 return Err(( 252 StatusCode::BAD_REQUEST, 253 Json(json!({ 254 "error": "InvalidRequest", 255 "message": "Server rotation key not included in PLC DID data" 256 })), 257 )); 258 } 259 } 260 261 let doc_signing_key = doc_data 262 .get("verificationMethods") 263 .and_then(|v| v.get("atproto")) 264 .and_then(|k| k.as_str()); 265 266 let user_row = sqlx::query!( 267 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1", 268 did 269 ) 270 .fetch_optional(db) 271 .await 272 .map_err(|e| { 273 error!("Failed to fetch user key: {:?}", e); 274 ( 275 StatusCode::INTERNAL_SERVER_ERROR, 276 Json(json!({"error": "InternalError"})), 277 ) 278 })?; 279 280 if let Some(row) = user_row { 281 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) 282 .map_err(|e| { 283 error!("Failed to decrypt user key: {}", e); 284 ( 285 StatusCode::INTERNAL_SERVER_ERROR, 286 Json(json!({"error": "InternalError"})), 287 ) 288 })?; 289 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| { 290 error!("Failed to create signing key: {:?}", e); 291 ( 292 StatusCode::INTERNAL_SERVER_ERROR, 293 Json(json!({"error": "InternalError"})), 294 ) 295 })?; 296 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key); 297 298 if doc_signing_key != Some(&expected_did_key) { 299 warn!( 300 "DID {} has signing key {:?}, expected {}", 301 did, doc_signing_key, expected_did_key 302 ); 303 return Err(( 304 StatusCode::BAD_REQUEST, 305 Json(json!({ 306 "error": "InvalidRequest", 307 "message": "DID document verification method does not match expected signing key" 308 })), 309 )); 310 } 311 } 312 } else if let Some(host_and_path) = did.strip_prefix("did:web:") { 313 let client = reqwest::Client::new(); 314 let decoded = host_and_path.replace("%3A", ":"); 315 let parts: Vec<&str> = decoded.split(':').collect(); 316 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) 317 { 318 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) 319 } else { 320 (parts[0].to_string(), parts[1..].to_vec()) 321 }; 322 let scheme = 323 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { 324 "http" 325 } else { 326 "https" 327 }; 328 let url = if path_parts.is_empty() { 329 format!("{}://{}/.well-known/did.json", scheme, host) 330 } else { 331 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) 332 }; 333 let resp = client.get(&url).send().await.map_err(|e| { 334 warn!("Failed to fetch did:web document for {}: {:?}", did, e); 335 ( 336 StatusCode::BAD_REQUEST, 337 Json(json!({ 338 "error": "InvalidRequest", 339 "message": format!("Could not resolve DID document: {}", e) 340 })), 341 ) 342 })?; 343 let doc: serde_json::Value = resp.json().await.map_err(|e| { 344 warn!("Failed to parse did:web document for {}: {:?}", did, e); 345 ( 346 StatusCode::BAD_REQUEST, 347 Json(json!({ 348 "error": "InvalidRequest", 349 "message": format!("Could not parse DID document: {}", e) 350 })), 351 ) 352 })?; 353 354 let pds_endpoint = doc 355 .get("service") 356 .and_then(|s| s.as_array()) 357 .and_then(|arr| { 358 arr.iter().find(|svc| { 359 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds") 360 || svc.get("type").and_then(|t| t.as_str()) 361 == Some("AtprotoPersonalDataServer") 362 }) 363 }) 364 .and_then(|svc| svc.get("serviceEndpoint")) 365 .and_then(|e| e.as_str()); 366 367 if pds_endpoint != Some(&expected_endpoint) { 368 warn!( 369 "DID {} has endpoint {:?}, expected {}", 370 did, pds_endpoint, expected_endpoint 371 ); 372 return Err(( 373 StatusCode::BAD_REQUEST, 374 Json(json!({ 375 "error": "InvalidRequest", 376 "message": "DID document atproto_pds service endpoint does not match PDS public url" 377 })), 378 )); 379 } 380 } 381 382 Ok(()) 383} 384 385pub async fn activate_account( 386 State(state): State<AppState>, 387 headers: axum::http::HeaderMap, 388) -> Response { 389 info!("[MIGRATION] activateAccount called"); 390 let extracted = match crate::auth::extract_auth_token_from_header( 391 headers.get("Authorization").and_then(|h| h.to_str().ok()), 392 ) { 393 Some(t) => t, 394 None => { 395 info!("[MIGRATION] activateAccount: No auth token"); 396 return ApiError::AuthenticationRequired.into_response(); 397 } 398 }; 399 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 400 let http_uri = format!( 401 "https://{}/xrpc/com.atproto.server.activateAccount", 402 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 403 ); 404 let auth_user = match crate::auth::validate_token_with_dpop( 405 &state.db, 406 &extracted.token, 407 extracted.is_dpop, 408 dpop_proof, 409 "POST", 410 &http_uri, 411 true, 412 ) 413 .await 414 { 415 Ok(user) => user, 416 Err(e) => { 417 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e); 418 return ApiError::from(e).into_response(); 419 } 420 }; 421 info!( 422 "[MIGRATION] activateAccount: Authenticated user did={}", 423 auth_user.did 424 ); 425 426 if let Err(e) = crate::auth::scope_check::check_account_scope( 427 auth_user.is_oauth, 428 auth_user.scope.as_deref(), 429 crate::oauth::scopes::AccountAttr::Repo, 430 crate::oauth::scopes::AccountAction::Manage, 431 ) { 432 info!("[MIGRATION] activateAccount: Scope check failed"); 433 return e; 434 } 435 436 let did = auth_user.did; 437 438 info!( 439 "[MIGRATION] activateAccount: Validating DID document for did={}", 440 did 441 ); 442 let did_validation_start = std::time::Instant::now(); 443 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await { 444 info!( 445 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 446 did, 447 did_validation_start.elapsed() 448 ); 449 return (status, json).into_response(); 450 } 451 info!( 452 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})", 453 did, 454 did_validation_start.elapsed() 455 ); 456 457 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 458 .fetch_optional(&state.db) 459 .await 460 .ok() 461 .flatten(); 462 info!( 463 "[MIGRATION] activateAccount: Activating account did={} handle={:?}", 464 did, handle 465 ); 466 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 467 .execute(&state.db) 468 .await; 469 match result { 470 Ok(_) => { 471 info!( 472 "[MIGRATION] activateAccount: DB update success for did={}", 473 did 474 ); 475 if let Some(ref h) = handle { 476 let _ = state.cache.delete(&format!("handle:{}", h)).await; 477 } 478 info!( 479 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}", 480 did 481 ); 482 if let Err(e) = 483 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 484 { 485 warn!( 486 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}", 487 e 488 ); 489 } else { 490 info!("[MIGRATION] activateAccount: Account event sequenced successfully"); 491 } 492 info!( 493 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}", 494 did, handle 495 ); 496 if let Err(e) = 497 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 498 .await 499 { 500 warn!( 501 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}", 502 e 503 ); 504 } else { 505 info!("[MIGRATION] activateAccount: Identity event sequenced successfully"); 506 } 507 let repo_root = sqlx::query_scalar!( 508 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 509 did 510 ) 511 .fetch_optional(&state.db) 512 .await 513 .ok() 514 .flatten(); 515 if let Some(root_cid) = repo_root { 516 info!( 517 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}", 518 did, root_cid 519 ); 520 let rev = if let Ok(cid) = Cid::from_str(&root_cid) { 521 if let Ok(Some(block)) = state.block_store.get(&cid).await { 522 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) 523 } else { 524 None 525 } 526 } else { 527 None 528 }; 529 if let Err(e) = crate::api::repo::record::sequence_sync_event( 530 &state, 531 &did, 532 &root_cid, 533 rev.as_deref(), 534 ) 535 .await 536 { 537 warn!( 538 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}", 539 e 540 ); 541 } else { 542 info!("[MIGRATION] activateAccount: Sync event sequenced successfully"); 543 } 544 } else { 545 warn!( 546 "[MIGRATION] activateAccount: No repo root found for did={}", 547 did 548 ); 549 } 550 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did); 551 (StatusCode::OK, Json(json!({}))).into_response() 552 } 553 Err(e) => { 554 error!( 555 "[MIGRATION] activateAccount: DB error activating account: {:?}", 556 e 557 ); 558 ( 559 StatusCode::INTERNAL_SERVER_ERROR, 560 Json(json!({"error": "InternalError"})), 561 ) 562 .into_response() 563 } 564 } 565} 566 567#[derive(Deserialize)] 568#[serde(rename_all = "camelCase")] 569pub struct DeactivateAccountInput { 570 pub delete_after: Option<String>, 571 pub migrating_to: Option<String>, 572} 573 574pub async fn deactivate_account( 575 State(state): State<AppState>, 576 headers: axum::http::HeaderMap, 577 Json(input): Json<DeactivateAccountInput>, 578) -> Response { 579 let extracted = match crate::auth::extract_auth_token_from_header( 580 headers.get("Authorization").and_then(|h| h.to_str().ok()), 581 ) { 582 Some(t) => t, 583 None => return ApiError::AuthenticationRequired.into_response(), 584 }; 585 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 586 let http_uri = format!( 587 "https://{}/xrpc/com.atproto.server.deactivateAccount", 588 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 589 ); 590 let auth_user = match crate::auth::validate_token_with_dpop( 591 &state.db, 592 &extracted.token, 593 extracted.is_dpop, 594 dpop_proof, 595 "POST", 596 &http_uri, 597 false, 598 ) 599 .await 600 { 601 Ok(user) => user, 602 Err(e) => return ApiError::from(e).into_response(), 603 }; 604 605 if let Err(e) = crate::auth::scope_check::check_account_scope( 606 auth_user.is_oauth, 607 auth_user.scope.as_deref(), 608 crate::oauth::scopes::AccountAttr::Repo, 609 crate::oauth::scopes::AccountAction::Manage, 610 ) { 611 return e; 612 } 613 614 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 615 .delete_after 616 .as_ref() 617 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 618 .map(|dt| dt.with_timezone(&chrono::Utc)); 619 620 let did = auth_user.did; 621 622 let migrating_to = if let Some(ref url) = input.migrating_to { 623 let url = url.trim().trim_end_matches('/'); 624 if url.is_empty() || !did.starts_with("did:web:") { 625 None 626 } else { 627 if !url.starts_with("https://") { 628 return ApiError::InvalidRequest("migratingTo must start with https://".into()) 629 .into_response(); 630 } 631 Some(url.to_string()) 632 } 633 } else { 634 None 635 }; 636 637 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 638 .fetch_optional(&state.db) 639 .await 640 .ok() 641 .flatten(); 642 643 let result = if let Some(ref pds_url) = migrating_to { 644 sqlx::query!( 645 "UPDATE users SET deactivated_at = NOW(), delete_after = $2, migrated_to_pds = $3, migrated_at = NOW() WHERE did = $1", 646 did, 647 delete_after, 648 pds_url 649 ) 650 .execute(&state.db) 651 .await 652 } else { 653 sqlx::query!( 654 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 655 did, 656 delete_after 657 ) 658 .execute(&state.db) 659 .await 660 }; 661 662 let status = if migrating_to.is_some() { 663 "migrated" 664 } else { 665 "deactivated" 666 }; 667 668 match result { 669 Ok(_) => { 670 if let Some(ref h) = handle { 671 let _ = state.cache.delete(&format!("handle:{}", h)).await; 672 } 673 if let Err(e) = 674 crate::api::repo::record::sequence_account_event(&state, &did, false, Some(status)) 675 .await 676 { 677 warn!("Failed to sequence account {} event: {}", status, e); 678 } 679 (StatusCode::OK, Json(json!({}))).into_response() 680 } 681 Err(e) => { 682 error!("DB error deactivating account: {:?}", e); 683 ( 684 StatusCode::INTERNAL_SERVER_ERROR, 685 Json(json!({"error": "InternalError"})), 686 ) 687 .into_response() 688 } 689 } 690} 691 692pub async fn request_account_delete( 693 State(state): State<AppState>, 694 headers: axum::http::HeaderMap, 695) -> Response { 696 let extracted = match crate::auth::extract_auth_token_from_header( 697 headers.get("Authorization").and_then(|h| h.to_str().ok()), 698 ) { 699 Some(t) => t, 700 None => return ApiError::AuthenticationRequired.into_response(), 701 }; 702 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 703 let http_uri = format!( 704 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 705 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 706 ); 707 let validated = match crate::auth::validate_token_with_dpop( 708 &state.db, 709 &extracted.token, 710 extracted.is_dpop, 711 dpop_proof, 712 "POST", 713 &http_uri, 714 true, 715 ) 716 .await 717 { 718 Ok(user) => user, 719 Err(e) => return ApiError::from(e).into_response(), 720 }; 721 let did = validated.did.clone(); 722 723 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await { 724 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await; 725 } 726 727 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 728 .fetch_optional(&state.db) 729 .await 730 { 731 Ok(Some(id)) => id, 732 _ => { 733 return ( 734 StatusCode::INTERNAL_SERVER_ERROR, 735 Json(json!({"error": "InternalError"})), 736 ) 737 .into_response(); 738 } 739 }; 740 let confirmation_token = Uuid::new_v4().to_string(); 741 let expires_at = Utc::now() + Duration::minutes(15); 742 let insert = sqlx::query!( 743 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 744 confirmation_token, 745 did, 746 expires_at 747 ) 748 .execute(&state.db) 749 .await; 750 if let Err(e) = insert { 751 error!("DB error creating deletion token: {:?}", e); 752 return ( 753 StatusCode::INTERNAL_SERVER_ERROR, 754 Json(json!({"error": "InternalError"})), 755 ) 756 .into_response(); 757 } 758 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 759 if let Err(e) = 760 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 761 .await 762 { 763 warn!("Failed to enqueue account deletion notification: {:?}", e); 764 } 765 info!("Account deletion requested for user {}", did); 766 (StatusCode::OK, Json(json!({}))).into_response() 767} 768 769#[derive(Deserialize)] 770pub struct DeleteAccountInput { 771 pub did: String, 772 pub password: String, 773 pub token: String, 774} 775 776pub async fn delete_account( 777 State(state): State<AppState>, 778 Json(input): Json<DeleteAccountInput>, 779) -> Response { 780 let did = input.did.trim(); 781 let password = &input.password; 782 let token = input.token.trim(); 783 if did.is_empty() { 784 return ( 785 StatusCode::BAD_REQUEST, 786 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 787 ) 788 .into_response(); 789 } 790 if password.is_empty() { 791 return ( 792 StatusCode::BAD_REQUEST, 793 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 794 ) 795 .into_response(); 796 } 797 const OLD_PASSWORD_MAX_LENGTH: usize = 512; 798 if password.len() > OLD_PASSWORD_MAX_LENGTH { 799 return ( 800 StatusCode::BAD_REQUEST, 801 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})), 802 ) 803 .into_response(); 804 } 805 if token.is_empty() { 806 return ( 807 StatusCode::BAD_REQUEST, 808 Json(json!({"error": "InvalidToken", "message": "token is required"})), 809 ) 810 .into_response(); 811 } 812 let user = sqlx::query!( 813 "SELECT id, password_hash, handle FROM users WHERE did = $1", 814 did 815 ) 816 .fetch_optional(&state.db) 817 .await; 818 let (user_id, password_hash, handle) = match user { 819 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 820 Ok(None) => { 821 return ( 822 StatusCode::BAD_REQUEST, 823 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 824 ) 825 .into_response(); 826 } 827 Err(e) => { 828 error!("DB error in delete_account: {:?}", e); 829 return ( 830 StatusCode::INTERNAL_SERVER_ERROR, 831 Json(json!({"error": "InternalError"})), 832 ) 833 .into_response(); 834 } 835 }; 836 let password_valid = if password_hash 837 .as_ref() 838 .map(|h| verify(password, h).unwrap_or(false)) 839 .unwrap_or(false) 840 { 841 true 842 } else { 843 let app_pass_rows = sqlx::query!( 844 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 845 user_id 846 ) 847 .fetch_all(&state.db) 848 .await 849 .unwrap_or_default(); 850 app_pass_rows 851 .iter() 852 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 853 }; 854 if !password_valid { 855 return ( 856 StatusCode::UNAUTHORIZED, 857 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 858 ) 859 .into_response(); 860 } 861 let deletion_request = sqlx::query!( 862 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 863 token 864 ) 865 .fetch_optional(&state.db) 866 .await; 867 let (token_did, expires_at) = match deletion_request { 868 Ok(Some(row)) => (row.did, row.expires_at), 869 Ok(None) => { 870 return ( 871 StatusCode::BAD_REQUEST, 872 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 873 ) 874 .into_response(); 875 } 876 Err(e) => { 877 error!("DB error fetching deletion token: {:?}", e); 878 return ( 879 StatusCode::INTERNAL_SERVER_ERROR, 880 Json(json!({"error": "InternalError"})), 881 ) 882 .into_response(); 883 } 884 }; 885 if token_did != did { 886 return ( 887 StatusCode::BAD_REQUEST, 888 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 889 ) 890 .into_response(); 891 } 892 if Utc::now() > expires_at { 893 let _ = sqlx::query!( 894 "DELETE FROM account_deletion_requests WHERE token = $1", 895 token 896 ) 897 .execute(&state.db) 898 .await; 899 return ( 900 StatusCode::BAD_REQUEST, 901 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 902 ) 903 .into_response(); 904 } 905 let mut tx = match state.db.begin().await { 906 Ok(tx) => tx, 907 Err(e) => { 908 error!("Failed to begin transaction: {:?}", e); 909 return ( 910 StatusCode::INTERNAL_SERVER_ERROR, 911 Json(json!({"error": "InternalError"})), 912 ) 913 .into_response(); 914 } 915 }; 916 let deletion_result: Result<(), sqlx::Error> = async { 917 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 918 .execute(&mut *tx) 919 .await?; 920 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 921 .execute(&mut *tx) 922 .await?; 923 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 924 .execute(&mut *tx) 925 .await?; 926 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 927 .execute(&mut *tx) 928 .await?; 929 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 930 .execute(&mut *tx) 931 .await?; 932 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 933 .execute(&mut *tx) 934 .await?; 935 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 936 .execute(&mut *tx) 937 .await?; 938 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 939 .execute(&mut *tx) 940 .await?; 941 Ok(()) 942 } 943 .await; 944 match deletion_result { 945 Ok(()) => { 946 if let Err(e) = tx.commit().await { 947 error!("Failed to commit account deletion transaction: {:?}", e); 948 return ( 949 StatusCode::INTERNAL_SERVER_ERROR, 950 Json(json!({"error": "InternalError"})), 951 ) 952 .into_response(); 953 } 954 let account_seq = crate::api::repo::record::sequence_account_event( 955 &state, 956 did, 957 false, 958 Some("deleted"), 959 ) 960 .await; 961 match account_seq { 962 Ok(seq) => { 963 if let Err(e) = sqlx::query!( 964 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 965 did, 966 seq 967 ) 968 .execute(&state.db) 969 .await 970 { 971 warn!( 972 "Failed to cleanup sequences for deleted account {}: {}", 973 did, e 974 ); 975 } 976 } 977 Err(e) => { 978 warn!( 979 "Failed to sequence account deletion event for {}: {}", 980 did, e 981 ); 982 } 983 } 984 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 985 info!("Account {} deleted successfully", did); 986 (StatusCode::OK, Json(json!({}))).into_response() 987 } 988 Err(e) => { 989 error!("DB error deleting account, rolling back: {:?}", e); 990 ( 991 StatusCode::INTERNAL_SERVER_ERROR, 992 Json(json!({"error": "InternalError"})), 993 ) 994 .into_response() 995 } 996 } 997}