this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use bcrypt::verify; 10use chrono::{Duration, Utc}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14use uuid::Uuid; 15 16#[derive(Serialize)] 17#[serde(rename_all = "camelCase")] 18pub struct CheckAccountStatusOutput { 19 pub activated: bool, 20 pub valid_did: bool, 21 pub repo_commit: String, 22 pub repo_rev: String, 23 pub repo_blocks: i64, 24 pub indexed_records: i64, 25 pub private_state_values: i64, 26 pub expected_blobs: i64, 27 pub imported_blobs: i64, 28} 29 30pub async fn check_account_status( 31 State(state): State<AppState>, 32 headers: axum::http::HeaderMap, 33) -> Response { 34 let extracted = match crate::auth::extract_auth_token_from_header( 35 headers.get("Authorization").and_then(|h| h.to_str().ok()), 36 ) { 37 Some(t) => t, 38 None => return ApiError::AuthenticationRequired.into_response(), 39 }; 40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 41 let http_uri = format!( 42 "https://{}/xrpc/com.atproto.server.checkAccountStatus", 43 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 44 ); 45 let did = match crate::auth::validate_token_with_dpop( 46 &state.db, 47 &extracted.token, 48 extracted.is_dpop, 49 dpop_proof, 50 "GET", 51 &http_uri, 52 true, 53 ) 54 .await 55 { 56 Ok(user) => user.did, 57 Err(e) => return ApiError::from(e).into_response(), 58 }; 59 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(id)) => id, 64 _ => { 65 return ( 66 StatusCode::INTERNAL_SERVER_ERROR, 67 Json(json!({"error": "InternalError"})), 68 ) 69 .into_response(); 70 } 71 }; 72 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 73 .fetch_optional(&state.db) 74 .await; 75 let deactivated_at = match user_status { 76 Ok(Some(row)) => row.deactivated_at, 77 _ => None, 78 }; 79 let repo_result = sqlx::query!( 80 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 81 user_id 82 ) 83 .fetch_optional(&state.db) 84 .await; 85 let repo_commit = match repo_result { 86 Ok(Some(row)) => row.repo_root_cid, 87 _ => String::new(), 88 }; 89 let record_count: i64 = 90 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 91 .fetch_one(&state.db) 92 .await 93 .unwrap_or(Some(0)) 94 .unwrap_or(0); 95 let blob_count: i64 = sqlx::query_scalar!( 96 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", 97 user_id 98 ) 99 .fetch_one(&state.db) 100 .await 101 .unwrap_or(Some(0)) 102 .unwrap_or(0); 103 let valid_did = did.starts_with("did:"); 104 ( 105 StatusCode::OK, 106 Json(CheckAccountStatusOutput { 107 activated: deactivated_at.is_none(), 108 valid_did, 109 repo_commit: repo_commit.clone(), 110 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 111 repo_blocks: 0, 112 indexed_records: record_count, 113 private_state_values: 0, 114 expected_blobs: blob_count, 115 imported_blobs: blob_count, 116 }), 117 ) 118 .into_response() 119} 120 121pub async fn activate_account( 122 State(state): State<AppState>, 123 headers: axum::http::HeaderMap, 124) -> Response { 125 let extracted = match crate::auth::extract_auth_token_from_header( 126 headers.get("Authorization").and_then(|h| h.to_str().ok()), 127 ) { 128 Some(t) => t, 129 None => return ApiError::AuthenticationRequired.into_response(), 130 }; 131 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 132 let http_uri = format!( 133 "https://{}/xrpc/com.atproto.server.activateAccount", 134 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 135 ); 136 let did = match crate::auth::validate_token_with_dpop( 137 &state.db, 138 &extracted.token, 139 extracted.is_dpop, 140 dpop_proof, 141 "POST", 142 &http_uri, 143 true, 144 ) 145 .await 146 { 147 Ok(user) => user.did, 148 Err(e) => return ApiError::from(e).into_response(), 149 }; 150 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 151 .fetch_optional(&state.db) 152 .await 153 .ok() 154 .flatten(); 155 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 156 .execute(&state.db) 157 .await; 158 match result { 159 Ok(_) => { 160 if let Some(ref h) = handle { 161 let _ = state.cache.delete(&format!("handle:{}", h)).await; 162 } 163 if let Err(e) = 164 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 165 { 166 warn!("Failed to sequence account activation event: {}", e); 167 } 168 if let Err(e) = 169 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref()) 170 .await 171 { 172 warn!("Failed to sequence identity event for activation: {}", e); 173 } 174 (StatusCode::OK, Json(json!({}))).into_response() 175 } 176 Err(e) => { 177 error!("DB error activating account: {:?}", e); 178 ( 179 StatusCode::INTERNAL_SERVER_ERROR, 180 Json(json!({"error": "InternalError"})), 181 ) 182 .into_response() 183 } 184 } 185} 186 187#[derive(Deserialize)] 188#[serde(rename_all = "camelCase")] 189pub struct DeactivateAccountInput { 190 pub delete_after: Option<String>, 191} 192 193pub async fn deactivate_account( 194 State(state): State<AppState>, 195 headers: axum::http::HeaderMap, 196 Json(_input): Json<DeactivateAccountInput>, 197) -> Response { 198 let extracted = match crate::auth::extract_auth_token_from_header( 199 headers.get("Authorization").and_then(|h| h.to_str().ok()), 200 ) { 201 Some(t) => t, 202 None => return ApiError::AuthenticationRequired.into_response(), 203 }; 204 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 205 let http_uri = format!( 206 "https://{}/xrpc/com.atproto.server.deactivateAccount", 207 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 208 ); 209 let did = match crate::auth::validate_token_with_dpop( 210 &state.db, 211 &extracted.token, 212 extracted.is_dpop, 213 dpop_proof, 214 "POST", 215 &http_uri, 216 false, 217 ) 218 .await 219 { 220 Ok(user) => user.did, 221 Err(e) => return ApiError::from(e).into_response(), 222 }; 223 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 224 .fetch_optional(&state.db) 225 .await 226 .ok() 227 .flatten(); 228 let result = sqlx::query!( 229 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 230 did 231 ) 232 .execute(&state.db) 233 .await; 234 match result { 235 Ok(_) => { 236 if let Some(ref h) = handle { 237 let _ = state.cache.delete(&format!("handle:{}", h)).await; 238 } 239 if let Err(e) = 240 crate::api::repo::record::sequence_account_event(&state, &did, false, Some("deactivated")).await 241 { 242 warn!("Failed to sequence account deactivation event: {}", e); 243 } 244 (StatusCode::OK, Json(json!({}))).into_response() 245 } 246 Err(e) => { 247 error!("DB error deactivating account: {:?}", e); 248 ( 249 StatusCode::INTERNAL_SERVER_ERROR, 250 Json(json!({"error": "InternalError"})), 251 ) 252 .into_response() 253 } 254 } 255} 256 257pub async fn request_account_delete( 258 State(state): State<AppState>, 259 headers: axum::http::HeaderMap, 260) -> Response { 261 let extracted = match crate::auth::extract_auth_token_from_header( 262 headers.get("Authorization").and_then(|h| h.to_str().ok()), 263 ) { 264 Some(t) => t, 265 None => return ApiError::AuthenticationRequired.into_response(), 266 }; 267 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 268 let http_uri = format!( 269 "https://{}/xrpc/com.atproto.server.requestAccountDelete", 270 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 271 ); 272 let did = match crate::auth::validate_token_with_dpop( 273 &state.db, 274 &extracted.token, 275 extracted.is_dpop, 276 dpop_proof, 277 "POST", 278 &http_uri, 279 true, 280 ) 281 .await 282 { 283 Ok(user) => user.did, 284 Err(e) => return ApiError::from(e).into_response(), 285 }; 286 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 287 .fetch_optional(&state.db) 288 .await 289 { 290 Ok(Some(id)) => id, 291 _ => { 292 return ( 293 StatusCode::INTERNAL_SERVER_ERROR, 294 Json(json!({"error": "InternalError"})), 295 ) 296 .into_response(); 297 } 298 }; 299 let confirmation_token = Uuid::new_v4().to_string(); 300 let expires_at = Utc::now() + Duration::minutes(15); 301 let insert = sqlx::query!( 302 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 303 confirmation_token, 304 did, 305 expires_at 306 ) 307 .execute(&state.db) 308 .await; 309 if let Err(e) = insert { 310 error!("DB error creating deletion token: {:?}", e); 311 return ( 312 StatusCode::INTERNAL_SERVER_ERROR, 313 Json(json!({"error": "InternalError"})), 314 ) 315 .into_response(); 316 } 317 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 318 if let Err(e) = crate::comms::enqueue_account_deletion( 319 &state.db, 320 user_id, 321 &confirmation_token, 322 &hostname, 323 ) 324 .await 325 { 326 warn!("Failed to enqueue account deletion notification: {:?}", e); 327 } 328 info!("Account deletion requested for user {}", did); 329 (StatusCode::OK, Json(json!({}))).into_response() 330} 331 332#[derive(Deserialize)] 333pub struct DeleteAccountInput { 334 pub did: String, 335 pub password: String, 336 pub token: String, 337} 338 339pub async fn delete_account( 340 State(state): State<AppState>, 341 Json(input): Json<DeleteAccountInput>, 342) -> Response { 343 let did = input.did.trim(); 344 let password = &input.password; 345 let token = input.token.trim(); 346 if did.is_empty() { 347 return ( 348 StatusCode::BAD_REQUEST, 349 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 350 ) 351 .into_response(); 352 } 353 if password.is_empty() { 354 return ( 355 StatusCode::BAD_REQUEST, 356 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 357 ) 358 .into_response(); 359 } 360 if token.is_empty() { 361 return ( 362 StatusCode::BAD_REQUEST, 363 Json(json!({"error": "InvalidToken", "message": "token is required"})), 364 ) 365 .into_response(); 366 } 367 let user = sqlx::query!( 368 "SELECT id, password_hash, handle FROM users WHERE did = $1", 369 did 370 ) 371 .fetch_optional(&state.db) 372 .await; 373 let (user_id, password_hash, handle) = match user { 374 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 375 Ok(None) => { 376 return ( 377 StatusCode::BAD_REQUEST, 378 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 379 ) 380 .into_response(); 381 } 382 Err(e) => { 383 error!("DB error in delete_account: {:?}", e); 384 return ( 385 StatusCode::INTERNAL_SERVER_ERROR, 386 Json(json!({"error": "InternalError"})), 387 ) 388 .into_response(); 389 } 390 }; 391 let password_valid = if verify(password, &password_hash).unwrap_or(false) { 392 true 393 } else { 394 let app_pass_rows = sqlx::query!( 395 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 396 user_id 397 ) 398 .fetch_all(&state.db) 399 .await 400 .unwrap_or_default(); 401 app_pass_rows 402 .iter() 403 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 404 }; 405 if !password_valid { 406 return ( 407 StatusCode::UNAUTHORIZED, 408 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 409 ) 410 .into_response(); 411 } 412 let deletion_request = sqlx::query!( 413 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 414 token 415 ) 416 .fetch_optional(&state.db) 417 .await; 418 let (token_did, expires_at) = match deletion_request { 419 Ok(Some(row)) => (row.did, row.expires_at), 420 Ok(None) => { 421 return ( 422 StatusCode::BAD_REQUEST, 423 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 424 ) 425 .into_response(); 426 } 427 Err(e) => { 428 error!("DB error fetching deletion token: {:?}", e); 429 return ( 430 StatusCode::INTERNAL_SERVER_ERROR, 431 Json(json!({"error": "InternalError"})), 432 ) 433 .into_response(); 434 } 435 }; 436 if token_did != did { 437 return ( 438 StatusCode::BAD_REQUEST, 439 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 440 ) 441 .into_response(); 442 } 443 if Utc::now() > expires_at { 444 let _ = sqlx::query!( 445 "DELETE FROM account_deletion_requests WHERE token = $1", 446 token 447 ) 448 .execute(&state.db) 449 .await; 450 return ( 451 StatusCode::BAD_REQUEST, 452 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 453 ) 454 .into_response(); 455 } 456 let mut tx = match state.db.begin().await { 457 Ok(tx) => tx, 458 Err(e) => { 459 error!("Failed to begin transaction: {:?}", e); 460 return ( 461 StatusCode::INTERNAL_SERVER_ERROR, 462 Json(json!({"error": "InternalError"})), 463 ) 464 .into_response(); 465 } 466 }; 467 let deletion_result: Result<(), sqlx::Error> = async { 468 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 469 .execute(&mut *tx) 470 .await?; 471 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 472 .execute(&mut *tx) 473 .await?; 474 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 475 .execute(&mut *tx) 476 .await?; 477 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 478 .execute(&mut *tx) 479 .await?; 480 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 481 .execute(&mut *tx) 482 .await?; 483 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 484 .execute(&mut *tx) 485 .await?; 486 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 487 .execute(&mut *tx) 488 .await?; 489 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 490 .execute(&mut *tx) 491 .await?; 492 Ok(()) 493 } 494 .await; 495 match deletion_result { 496 Ok(()) => { 497 if let Err(e) = tx.commit().await { 498 error!("Failed to commit account deletion transaction: {:?}", e); 499 return ( 500 StatusCode::INTERNAL_SERVER_ERROR, 501 Json(json!({"error": "InternalError"})), 502 ) 503 .into_response(); 504 } 505 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 506 info!("Account {} deleted successfully", did); 507 (StatusCode::OK, Json(json!({}))).into_response() 508 } 509 Err(e) => { 510 error!("DB error deleting account, rolling back: {:?}", e); 511 ( 512 StatusCode::INTERNAL_SERVER_ERROR, 513 Json(json!({"error": "InternalError"})), 514 ) 515 .into_response() 516 } 517 } 518}