this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use bcrypt::verify; 10use chrono::{Duration, Utc}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14use uuid::Uuid; 15 16#[derive(Serialize)] 17#[serde(rename_all = "camelCase")] 18pub struct CheckAccountStatusOutput { 19 pub activated: bool, 20 pub valid_did: bool, 21 pub repo_commit: String, 22 pub repo_rev: String, 23 pub repo_blocks: i64, 24 pub indexed_records: i64, 25 pub private_state_values: i64, 26 pub expected_blobs: i64, 27 pub imported_blobs: i64, 28} 29 30pub async fn check_account_status( 31 State(state): State<AppState>, 32 headers: axum::http::HeaderMap, 33) -> Response { 34 let extracted = match crate::auth::extract_auth_token_from_header( 35 headers.get("Authorization").and_then(|h| h.to_str().ok()), 36 ) { 37 Some(t) => t, 38 None => return ApiError::AuthenticationRequired.into_response(), 39 }; 40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 41 let http_uri = format!( 42 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 43 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 44 ); 45 let did = match crate::auth::validate_token_with_dpop( 46 &state.db, 47 &extracted.token, 48 extracted.is_dpop, 49 dpop_proof, 50 "GET", 51 &http_uri, 52 true, 53 ) 54 .await 55 { 56 Ok(user) => user.did, 57 Err(e) => return ApiError::from(e).into_response(), 58 }; 59 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(id)) => id, 64 _ => { 65 return ( 66 StatusCode::INTERNAL_SERVER_ERROR, 67 Json(json!({"error": "InternalError"})), 68 ) 69 .into_response(); 70 } 71 }; 72 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 73 .fetch_optional(&state.db) 74 .await; 75 let deactivated_at = match user_status { 76 Ok(Some(row)) => row.deactivated_at, 77 _ => None, 78 }; 79 let repo_result = sqlx::query!( 80 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 81 user_id 82 ) 83 .fetch_optional(&state.db) 84 .await; 85 let repo_commit = match repo_result { 86 Ok(Some(row)) => row.repo_root_cid, 87 _ => String::new(), 88 }; 89 let record_count: i64 = 90 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 91 .fetch_one(&state.db) 92 .await 93 .unwrap_or(Some(0)) 94 .unwrap_or(0); 95 let blob_count: i64 = sqlx::query_scalar!( 96 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 97 user_id 98 ) 99 .fetch_one(&state.db) 100 .await 101 .unwrap_or(Some(0)) 102 .unwrap_or(0); 103 let valid_did = did.starts_with("did:"); 104 ( 105 StatusCode::OK, 106 Json(CheckAccountStatusOutput { 107 activated: deactivated_at.is_none(), 108 valid_did, 109 repo_commit: repo_commit.clone(), 110 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 111 repo_blocks: 0, 112 indexed_records: record_count, 113 private_state_values: 0, 114 expected_blobs: blob_count, 115 imported_blobs: blob_count, 116 }), 117 ) 118 .into_response() 119} 120 121pub async fn activate_account( 122 State(state): State<AppState>, 123 headers: axum::http::HeaderMap, 124) -> Response { 125 let extracted = match crate::auth::extract_auth_token_from_header( 126 headers.get("Authorization").and_then(|h| h.to_str().ok()), 127 ) { 128 Some(t) => t, 129 None => return ApiError::AuthenticationRequired.into_response(), 130 }; 131 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 132 let http_uri = format!( 133 "https://{}/xrpc/com.atproto.server.activateAccount", 134 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 135 ); 136 let auth_user = match crate::auth::validate_token_with_dpop( 137 &state.db, 138 &extracted.token, 139 extracted.is_dpop, 140 dpop_proof, 141 "POST", 142 &http_uri, 143 true, 144 ) 145 .await 146 { 147 Ok(user) => user, 148 Err(e) => return ApiError::from(e).into_response(), 149 }; 150 151 if let Err(e) = crate::auth::scope_check::check_account_scope( 152 auth_user.is_oauth, 153 auth_user.scope.as_deref(), 154 crate::oauth::scopes::AccountAttr::Repo, 155 crate::oauth::scopes::AccountAction::Manage, 156 ) { 157 return e; 158 } 159 160 let did = auth_user.did; 161 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 162 .fetch_optional(&state.db) 163 .await 164 .ok() 165 .flatten(); 166 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 167 .execute(&state.db) 168 .await; 169 match result { 170 Ok(_) => { 171 if let Some(ref h) = handle { 172 let _ = state.cache.delete(&format!("handle:{}", h)).await; 173 } 174 if let Err(e) = 175 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 176 { 177 warn!("Failed to sequence account activation event: {}", e); 178 } 179 if let Err(e) = 180 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 181 .await 182 { 183 warn!("Failed to sequence identity event for activation: {}", e); 184 } 185 if let Err(e) = 186 crate::api::repo::record::sequence_empty_commit_event(&state, &did).await 187 { 188 warn!( 189 "Failed to sequence empty commit event for activation: {}", 190 e 191 ); 192 } 193 (StatusCode::OK, Json(json!({}))).into_response() 194 } 195 Err(e) => { 196 error!("DB error activating account: {:?}", e); 197 ( 198 StatusCode::INTERNAL_SERVER_ERROR, 199 Json(json!({"error": "InternalError"})), 200 ) 201 .into_response() 202 } 203 } 204} 205 206#[derive(Deserialize)] 207#[serde(rename_all = "camelCase")] 208pub struct DeactivateAccountInput { 209 pub delete_after: Option<String>, 210} 211 212pub async fn deactivate_account( 213 State(state): State<AppState>, 214 headers: axum::http::HeaderMap, 215 Json(_input): Json<DeactivateAccountInput>, 216) -> Response { 217 let extracted = match crate::auth::extract_auth_token_from_header( 218 headers.get("Authorization").and_then(|h| h.to_str().ok()), 219 ) { 220 Some(t) => t, 221 None => return ApiError::AuthenticationRequired.into_response(), 222 }; 223 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 224 let http_uri = format!( 225 "https://{}/xrpc/com.atproto.server.deactivateAccount", 226 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 227 ); 228 let auth_user = match crate::auth::validate_token_with_dpop( 229 &state.db, 230 &extracted.token, 231 extracted.is_dpop, 232 dpop_proof, 233 "POST", 234 &http_uri, 235 false, 236 ) 237 .await 238 { 239 Ok(user) => user, 240 Err(e) => return ApiError::from(e).into_response(), 241 }; 242 243 if let Err(e) = crate::auth::scope_check::check_account_scope( 244 auth_user.is_oauth, 245 auth_user.scope.as_deref(), 246 crate::oauth::scopes::AccountAttr::Repo, 247 crate::oauth::scopes::AccountAction::Manage, 248 ) { 249 return e; 250 } 251 252 let did = auth_user.did; 253 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 254 .fetch_optional(&state.db) 255 .await 256 .ok() 257 .flatten(); 258 let result = sqlx::query!( 259 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 260 did 261 ) 262 .execute(&state.db) 263 .await; 264 match result { 265 Ok(_) => { 266 if let Some(ref h) = handle { 267 let _ = state.cache.delete(&format!("handle:{}", h)).await; 268 } 269 if let Err(e) = crate::api::repo::record::sequence_account_event( 270 &state, 271 &did, 272 false, 273 Some("deactivated"), 274 ) 275 .await 276 { 277 warn!("Failed to sequence account deactivation event: {}", e); 278 } 279 (StatusCode::OK, Json(json!({}))).into_response() 280 } 281 Err(e) => { 282 error!("DB error deactivating account: {:?}", e); 283 ( 284 StatusCode::INTERNAL_SERVER_ERROR, 285 Json(json!({"error": "InternalError"})), 286 ) 287 .into_response() 288 } 289 } 290} 291 292pub async fn request_account_delete( 293 State(state): State<AppState>, 294 headers: axum::http::HeaderMap, 295) -> Response { 296 let extracted = match crate::auth::extract_auth_token_from_header( 297 headers.get("Authorization").and_then(|h| h.to_str().ok()), 298 ) { 299 Some(t) => t, 300 None => return ApiError::AuthenticationRequired.into_response(), 301 }; 302 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 303 let http_uri = format!( 304 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 305 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 306 ); 307 let validated = match crate::auth::validate_token_with_dpop( 308 &state.db, 309 &extracted.token, 310 extracted.is_dpop, 311 dpop_proof, 312 "POST", 313 &http_uri, 314 true, 315 ) 316 .await 317 { 318 Ok(user) => user, 319 Err(e) => return ApiError::from(e).into_response(), 320 }; 321 let did = validated.did.clone(); 322 323 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await { 324 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await; 325 } 326 327 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 328 .fetch_optional(&state.db) 329 .await 330 { 331 Ok(Some(id)) => id, 332 _ => { 333 return ( 334 StatusCode::INTERNAL_SERVER_ERROR, 335 Json(json!({"error": "InternalError"})), 336 ) 337 .into_response(); 338 } 339 }; 340 let confirmation_token = Uuid::new_v4().to_string(); 341 let expires_at = Utc::now() + Duration::minutes(15); 342 let insert = sqlx::query!( 343 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 344 confirmation_token, 345 did, 346 expires_at 347 ) 348 .execute(&state.db) 349 .await; 350 if let Err(e) = insert { 351 error!("DB error creating deletion token: {:?}", e); 352 return ( 353 StatusCode::INTERNAL_SERVER_ERROR, 354 Json(json!({"error": "InternalError"})), 355 ) 356 .into_response(); 357 } 358 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 359 if let Err(e) = 360 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 361 .await 362 { 363 warn!("Failed to enqueue account deletion notification: {:?}", e); 364 } 365 info!("Account deletion requested for user {}", did); 366 (StatusCode::OK, Json(json!({}))).into_response() 367} 368 369#[derive(Deserialize)] 370pub struct DeleteAccountInput { 371 pub did: String, 372 pub password: String, 373 pub token: String, 374} 375 376pub async fn delete_account( 377 State(state): State<AppState>, 378 Json(input): Json<DeleteAccountInput>, 379) -> Response { 380 let did = input.did.trim(); 381 let password = &input.password; 382 let token = input.token.trim(); 383 if did.is_empty() { 384 return ( 385 StatusCode::BAD_REQUEST, 386 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 387 ) 388 .into_response(); 389 } 390 if password.is_empty() { 391 return ( 392 StatusCode::BAD_REQUEST, 393 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 394 ) 395 .into_response(); 396 } 397 if token.is_empty() { 398 return ( 399 StatusCode::BAD_REQUEST, 400 Json(json!({"error": "InvalidToken", "message": "token is required"})), 401 ) 402 .into_response(); 403 } 404 let user = sqlx::query!( 405 "SELECT id, password_hash, handle FROM users WHERE did = $1", 406 did 407 ) 408 .fetch_optional(&state.db) 409 .await; 410 let (user_id, password_hash, handle) = match user { 411 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 412 Ok(None) => { 413 return ( 414 StatusCode::BAD_REQUEST, 415 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 416 ) 417 .into_response(); 418 } 419 Err(e) => { 420 error!("DB error in delete_account: {:?}", e); 421 return ( 422 StatusCode::INTERNAL_SERVER_ERROR, 423 Json(json!({"error": "InternalError"})), 424 ) 425 .into_response(); 426 } 427 }; 428 let password_valid = if password_hash 429 .as_ref() 430 .map(|h| verify(password, h).unwrap_or(false)) 431 .unwrap_or(false) 432 { 433 true 434 } else { 435 let app_pass_rows = sqlx::query!( 436 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 437 user_id 438 ) 439 .fetch_all(&state.db) 440 .await 441 .unwrap_or_default(); 442 app_pass_rows 443 .iter() 444 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 445 }; 446 if !password_valid { 447 return ( 448 StatusCode::UNAUTHORIZED, 449 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 450 ) 451 .into_response(); 452 } 453 let deletion_request = sqlx::query!( 454 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 455 token 456 ) 457 .fetch_optional(&state.db) 458 .await; 459 let (token_did, expires_at) = match deletion_request { 460 Ok(Some(row)) => (row.did, row.expires_at), 461 Ok(None) => { 462 return ( 463 StatusCode::BAD_REQUEST, 464 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 465 ) 466 .into_response(); 467 } 468 Err(e) => { 469 error!("DB error fetching deletion token: {:?}", e); 470 return ( 471 StatusCode::INTERNAL_SERVER_ERROR, 472 Json(json!({"error": "InternalError"})), 473 ) 474 .into_response(); 475 } 476 }; 477 if token_did != did { 478 return ( 479 StatusCode::BAD_REQUEST, 480 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 481 ) 482 .into_response(); 483 } 484 if Utc::now() > expires_at { 485 let _ = sqlx::query!( 486 "DELETE FROM account_deletion_requests WHERE token = $1", 487 token 488 ) 489 .execute(&state.db) 490 .await; 491 return ( 492 StatusCode::BAD_REQUEST, 493 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 494 ) 495 .into_response(); 496 } 497 let mut tx = match state.db.begin().await { 498 Ok(tx) => tx, 499 Err(e) => { 500 error!("Failed to begin transaction: {:?}", e); 501 return ( 502 StatusCode::INTERNAL_SERVER_ERROR, 503 Json(json!({"error": "InternalError"})), 504 ) 505 .into_response(); 506 } 507 }; 508 let deletion_result: Result<(), sqlx::Error> = async { 509 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 510 .execute(&mut *tx) 511 .await?; 512 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 513 .execute(&mut *tx) 514 .await?; 515 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 516 .execute(&mut *tx) 517 .await?; 518 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 519 .execute(&mut *tx) 520 .await?; 521 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 522 .execute(&mut *tx) 523 .await?; 524 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 525 .execute(&mut *tx) 526 .await?; 527 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 528 .execute(&mut *tx) 529 .await?; 530 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 531 .execute(&mut *tx) 532 .await?; 533 Ok(()) 534 } 535 .await; 536 match deletion_result { 537 Ok(()) => { 538 if let Err(e) = tx.commit().await { 539 error!("Failed to commit account deletion transaction: {:?}", e); 540 return ( 541 StatusCode::INTERNAL_SERVER_ERROR, 542 Json(json!({"error": "InternalError"})), 543 ) 544 .into_response(); 545 } 546 if let Err(e) = crate::api::repo::record::sequence_account_event( 547 &state, 548 did, 549 false, 550 Some("deleted"), 551 ) 552 .await 553 { 554 warn!( 555 "Failed to sequence account deletion event for {}: {}", 556 did, e 557 ); 558 } 559 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 560 info!("Account {} deleted successfully", did); 561 (StatusCode::OK, Json(json!({}))).into_response() 562 } 563 Err(e) => { 564 error!("DB error deleting account, rolling back: {:?}", e); 565 ( 566 StatusCode::INTERNAL_SERVER_ERROR, 567 Json(json!({"error": "InternalError"})), 568 ) 569 .into_response() 570 } 571 } 572}