this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use bcrypt::verify; 10use chrono::{Duration, Utc}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14use uuid::Uuid; 15 16#[derive(Serialize)] 17#[serde(rename_all = "camelCase")] 18pub struct CheckAccountStatusOutput { 19 pub activated: bool, 20 pub valid_did: bool, 21 pub repo_commit: String, 22 pub repo_rev: String, 23 pub repo_blocks: i64, 24 pub indexed_records: i64, 25 pub private_state_values: i64, 26 pub expected_blobs: i64, 27 pub imported_blobs: i64, 28} 29 30pub async fn check_account_status( 31 State(state): State<AppState>, 32 headers: axum::http::HeaderMap, 33) -> Response { 34 let extracted = match crate::auth::extract_auth_token_from_header( 35 headers.get("Authorization").and_then(|h| h.to_str().ok()), 36 ) { 37 Some(t) => t, 38 None => return ApiError::AuthenticationRequired.into_response(), 39 }; 40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 41 let http_uri = format!( 42 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 43 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 44 ); 45 let did = match crate::auth::validate_token_with_dpop( 46 &state.db, 47 &extracted.token, 48 extracted.is_dpop, 49 dpop_proof, 50 "GET", 51 &http_uri, 52 true, 53 ) 54 .await 55 { 56 Ok(user) => user.did, 57 Err(e) => return ApiError::from(e).into_response(), 58 }; 59 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(id)) => id, 64 _ => { 65 return ( 66 StatusCode::INTERNAL_SERVER_ERROR, 67 Json(json!({"error": "InternalError"})), 68 ) 69 .into_response(); 70 } 71 }; 72 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 73 .fetch_optional(&state.db) 74 .await; 75 let deactivated_at = match user_status { 76 Ok(Some(row)) => row.deactivated_at, 77 _ => None, 78 }; 79 let repo_result = sqlx::query!( 80 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 81 user_id 82 ) 83 .fetch_optional(&state.db) 84 .await; 85 let repo_commit = match repo_result { 86 Ok(Some(row)) => row.repo_root_cid, 87 _ => String::new(), 88 }; 89 let record_count: i64 = 90 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 91 .fetch_one(&state.db) 92 .await 93 .unwrap_or(Some(0)) 94 .unwrap_or(0); 95 let blob_count: i64 = sqlx::query_scalar!( 96 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 97 user_id 98 ) 99 .fetch_one(&state.db) 100 .await 101 .unwrap_or(Some(0)) 102 .unwrap_or(0); 103 let valid_did = did.starts_with("did:"); 104 ( 105 StatusCode::OK, 106 Json(CheckAccountStatusOutput { 107 activated: deactivated_at.is_none(), 108 valid_did, 109 repo_commit: repo_commit.clone(), 110 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 111 repo_blocks: 0, 112 indexed_records: record_count, 113 private_state_values: 0, 114 expected_blobs: blob_count, 115 imported_blobs: blob_count, 116 }), 117 ) 118 .into_response() 119} 120 121pub async fn activate_account( 122 State(state): State<AppState>, 123 headers: axum::http::HeaderMap, 124) -> Response { 125 let extracted = match crate::auth::extract_auth_token_from_header( 126 headers.get("Authorization").and_then(|h| h.to_str().ok()), 127 ) { 128 Some(t) => t, 129 None => return ApiError::AuthenticationRequired.into_response(), 130 }; 131 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 132 let http_uri = format!( 133 "https://{}/xrpc/com.atproto.server.activateAccount", 134 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 135 ); 136 let did = match crate::auth::validate_token_with_dpop( 137 &state.db, 138 &extracted.token, 139 extracted.is_dpop, 140 dpop_proof, 141 "POST", 142 &http_uri, 143 true, 144 ) 145 .await 146 { 147 Ok(user) => user.did, 148 Err(e) => return ApiError::from(e).into_response(), 149 }; 150 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 151 .fetch_optional(&state.db) 152 .await 153 .ok() 154 .flatten(); 155 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 156 .execute(&state.db) 157 .await; 158 match result { 159 Ok(_) => { 160 if let Some(h) = handle { 161 let _ = state.cache.delete(&format!("handle:{}", h)).await; 162 } 163 (StatusCode::OK, Json(json!({}))).into_response() 164 } 165 Err(e) => { 166 error!("DB error activating account: {:?}", e); 167 ( 168 StatusCode::INTERNAL_SERVER_ERROR, 169 Json(json!({"error": "InternalError"})), 170 ) 171 .into_response() 172 } 173 } 174} 175 176#[derive(Deserialize)] 177#[serde(rename_all = "camelCase")] 178pub struct DeactivateAccountInput { 179 pub delete_after: Option<String>, 180} 181 182pub async fn deactivate_account( 183 State(state): State<AppState>, 184 headers: axum::http::HeaderMap, 185 Json(_input): Json<DeactivateAccountInput>, 186) -> Response { 187 let extracted = match crate::auth::extract_auth_token_from_header( 188 headers.get("Authorization").and_then(|h| h.to_str().ok()), 189 ) { 190 Some(t) => t, 191 None => return ApiError::AuthenticationRequired.into_response(), 192 }; 193 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 194 let http_uri = format!( 195 "https://{}/xrpc/com.atproto.server.deactivateAccount", 196 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 197 ); 198 let did = match crate::auth::validate_token_with_dpop( 199 &state.db, 200 &extracted.token, 201 extracted.is_dpop, 202 dpop_proof, 203 "POST", 204 &http_uri, 205 false, 206 ) 207 .await 208 { 209 Ok(user) => user.did, 210 Err(e) => return ApiError::from(e).into_response(), 211 }; 212 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 213 .fetch_optional(&state.db) 214 .await 215 .ok() 216 .flatten(); 217 let result = sqlx::query!( 218 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 219 did 220 ) 221 .execute(&state.db) 222 .await; 223 match result { 224 Ok(_) => { 225 if let Some(h) = handle { 226 let _ = state.cache.delete(&format!("handle:{}", h)).await; 227 } 228 (StatusCode::OK, Json(json!({}))).into_response() 229 } 230 Err(e) => { 231 error!("DB error deactivating account: {:?}", e); 232 ( 233 StatusCode::INTERNAL_SERVER_ERROR, 234 Json(json!({"error": "InternalError"})), 235 ) 236 .into_response() 237 } 238 } 239} 240 241pub async fn request_account_delete( 242 State(state): State<AppState>, 243 headers: axum::http::HeaderMap, 244) -> Response { 245 let extracted = match crate::auth::extract_auth_token_from_header( 246 headers.get("Authorization").and_then(|h| h.to_str().ok()), 247 ) { 248 Some(t) => t, 249 None => return ApiError::AuthenticationRequired.into_response(), 250 }; 251 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 252 let http_uri = format!( 253 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 254 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 255 ); 256 let did = match crate::auth::validate_token_with_dpop( 257 &state.db, 258 &extracted.token, 259 extracted.is_dpop, 260 dpop_proof, 261 "POST", 262 &http_uri, 263 true, 264 ) 265 .await 266 { 267 Ok(user) => user.did, 268 Err(e) => return ApiError::from(e).into_response(), 269 }; 270 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 271 .fetch_optional(&state.db) 272 .await 273 { 274 Ok(Some(id)) => id, 275 _ => { 276 return ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let confirmation_token = Uuid::new_v4().to_string(); 284 let expires_at = Utc::now() + Duration::minutes(15); 285 let insert = sqlx::query!( 286 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 287 confirmation_token, 288 did, 289 expires_at 290 ) 291 .execute(&state.db) 292 .await; 293 if let Err(e) = insert { 294 error!("DB error creating deletion token: {:?}", e); 295 return ( 296 StatusCode::INTERNAL_SERVER_ERROR, 297 Json(json!({"error": "InternalError"})), 298 ) 299 .into_response(); 300 } 301 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 302 if let Err(e) = crate::notifications::enqueue_account_deletion( 303 &state.db, 304 user_id, 305 &confirmation_token, 306 &hostname, 307 ) 308 .await 309 { 310 warn!("Failed to enqueue account deletion notification: {:?}", e); 311 } 312 info!("Account deletion requested for user {}", did); 313 (StatusCode::OK, Json(json!({}))).into_response() 314} 315 316#[derive(Deserialize)] 317pub struct DeleteAccountInput { 318 pub did: String, 319 pub password: String, 320 pub token: String, 321} 322 323pub async fn delete_account( 324 State(state): State<AppState>, 325 Json(input): Json<DeleteAccountInput>, 326) -> Response { 327 let did = input.did.trim(); 328 let password = &input.password; 329 let token = input.token.trim(); 330 if did.is_empty() { 331 return ( 332 StatusCode::BAD_REQUEST, 333 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 334 ) 335 .into_response(); 336 } 337 if password.is_empty() { 338 return ( 339 StatusCode::BAD_REQUEST, 340 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 341 ) 342 .into_response(); 343 } 344 if token.is_empty() { 345 return ( 346 StatusCode::BAD_REQUEST, 347 Json(json!({"error": "InvalidToken", "message": "token is required"})), 348 ) 349 .into_response(); 350 } 351 let user = sqlx::query!( 352 "SELECT id, password_hash, handle FROM users WHERE did = $1", 353 did 354 ) 355 .fetch_optional(&state.db) 356 .await; 357 let (user_id, password_hash, handle) = match user { 358 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 359 Ok(None) => { 360 return ( 361 StatusCode::BAD_REQUEST, 362 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 363 ) 364 .into_response(); 365 } 366 Err(e) => { 367 error!("DB error in delete_account: {:?}", e); 368 return ( 369 StatusCode::INTERNAL_SERVER_ERROR, 370 Json(json!({"error": "InternalError"})), 371 ) 372 .into_response(); 373 } 374 }; 375 let password_valid = if verify(password, &password_hash).unwrap_or(false) { 376 true 377 } else { 378 let app_pass_rows = sqlx::query!( 379 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 380 user_id 381 ) 382 .fetch_all(&state.db) 383 .await 384 .unwrap_or_default(); 385 app_pass_rows 386 .iter() 387 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 388 }; 389 if !password_valid { 390 return ( 391 StatusCode::UNAUTHORIZED, 392 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 393 ) 394 .into_response(); 395 } 396 let deletion_request = sqlx::query!( 397 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 398 token 399 ) 400 .fetch_optional(&state.db) 401 .await; 402 let (token_did, expires_at) = match deletion_request { 403 Ok(Some(row)) => (row.did, row.expires_at), 404 Ok(None) => { 405 return ( 406 StatusCode::BAD_REQUEST, 407 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 408 ) 409 .into_response(); 410 } 411 Err(e) => { 412 error!("DB error fetching deletion token: {:?}", e); 413 return ( 414 StatusCode::INTERNAL_SERVER_ERROR, 415 Json(json!({"error": "InternalError"})), 416 ) 417 .into_response(); 418 } 419 }; 420 if token_did != did { 421 return ( 422 StatusCode::BAD_REQUEST, 423 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 424 ) 425 .into_response(); 426 } 427 if Utc::now() > expires_at { 428 let _ = sqlx::query!( 429 "DELETE FROM account_deletion_requests WHERE token = $1", 430 token 431 ) 432 .execute(&state.db) 433 .await; 434 return ( 435 StatusCode::BAD_REQUEST, 436 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 437 ) 438 .into_response(); 439 } 440 let mut tx = match state.db.begin().await { 441 Ok(tx) => tx, 442 Err(e) => { 443 error!("Failed to begin transaction: {:?}", e); 444 return ( 445 StatusCode::INTERNAL_SERVER_ERROR, 446 Json(json!({"error": "InternalError"})), 447 ) 448 .into_response(); 449 } 450 }; 451 let deletion_result: Result<(), sqlx::Error> = async { 452 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 453 .execute(&mut *tx) 454 .await?; 455 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 456 .execute(&mut *tx) 457 .await?; 458 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 459 .execute(&mut *tx) 460 .await?; 461 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 462 .execute(&mut *tx) 463 .await?; 464 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 465 .execute(&mut *tx) 466 .await?; 467 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 468 .execute(&mut *tx) 469 .await?; 470 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 471 .execute(&mut *tx) 472 .await?; 473 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 474 .execute(&mut *tx) 475 .await?; 476 Ok(()) 477 } 478 .await; 479 match deletion_result { 480 Ok(()) => { 481 if let Err(e) = tx.commit().await { 482 error!("Failed to commit account deletion transaction: {:?}", e); 483 return ( 484 StatusCode::INTERNAL_SERVER_ERROR, 485 Json(json!({"error": "InternalError"})), 486 ) 487 .into_response(); 488 } 489 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 490 info!("Account {} deleted successfully", did); 491 (StatusCode::OK, Json(json!({}))).into_response() 492 } 493 Err(e) => { 494 error!("DB error deleting account, rolling back: {:?}", e); 495 ( 496 StatusCode::INTERNAL_SERVER_ERROR, 497 Json(json!({"error": "InternalError"})), 498 ) 499 .into_response() 500 } 501 } 502}