this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use bcrypt::verify; 10use chrono::{Duration, Utc}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14use uuid::Uuid; 15 16#[derive(Serialize)] 17#[serde(rename_all = "camelCase")] 18pub struct CheckAccountStatusOutput { 19 pub activated: bool, 20 pub valid_did: bool, 21 pub repo_commit: String, 22 pub repo_rev: String, 23 pub repo_blocks: i64, 24 pub indexed_records: i64, 25 pub private_state_values: i64, 26 pub expected_blobs: i64, 27 pub imported_blobs: i64, 28} 29 30pub async fn check_account_status( 31 State(state): State<AppState>, 32 headers: axum::http::HeaderMap, 33) -> Response { 34 let extracted = match crate::auth::extract_auth_token_from_header( 35 headers.get("Authorization").and_then(|h| h.to_str().ok()) 36 ) { 37 Some(t) => t, 38 None => return ApiError::AuthenticationRequired.into_response(), 39 }; 40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 41 let http_uri = format!("https://{}/xrpc/com.atproto.server.checkAccountStatus", 42 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())); 43 let did = match crate::auth::validate_token_with_dpop( 44 &state.db, 45 &extracted.token, 46 extracted.is_dpop, 47 dpop_proof, 48 "GET", 49 &http_uri, 50 true, 51 ).await { 52 Ok(user) => user.did, 53 Err(e) => return ApiError::from(e).into_response(), 54 }; 55 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 56 .fetch_optional(&state.db) 57 .await 58 { 59 Ok(Some(id)) => id, 60 _ => { 61 return ( 62 StatusCode::INTERNAL_SERVER_ERROR, 63 Json(json!({"error": "InternalError"})), 64 ) 65 .into_response(); 66 } 67 }; 68 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did) 69 .fetch_optional(&state.db) 70 .await; 71 let deactivated_at = match user_status { 72 Ok(Some(row)) => row.deactivated_at, 73 _ => None, 74 }; 75 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 76 .fetch_optional(&state.db) 77 .await; 78 let repo_commit = match repo_result { 79 Ok(Some(row)) => row.repo_root_cid, 80 _ => String::new(), 81 }; 82 let record_count: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) 83 .fetch_one(&state.db) 84 .await 85 .unwrap_or(Some(0)) 86 .unwrap_or(0); 87 let blob_count: i64 = 88 sqlx::query_scalar!("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", user_id) 89 .fetch_one(&state.db) 90 .await 91 .unwrap_or(Some(0)) 92 .unwrap_or(0); 93 let valid_did = did.starts_with("did:"); 94 ( 95 StatusCode::OK, 96 Json(CheckAccountStatusOutput { 97 activated: deactivated_at.is_none(), 98 valid_did, 99 repo_commit: repo_commit.clone(), 100 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 101 repo_blocks: 0, 102 indexed_records: record_count, 103 private_state_values: 0, 104 expected_blobs: blob_count, 105 imported_blobs: blob_count, 106 }), 107 ) 108 .into_response() 109} 110 111pub async fn activate_account( 112 State(state): State<AppState>, 113 headers: axum::http::HeaderMap, 114) -> Response { 115 let extracted = match crate::auth::extract_auth_token_from_header( 116 headers.get("Authorization").and_then(|h| h.to_str().ok()) 117 ) { 118 Some(t) => t, 119 None => return ApiError::AuthenticationRequired.into_response(), 120 }; 121 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 122 let http_uri = format!("https://{}/xrpc/com.atproto.server.activateAccount", 123 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())); 124 let did = match crate::auth::validate_token_with_dpop( 125 &state.db, 126 &extracted.token, 127 extracted.is_dpop, 128 dpop_proof, 129 "POST", 130 &http_uri, 131 true, 132 ).await { 133 Ok(user) => user.did, 134 Err(e) => return ApiError::from(e).into_response(), 135 }; 136 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 137 .fetch_optional(&state.db) 138 .await 139 .ok() 140 .flatten(); 141 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 142 .execute(&state.db) 143 .await; 144 match result { 145 Ok(_) => { 146 if let Some(h) = handle { 147 let _ = state.cache.delete(&format!("handle:{}", h)).await; 148 } 149 (StatusCode::OK, Json(json!({}))).into_response() 150 } 151 Err(e) => { 152 error!("DB error activating account: {:?}", e); 153 ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError"})), 156 ) 157 .into_response() 158 } 159 } 160} 161 162#[derive(Deserialize)] 163#[serde(rename_all = "camelCase")] 164pub struct DeactivateAccountInput { 165 pub delete_after: Option<String>, 166} 167 168pub async fn deactivate_account( 169 State(state): State<AppState>, 170 headers: axum::http::HeaderMap, 171 Json(_input): Json<DeactivateAccountInput>, 172) -> Response { 173 let extracted = match crate::auth::extract_auth_token_from_header( 174 headers.get("Authorization").and_then(|h| h.to_str().ok()) 175 ) { 176 Some(t) => t, 177 None => return ApiError::AuthenticationRequired.into_response(), 178 }; 179 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 180 let http_uri = format!("https://{}/xrpc/com.atproto.server.deactivateAccount", 181 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())); 182 let did = match crate::auth::validate_token_with_dpop( 183 &state.db, 184 &extracted.token, 185 extracted.is_dpop, 186 dpop_proof, 187 "POST", 188 &http_uri, 189 false, 190 ).await { 191 Ok(user) => user.did, 192 Err(e) => return ApiError::from(e).into_response(), 193 }; 194 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 195 .fetch_optional(&state.db) 196 .await 197 .ok() 198 .flatten(); 199 let result = sqlx::query!("UPDATE users SET deactivated_at = NOW() WHERE did = $1", did) 200 .execute(&state.db) 201 .await; 202 match result { 203 Ok(_) => { 204 if let Some(h) = handle { 205 let _ = state.cache.delete(&format!("handle:{}", h)).await; 206 } 207 (StatusCode::OK, Json(json!({}))).into_response() 208 } 209 Err(e) => { 210 error!("DB error deactivating account: {:?}", e); 211 ( 212 StatusCode::INTERNAL_SERVER_ERROR, 213 Json(json!({"error": "InternalError"})), 214 ) 215 .into_response() 216 } 217 } 218} 219 220pub async fn request_account_delete( 221 State(state): State<AppState>, 222 headers: axum::http::HeaderMap, 223) -> Response { 224 let extracted = match crate::auth::extract_auth_token_from_header( 225 headers.get("Authorization").and_then(|h| h.to_str().ok()) 226 ) { 227 Some(t) => t, 228 None => return ApiError::AuthenticationRequired.into_response(), 229 }; 230 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 231 let http_uri = format!("https://{}/xrpc/com.atproto.server.requestAccountDelete", 232 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())); 233 let did = match crate::auth::validate_token_with_dpop( 234 &state.db, 235 &extracted.token, 236 extracted.is_dpop, 237 dpop_proof, 238 "POST", 239 &http_uri, 240 true, 241 ).await { 242 Ok(user) => user.did, 243 Err(e) => return ApiError::from(e).into_response(), 244 }; 245 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 246 .fetch_optional(&state.db) 247 .await 248 { 249 Ok(Some(id)) => id, 250 _ => { 251 return ( 252 StatusCode::INTERNAL_SERVER_ERROR, 253 Json(json!({"error": "InternalError"})), 254 ) 255 .into_response(); 256 } 257 }; 258 let confirmation_token = Uuid::new_v4().to_string(); 259 let expires_at = Utc::now() + Duration::minutes(15); 260 let insert = sqlx::query!( 261 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)", 262 confirmation_token, 263 did, 264 expires_at 265 ) 266 .execute(&state.db) 267 .await; 268 if let Err(e) = insert { 269 error!("DB error creating deletion token: {:?}", e); 270 return ( 271 StatusCode::INTERNAL_SERVER_ERROR, 272 Json(json!({"error": "InternalError"})), 273 ) 274 .into_response(); 275 } 276 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 277 if let Err(e) = 278 crate::notifications::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname).await 279 { 280 warn!("Failed to enqueue account deletion notification: {:?}", e); 281 } 282 info!("Account deletion requested for user {}", did); 283 (StatusCode::OK, Json(json!({}))).into_response() 284} 285 286#[derive(Deserialize)] 287pub struct DeleteAccountInput { 288 pub did: String, 289 pub password: String, 290 pub token: String, 291} 292 293pub async fn delete_account( 294 State(state): State<AppState>, 295 Json(input): Json<DeleteAccountInput>, 296) -> Response { 297 let did = input.did.trim(); 298 let password = &input.password; 299 let token = input.token.trim(); 300 if did.is_empty() { 301 return ( 302 StatusCode::BAD_REQUEST, 303 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 304 ) 305 .into_response(); 306 } 307 if password.is_empty() { 308 return ( 309 StatusCode::BAD_REQUEST, 310 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 311 ) 312 .into_response(); 313 } 314 if token.is_empty() { 315 return ( 316 StatusCode::BAD_REQUEST, 317 Json(json!({"error": "InvalidToken", "message": "token is required"})), 318 ) 319 .into_response(); 320 } 321 let user = sqlx::query!( 322 "SELECT id, password_hash, handle FROM users WHERE did = $1", 323 did 324 ) 325 .fetch_optional(&state.db) 326 .await; 327 let (user_id, password_hash, handle) = match user { 328 Ok(Some(row)) => (row.id, row.password_hash, row.handle), 329 Ok(None) => { 330 return ( 331 StatusCode::BAD_REQUEST, 332 Json(json!({"error": "AccountNotFound", "message": "Account not found"})), 333 ) 334 .into_response(); 335 } 336 Err(e) => { 337 error!("DB error in delete_account: {:?}", e); 338 return ( 339 StatusCode::INTERNAL_SERVER_ERROR, 340 Json(json!({"error": "InternalError"})), 341 ) 342 .into_response(); 343 } 344 }; 345 let password_valid = if verify(password, &password_hash).unwrap_or(false) { 346 true 347 } else { 348 let app_pass_rows = sqlx::query!( 349 "SELECT password_hash FROM app_passwords WHERE user_id = $1", 350 user_id 351 ) 352 .fetch_all(&state.db) 353 .await 354 .unwrap_or_default(); 355 app_pass_rows 356 .iter() 357 .any(|row| verify(password, &row.password_hash).unwrap_or(false)) 358 }; 359 if !password_valid { 360 return ( 361 StatusCode::UNAUTHORIZED, 362 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})), 363 ) 364 .into_response(); 365 } 366 let deletion_request = sqlx::query!( 367 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1", 368 token 369 ) 370 .fetch_optional(&state.db) 371 .await; 372 let (token_did, expires_at) = match deletion_request { 373 Ok(Some(row)) => (row.did, row.expires_at), 374 Ok(None) => { 375 return ( 376 StatusCode::BAD_REQUEST, 377 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})), 378 ) 379 .into_response(); 380 } 381 Err(e) => { 382 error!("DB error fetching deletion token: {:?}", e); 383 return ( 384 StatusCode::INTERNAL_SERVER_ERROR, 385 Json(json!({"error": "InternalError"})), 386 ) 387 .into_response(); 388 } 389 }; 390 if token_did != did { 391 return ( 392 StatusCode::BAD_REQUEST, 393 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})), 394 ) 395 .into_response(); 396 } 397 if Utc::now() > expires_at { 398 let _ = sqlx::query!("DELETE FROM account_deletion_requests WHERE token = $1", token) 399 .execute(&state.db) 400 .await; 401 return ( 402 StatusCode::BAD_REQUEST, 403 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})), 404 ) 405 .into_response(); 406 } 407 let mut tx = match state.db.begin().await { 408 Ok(tx) => tx, 409 Err(e) => { 410 error!("Failed to begin transaction: {:?}", e); 411 return ( 412 StatusCode::INTERNAL_SERVER_ERROR, 413 Json(json!({"error": "InternalError"})), 414 ) 415 .into_response(); 416 } 417 }; 418 let deletion_result: Result<(), sqlx::Error> = async { 419 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did) 420 .execute(&mut *tx) 421 .await?; 422 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 423 .execute(&mut *tx) 424 .await?; 425 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id) 426 .execute(&mut *tx) 427 .await?; 428 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 429 .execute(&mut *tx) 430 .await?; 431 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id) 432 .execute(&mut *tx) 433 .await?; 434 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id) 435 .execute(&mut *tx) 436 .await?; 437 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did) 438 .execute(&mut *tx) 439 .await?; 440 sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 441 .execute(&mut *tx) 442 .await?; 443 Ok(()) 444 } 445 .await; 446 match deletion_result { 447 Ok(()) => { 448 if let Err(e) = tx.commit().await { 449 error!("Failed to commit account deletion transaction: {:?}", e); 450 return ( 451 StatusCode::INTERNAL_SERVER_ERROR, 452 Json(json!({"error": "InternalError"})), 453 ) 454 .into_response(); 455 } 456 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 457 info!("Account {} deleted successfully", did); 458 (StatusCode::OK, Json(json!({}))).into_response() 459 } 460 Err(e) => { 461 error!("DB error deleting account, rolling back: {:?}", e); 462 ( 463 StatusCode::INTERNAL_SERVER_ERROR, 464 Json(json!({"error": "InternalError"})), 465 ) 466 .into_response() 467 } 468 } 469}