this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use bcrypt::verify; 9use chrono::{Duration, Utc}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::{error, info, warn}; 13use uuid::Uuid; 14 15#[derive(Serialize)] 16#[serde(rename_all = "camelCase")] 17pub struct CheckAccountStatusOutput { 18 pub activated: bool, 19 pub valid_did: bool, 20 pub repo_commit: String, 21 pub repo_rev: String, 22 pub repo_blocks: i64, 23 pub indexed_records: i64, 24 pub private_state_values: i64, 25 pub expected_blobs: i64, 26 pub imported_blobs: i64, 27} 28 29pub async fn check_account_status( 30 State(state): State<AppState>, 31 headers: axum::http::HeaderMap, 32) -> Response { 33 let auth_header = headers.get("Authorization"); 34 if auth_header.is_none() { 35 return ( 36 StatusCode::UNAUTHORIZED, 37 Json(json!({"error": "AuthenticationRequired"})), 38 ) 39 .into_response(); 40 } 41 42 let token = auth_header 43 .unwrap() 44 .to_str() 45 .unwrap_or("") 46 .replace("Bearer ", ""); 47 48 let session = sqlx::query!( 49 r#" 50 SELECT s.did, k.key_bytes, u.id as user_id 51 FROM sessions s 52 JOIN users u ON s.did = u.did 53 JOIN user_keys k ON u.id = k.user_id 54 WHERE s.access_jwt = $1 55 "#, 56 token 57 ) 58 .fetch_optional(&state.db) 59 .await; 60 61 let (did, key_bytes, user_id) = match session { 62 Ok(Some(row)) => (row.did, row.key_bytes, row.user_id), 63 Ok(None) => { 64 return ( 65 StatusCode::UNAUTHORIZED, 66 Json(json!({"error": "AuthenticationFailed"})), 67 ) 68 .into_response(); 69 } 70 Err(e) => { 71 error!("DB error in check_account_status: {:?}", e); 72 return ( 73 StatusCode::INTERNAL_SERVER_ERROR, 74 Json(json!({"error": "InternalError"})), 75 ) 76 .into_response(); 77 } 78 }; 79 80 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 81 return ( 82 StatusCode::UNAUTHORIZED, 83 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 84 ) 85 .into_response(); 86 } 87 88 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 89 .fetch_optional(&state.db) 90 .await; 91 92 let deactivated_at = match user_status { 93 Ok(Some(row)) => row.deactivated_at, 94 _ => None, 95 }; 96 97 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 98 .fetch_optional(&state.db) 99 .await; 100 101 let repo_commit = match repo_result { 102 Ok(Some(row)) => row.repo_root_cid, 103 _ => String::new(), 104 }; 105 106 let record_count: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 107 .fetch_one(&state.db) 108 .await 109 .unwrap_or(Some(0)) 110 .unwrap_or(0); 111 112 let blob_count: i64 = 113 sqlx::query_scalar!("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", user_id) 114 .fetch_one(&state.db) 115 .await 116 .unwrap_or(Some(0)) 117 .unwrap_or(0); 118 119 let valid_did = did.starts_with("did:"); 120 121 ( 122 StatusCode::OK, 123 Json(CheckAccountStatusOutput { 124 activated: deactivated_at.is_none(), 125 valid_did, 126 repo_commit: repo_commit.clone(), 127 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 128 repo_blocks: 0, 129 indexed_records: record_count, 130 private_state_values: 0, 131 expected_blobs: blob_count, 132 imported_blobs: blob_count, 133 }), 134 ) 135 .into_response() 136} 137 138pub async fn activate_account( 139 State(state): State<AppState>, 140 headers: axum::http::HeaderMap, 141) -> Response { 142 let auth_header = headers.get("Authorization"); 143 if auth_header.is_none() { 144 return ( 145 StatusCode::UNAUTHORIZED, 146 Json(json!({"error": "AuthenticationRequired"})), 147 ) 148 .into_response(); 149 } 150 151 let token = auth_header 152 .unwrap() 153 .to_str() 154 .unwrap_or("") 155 .replace("Bearer ", ""); 156 157 let session = sqlx::query!( 158 r#" 159 SELECT s.did, k.key_bytes 160 FROM sessions s 161 JOIN users u ON s.did = u.did 162 JOIN user_keys k ON u.id = k.user_id 163 WHERE s.access_jwt = $1 164 "#, 165 token 166 ) 167 .fetch_optional(&state.db) 168 .await; 169 170 let (did, key_bytes) = match session { 171 Ok(Some(row)) => (row.did, row.key_bytes), 172 Ok(None) => { 173 return ( 174 StatusCode::UNAUTHORIZED, 175 Json(json!({"error": "AuthenticationFailed"})), 176 ) 177 .into_response(); 178 } 179 Err(e) => { 180 error!("DB error in activate_account: {:?}", e); 181 return ( 182 StatusCode::INTERNAL_SERVER_ERROR, 183 Json(json!({"error": "InternalError"})), 184 ) 185 .into_response(); 186 } 187 }; 188 189 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 190 return ( 191 StatusCode::UNAUTHORIZED, 192 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 193 ) 194 .into_response(); 195 } 196 197 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 198 .execute(&state.db) 199 .await; 200 201 match result { 202 Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(), 203 Err(e) => { 204 error!("DB error activating account: {:?}", e); 205 ( 206 StatusCode::INTERNAL_SERVER_ERROR, 207 Json(json!({"error": "InternalError"})), 208 ) 209 .into_response() 210 } 211 } 212} 213 214#[derive(Deserialize)] 215#[serde(rename_all = "camelCase")] 216pub struct DeactivateAccountInput { 217 pub delete_after: Option<String>, 218} 219 220pub async fn deactivate_account( 221 State(state): State<AppState>, 222 headers: axum::http::HeaderMap, 223 Json(_input): Json<DeactivateAccountInput>, 224) -> Response { 225 let auth_header = headers.get("Authorization"); 226 if auth_header.is_none() { 227 return ( 228 StatusCode::UNAUTHORIZED, 229 Json(json!({"error": "AuthenticationRequired"})), 230 ) 231 .into_response(); 232 } 233 234 let token = auth_header 235 .unwrap() 236 .to_str() 237 .unwrap_or("") 238 .replace("Bearer ", ""); 239 240 let session = sqlx::query!( 241 r#" 242 SELECT s.did, k.key_bytes 243 FROM sessions s 244 JOIN users u ON s.did = u.did 245 JOIN user_keys k ON u.id = k.user_id 246 WHERE s.access_jwt = $1 247 "#, 248 token 249 ) 250 .fetch_optional(&state.db) 251 .await; 252 253 let (did, key_bytes) = match session { 254 Ok(Some(row)) => (row.did, row.key_bytes), 255 Ok(None) => { 256 return ( 257 StatusCode::UNAUTHORIZED, 258 Json(json!({"error": "AuthenticationFailed"})), 259 ) 260 .into_response(); 261 } 262 Err(e) => { 263 error!("DB error in deactivate_account: {:?}", e); 264 return ( 265 StatusCode::INTERNAL_SERVER_ERROR, 266 Json(json!({"error": "InternalError"})), 267 ) 268 .into_response(); 269 } 270 }; 271 272 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 273 return ( 274 StatusCode::UNAUTHORIZED, 275 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 276 ) 277 .into_response(); 278 } 279 280 let result = sqlx::query!("UPDATE users SET deactivated_at = NOW() WHERE did = $1", did) 281 .execute(&state.db) 282 .await; 283 284 match result { 285 Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(), 286 Err(e) => { 287 error!("DB error deactivating account: {:?}", e); 288 ( 289 StatusCode::INTERNAL_SERVER_ERROR, 290 Json(json!({"error": "InternalError"})), 291 ) 292 .into_response() 293 } 294 } 295} 296 297pub async fn request_account_delete( 298 State(state): State<AppState>, 299 headers: axum::http::HeaderMap, 300) -> Response { 301 let auth_header = headers.get("Authorization"); 302 if auth_header.is_none() { 303 return ( 304 StatusCode::UNAUTHORIZED, 305 Json(json!({"error": "AuthenticationRequired"})), 306 ) 307 .into_response(); 308 } 309 310 let token = auth_header 311 .unwrap() 312 .to_str() 313 .unwrap_or("") 314 .replace("Bearer ", ""); 315 316 let session = sqlx::query!( 317 r#" 318 SELECT s.did, u.id as user_id, u.email, u.handle, k.key_bytes 319 FROM sessions s 320 JOIN users u ON s.did = u.did 321 JOIN user_keys k ON u.id = k.user_id 322 WHERE s.access_jwt = $1 323 "#, 324 token 325 ) 326 .fetch_optional(&state.db) 327 .await; 328 329 let (did, user_id, email, handle, key_bytes) = match session { 330 Ok(Some(row)) => (row.did, row.user_id, row.email, row.handle, row.key_bytes), 331 Ok(None) => { 332 return ( 333 StatusCode::UNAUTHORIZED, 334 Json(json!({"error": "AuthenticationFailed"})), 335 ) 336 .into_response(); 337 } 338 Err(e) => { 339 error!("DB error in request_account_delete: {:?}", e); 340 return ( 341 StatusCode::INTERNAL_SERVER_ERROR, 342 Json(json!({"error": "InternalError"})), 343 ) 344 .into_response(); 345 } 346 }; 347 348 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 349 return ( 350 StatusCode::UNAUTHORIZED, 351 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 352 ) 353 .into_response(); 354 } 355 356 let confirmation_token = Uuid::new_v4().to_string(); 357 let expires_at = Utc::now() + Duration::minutes(15); 358 359 let insert = sqlx::query!( 360 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 361 confirmation_token, 362 did, 363 expires_at 364 ) 365 .execute(&state.db) 366 .await; 367 368 if let Err(e) = insert { 369 error!("DB error creating deletion token: {:?}", e); 370 return ( 371 StatusCode::INTERNAL_SERVER_ERROR, 372 Json(json!({"error": "InternalError"})), 373 ) 374 .into_response(); 375 } 376 377 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 378 if let Err(e) = crate::notifications::enqueue_account_deletion( 379 &state.db, 380 user_id, 381 &email, 382 &handle, 383 &confirmation_token, 384 &hostname, 385 ) 386 .await 387 { 388 warn!("Failed to enqueue account deletion notification: {:?}", e); 389 } 390 391 info!("Account deletion requested for user {}", did); 392 393 (StatusCode::OK, Json(json!({}))).into_response() 394} 395 396#[derive(Deserialize)] 397pub struct DeleteAccountInput { 398 pub did: String, 399 pub password: String, 400 pub token: String, 401} 402 403pub async fn delete_account( 404 State(state): State<AppState>, 405 Json(input): Json<DeleteAccountInput>, 406) -> Response { 407 let did = input.did.trim(); 408 let password = &input.password; 409 let token = input.token.trim(); 410 411 if did.is_empty() { 412 return ( 413 StatusCode::BAD_REQUEST, 414 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 415 ) 416 .into_response(); 417 } 418 419 if password.is_empty() { 420 return ( 421 StatusCode::BAD_REQUEST, 422 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 423 ) 424 .into_response(); 425 } 426 427 if token.is_empty() { 428 return ( 429 StatusCode::BAD_REQUEST, 430 Json(json!({"error": "InvalidToken", "message": "token is required"})), 431 ) 432 .into_response(); 433 } 434 435 let user = sqlx::query!( 436 "SELECT id, password_hash FROM users WHERE did = $1", 437 did 438 ) 439 .fetch_optional(&state.db) 440 .await; 441 442 let (user_id, password_hash) = match user { 443 Ok(Some(row)) => (row.id, row.password_hash), 444 Ok(None) => { 445 return ( 446 StatusCode::BAD_REQUEST, 447 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 448 ) 449 .into_response(); 450 } 451 Err(e) => { 452 error!("DB error in delete_account: {:?}", e); 453 return ( 454 StatusCode::INTERNAL_SERVER_ERROR, 455 Json(json!({"error": "InternalError"})), 456 ) 457 .into_response(); 458 } 459 }; 460 461 let password_valid = if verify(password, &password_hash).unwrap_or(false) { 462 true 463 } else { 464 let app_pass_rows = sqlx::query!( 465 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 466 user_id 467 ) 468 .fetch_all(&state.db) 469 .await 470 .unwrap_or_default(); 471 472 app_pass_rows 473 .iter() 474 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 475 }; 476 477 if !password_valid { 478 return ( 479 StatusCode::UNAUTHORIZED, 480 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 481 ) 482 .into_response(); 483 } 484 485 let deletion_request = sqlx::query!( 486 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 487 token 488 ) 489 .fetch_optional(&state.db) 490 .await; 491 492 let (token_did, expires_at) = match deletion_request { 493 Ok(Some(row)) => (row.did, row.expires_at), 494 Ok(None) => { 495 return ( 496 StatusCode::BAD_REQUEST, 497 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 498 ) 499 .into_response(); 500 } 501 Err(e) => { 502 error!("DB error fetching deletion token: {:?}", e); 503 return ( 504 StatusCode::INTERNAL_SERVER_ERROR, 505 Json(json!({"error": "InternalError"})), 506 ) 507 .into_response(); 508 } 509 }; 510 511 if token_did != did { 512 return ( 513 StatusCode::BAD_REQUEST, 514 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 515 ) 516 .into_response(); 517 } 518 519 if Utc::now() > expires_at { 520 let _ = sqlx::query!("DELETE FROM account_deletion_requests WHERE token = $1", token) 521 .execute(&state.db) 522 .await; 523 524 return ( 525 StatusCode::BAD_REQUEST, 526 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 527 ) 528 .into_response(); 529 } 530 531 let mut tx = match state.db.begin().await { 532 Ok(tx) => tx, 533 Err(e) => { 534 error!("Failed to begin transaction: {:?}", e); 535 return ( 536 StatusCode::INTERNAL_SERVER_ERROR, 537 Json(json!({"error": "InternalError"})), 538 ) 539 .into_response(); 540 } 541 }; 542 543 let deletion_result: Result<(), sqlx::Error> = async { 544 sqlx::query!("DELETE FROM sessions WHERE did = $1", did) 545 .execute(&mut *tx) 546 .await?; 547 548 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 549 .execute(&mut *tx) 550 .await?; 551 552 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 553 .execute(&mut *tx) 554 .await?; 555 556 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 557 .execute(&mut *tx) 558 .await?; 559 560 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 561 .execute(&mut *tx) 562 .await?; 563 564 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 565 .execute(&mut *tx) 566 .await?; 567 568 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 569 .execute(&mut *tx) 570 .await?; 571 572 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 573 .execute(&mut *tx) 574 .await?; 575 576 Ok(()) 577 } 578 .await; 579 580 match deletion_result { 581 Ok(()) => { 582 if let Err(e) = tx.commit().await { 583 error!("Failed to commit account deletion transaction: {:?}", e); 584 return ( 585 StatusCode::INTERNAL_SERVER_ERROR, 586 Json(json!({"error": "InternalError"})), 587 ) 588 .into_response(); 589 } 590 info!("Account {} deleted successfully", did); 591 (StatusCode::OK, Json(json!({}))).into_response() 592 } 593 Err(e) => { 594 error!("DB error deleting account, rolling back: {:?}", e); 595 ( 596 StatusCode::INTERNAL_SERVER_ERROR, 597 Json(json!({"error": "InternalError"})), 598 ) 599 .into_response() 600 } 601 } 602}