this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use bcrypt::verify; 10use chrono::{Duration, Utc}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14use uuid::Uuid; 15 16#[derive(Serialize)] 17#[serde(rename_all = "camelCase")] 18pub struct CheckAccountStatusOutput { 19 pub activated: bool, 20 pub valid_did: bool, 21 pub repo_commit: String, 22 pub repo_rev: String, 23 pub repo_blocks: i64, 24 pub indexed_records: i64, 25 pub private_state_values: i64, 26 pub expected_blobs: i64, 27 pub imported_blobs: i64, 28} 29 30pub async fn check_account_status( 31 State(state): State<AppState>, 32 headers: axum::http::HeaderMap, 33) -> Response { 34 let extracted = match crate::auth::extract_auth_token_from_header( 35 headers.get("Authorization").and_then(|h| h.to_str().ok()), 36 ) { 37 Some(t) => t, 38 None => return ApiError::AuthenticationRequired.into_response(), 39 }; 40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 41 let http_uri = format!( 42 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 43 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 44 ); 45 let did = match crate::auth::validate_token_with_dpop( 46 &state.db, 47 &extracted.token, 48 extracted.is_dpop, 49 dpop_proof, 50 "GET", 51 &http_uri, 52 true, 53 ) 54 .await 55 { 56 Ok(user) => user.did, 57 Err(e) => return ApiError::from(e).into_response(), 58 }; 59 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(id)) => id, 64 _ => { 65 return ( 66 StatusCode::INTERNAL_SERVER_ERROR, 67 Json(json!({"error": "InternalError"})), 68 ) 69 .into_response(); 70 } 71 }; 72 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 73 .fetch_optional(&state.db) 74 .await; 75 let deactivated_at = match user_status { 76 Ok(Some(row)) => row.deactivated_at, 77 _ => None, 78 }; 79 let repo_result = sqlx::query!( 80 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 81 user_id 82 ) 83 .fetch_optional(&state.db) 84 .await; 85 let repo_commit = match repo_result { 86 Ok(Some(row)) => row.repo_root_cid, 87 _ => String::new(), 88 }; 89 let record_count: i64 = 90 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 91 .fetch_one(&state.db) 92 .await 93 .unwrap_or(Some(0)) 94 .unwrap_or(0); 95 let blob_count: i64 = sqlx::query_scalar!( 96 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 97 user_id 98 ) 99 .fetch_one(&state.db) 100 .await 101 .unwrap_or(Some(0)) 102 .unwrap_or(0); 103 let valid_did = did.starts_with("did:"); 104 ( 105 StatusCode::OK, 106 Json(CheckAccountStatusOutput { 107 activated: deactivated_at.is_none(), 108 valid_did, 109 repo_commit: repo_commit.clone(), 110 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 111 repo_blocks: 0, 112 indexed_records: record_count, 113 private_state_values: 0, 114 expected_blobs: blob_count, 115 imported_blobs: blob_count, 116 }), 117 ) 118 .into_response() 119} 120 121pub async fn activate_account( 122 State(state): State<AppState>, 123 headers: axum::http::HeaderMap, 124) -> Response { 125 let extracted = match crate::auth::extract_auth_token_from_header( 126 headers.get("Authorization").and_then(|h| h.to_str().ok()), 127 ) { 128 Some(t) => t, 129 None => return ApiError::AuthenticationRequired.into_response(), 130 }; 131 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 132 let http_uri = format!( 133 "https://{}/xrpc/com.atproto.server.activateAccount", 134 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 135 ); 136 let auth_user = match crate::auth::validate_token_with_dpop( 137 &state.db, 138 &extracted.token, 139 extracted.is_dpop, 140 dpop_proof, 141 "POST", 142 &http_uri, 143 true, 144 ) 145 .await 146 { 147 Ok(user) => user, 148 Err(e) => return ApiError::from(e).into_response(), 149 }; 150 151 if let Err(e) = crate::auth::scope_check::check_account_scope( 152 auth_user.is_oauth, 153 auth_user.scope.as_deref(), 154 crate::oauth::scopes::AccountAttr::Repo, 155 crate::oauth::scopes::AccountAction::Manage, 156 ) { 157 return e; 158 } 159 160 let did = auth_user.did; 161 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 162 .fetch_optional(&state.db) 163 .await 164 .ok() 165 .flatten(); 166 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 167 .execute(&state.db) 168 .await; 169 match result { 170 Ok(_) => { 171 if let Some(ref h) = handle { 172 let _ = state.cache.delete(&format!("handle:{}", h)).await; 173 } 174 if let Err(e) = 175 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 176 { 177 warn!("Failed to sequence account activation event: {}", e); 178 } 179 if let Err(e) = 180 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 181 .await 182 { 183 warn!("Failed to sequence identity event for activation: {}", e); 184 } 185 if let Err(e) = 186 crate::api::repo::record::sequence_empty_commit_event(&state, &did).await 187 { 188 warn!( 189 "Failed to sequence empty commit event for activation: {}", 190 e 191 ); 192 } 193 (StatusCode::OK, Json(json!({}))).into_response() 194 } 195 Err(e) => { 196 error!("DB error activating account: {:?}", e); 197 ( 198 StatusCode::INTERNAL_SERVER_ERROR, 199 Json(json!({"error": "InternalError"})), 200 ) 201 .into_response() 202 } 203 } 204} 205 206#[derive(Deserialize)] 207#[serde(rename_all = "camelCase")] 208pub struct DeactivateAccountInput { 209 pub delete_after: Option<String>, 210} 211 212pub async fn deactivate_account( 213 State(state): State<AppState>, 214 headers: axum::http::HeaderMap, 215 Json(_input): Json<DeactivateAccountInput>, 216) -> Response { 217 let extracted = match crate::auth::extract_auth_token_from_header( 218 headers.get("Authorization").and_then(|h| h.to_str().ok()), 219 ) { 220 Some(t) => t, 221 None => return ApiError::AuthenticationRequired.into_response(), 222 }; 223 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 224 let http_uri = format!( 225 "https://{}/xrpc/com.atproto.server.deactivateAccount", 226 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 227 ); 228 let auth_user = match crate::auth::validate_token_with_dpop( 229 &state.db, 230 &extracted.token, 231 extracted.is_dpop, 232 dpop_proof, 233 "POST", 234 &http_uri, 235 false, 236 ) 237 .await 238 { 239 Ok(user) => user, 240 Err(e) => return ApiError::from(e).into_response(), 241 }; 242 243 if let Err(e) = crate::auth::scope_check::check_account_scope( 244 auth_user.is_oauth, 245 auth_user.scope.as_deref(), 246 crate::oauth::scopes::AccountAttr::Repo, 247 crate::oauth::scopes::AccountAction::Manage, 248 ) { 249 return e; 250 } 251 252 let did = auth_user.did; 253 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 254 .fetch_optional(&state.db) 255 .await 256 .ok() 257 .flatten(); 258 let result = sqlx::query!( 259 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 260 did 261 ) 262 .execute(&state.db) 263 .await; 264 match result { 265 Ok(_) => { 266 if let Some(ref h) = handle { 267 let _ = state.cache.delete(&format!("handle:{}", h)).await; 268 } 269 if let Err(e) = crate::api::repo::record::sequence_account_event( 270 &state, 271 &did, 272 false, 273 Some("deactivated"), 274 ) 275 .await 276 { 277 warn!("Failed to sequence account deactivation event: {}", e); 278 } 279 (StatusCode::OK, Json(json!({}))).into_response() 280 } 281 Err(e) => { 282 error!("DB error deactivating account: {:?}", e); 283 ( 284 StatusCode::INTERNAL_SERVER_ERROR, 285 Json(json!({"error": "InternalError"})), 286 ) 287 .into_response() 288 } 289 } 290} 291 292pub async fn request_account_delete( 293 State(state): State<AppState>, 294 headers: axum::http::HeaderMap, 295) -> Response { 296 let extracted = match crate::auth::extract_auth_token_from_header( 297 headers.get("Authorization").and_then(|h| h.to_str().ok()), 298 ) { 299 Some(t) => t, 300 None => return ApiError::AuthenticationRequired.into_response(), 301 }; 302 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 303 let http_uri = format!( 304 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 305 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 306 ); 307 let did = match crate::auth::validate_token_with_dpop( 308 &state.db, 309 &extracted.token, 310 extracted.is_dpop, 311 dpop_proof, 312 "POST", 313 &http_uri, 314 true, 315 ) 316 .await 317 { 318 Ok(user) => user.did, 319 Err(e) => return ApiError::from(e).into_response(), 320 }; 321 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 322 .fetch_optional(&state.db) 323 .await 324 { 325 Ok(Some(id)) => id, 326 _ => { 327 return ( 328 StatusCode::INTERNAL_SERVER_ERROR, 329 Json(json!({"error": "InternalError"})), 330 ) 331 .into_response(); 332 } 333 }; 334 let confirmation_token = Uuid::new_v4().to_string(); 335 let expires_at = Utc::now() + Duration::minutes(15); 336 let insert = sqlx::query!( 337 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 338 confirmation_token, 339 did, 340 expires_at 341 ) 342 .execute(&state.db) 343 .await; 344 if let Err(e) = insert { 345 error!("DB error creating deletion token: {:?}", e); 346 return ( 347 StatusCode::INTERNAL_SERVER_ERROR, 348 Json(json!({"error": "InternalError"})), 349 ) 350 .into_response(); 351 } 352 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 353 if let Err(e) = 354 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname) 355 .await 356 { 357 warn!("Failed to enqueue account deletion notification: {:?}", e); 358 } 359 info!("Account deletion requested for user {}", did); 360 (StatusCode::OK, Json(json!({}))).into_response() 361} 362 363#[derive(Deserialize)] 364pub struct DeleteAccountInput { 365 pub did: String, 366 pub password: String, 367 pub token: String, 368} 369 370pub async fn delete_account( 371 State(state): State<AppState>, 372 Json(input): Json<DeleteAccountInput>, 373) -> Response { 374 let did = input.did.trim(); 375 let password = &input.password; 376 let token = input.token.trim(); 377 if did.is_empty() { 378 return ( 379 StatusCode::BAD_REQUEST, 380 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 381 ) 382 .into_response(); 383 } 384 if password.is_empty() { 385 return ( 386 StatusCode::BAD_REQUEST, 387 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 388 ) 389 .into_response(); 390 } 391 if token.is_empty() { 392 return ( 393 StatusCode::BAD_REQUEST, 394 Json(json!({"error": "InvalidToken", "message": "token is required"})), 395 ) 396 .into_response(); 397 } 398 let user = sqlx::query!( 399 "SELECT id, password_hash, handle FROM users WHERE did = $1", 400 did 401 ) 402 .fetch_optional(&state.db) 403 .await; 404 let (user_id, password_hash, handle) = match user { 405 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 406 Ok(None) => { 407 return ( 408 StatusCode::BAD_REQUEST, 409 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 410 ) 411 .into_response(); 412 } 413 Err(e) => { 414 error!("DB error in delete_account: {:?}", e); 415 return ( 416 StatusCode::INTERNAL_SERVER_ERROR, 417 Json(json!({"error": "InternalError"})), 418 ) 419 .into_response(); 420 } 421 }; 422 let password_valid = if verify(password, &password_hash).unwrap_or(false) { 423 true 424 } else { 425 let app_pass_rows = sqlx::query!( 426 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 427 user_id 428 ) 429 .fetch_all(&state.db) 430 .await 431 .unwrap_or_default(); 432 app_pass_rows 433 .iter() 434 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 435 }; 436 if !password_valid { 437 return ( 438 StatusCode::UNAUTHORIZED, 439 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 440 ) 441 .into_response(); 442 } 443 let deletion_request = sqlx::query!( 444 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 445 token 446 ) 447 .fetch_optional(&state.db) 448 .await; 449 let (token_did, expires_at) = match deletion_request { 450 Ok(Some(row)) => (row.did, row.expires_at), 451 Ok(None) => { 452 return ( 453 StatusCode::BAD_REQUEST, 454 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 455 ) 456 .into_response(); 457 } 458 Err(e) => { 459 error!("DB error fetching deletion token: {:?}", e); 460 return ( 461 StatusCode::INTERNAL_SERVER_ERROR, 462 Json(json!({"error": "InternalError"})), 463 ) 464 .into_response(); 465 } 466 }; 467 if token_did != did { 468 return ( 469 StatusCode::BAD_REQUEST, 470 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 471 ) 472 .into_response(); 473 } 474 if Utc::now() > expires_at { 475 let _ = sqlx::query!( 476 "DELETE FROM account_deletion_requests WHERE token = $1", 477 token 478 ) 479 .execute(&state.db) 480 .await; 481 return ( 482 StatusCode::BAD_REQUEST, 483 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 484 ) 485 .into_response(); 486 } 487 let mut tx = match state.db.begin().await { 488 Ok(tx) => tx, 489 Err(e) => { 490 error!("Failed to begin transaction: {:?}", e); 491 return ( 492 StatusCode::INTERNAL_SERVER_ERROR, 493 Json(json!({"error": "InternalError"})), 494 ) 495 .into_response(); 496 } 497 }; 498 let deletion_result: Result<(), sqlx::Error> = async { 499 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 500 .execute(&mut *tx) 501 .await?; 502 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 503 .execute(&mut *tx) 504 .await?; 505 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 506 .execute(&mut *tx) 507 .await?; 508 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 509 .execute(&mut *tx) 510 .await?; 511 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 512 .execute(&mut *tx) 513 .await?; 514 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 515 .execute(&mut *tx) 516 .await?; 517 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 518 .execute(&mut *tx) 519 .await?; 520 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 521 .execute(&mut *tx) 522 .await?; 523 Ok(()) 524 } 525 .await; 526 match deletion_result { 527 Ok(()) => { 528 if let Err(e) = tx.commit().await { 529 error!("Failed to commit account deletion transaction: {:?}", e); 530 return ( 531 StatusCode::INTERNAL_SERVER_ERROR, 532 Json(json!({"error": "InternalError"})), 533 ) 534 .into_response(); 535 } 536 if let Err(e) = crate::api::repo::record::sequence_account_event( 537 &state, 538 did, 539 false, 540 Some("deleted"), 541 ) 542 .await 543 { 544 warn!( 545 "Failed to sequence account deletion event for {}: {}", 546 did, e 547 ); 548 } 549 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 550 info!("Account {} deleted successfully", did); 551 (StatusCode::OK, Json(json!({}))).into_response() 552 } 553 Err(e) => { 554 error!("DB error deleting account, rolling back: {:?}", e); 555 ( 556 StatusCode::INTERNAL_SERVER_ERROR, 557 Json(json!({"error": "InternalError"})), 558 ) 559 .into_response() 560 } 561 } 562}