this repo has no description
1use crate::api::ApiError; 2use crate::auth::BearerAuth; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") { 18 if let Ok(value) = forwarded.to_str() { 19 if let Some(first_ip) = value.split(',').next() { 20 return first_ip.trim().to_string(); 21 } 22 } 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") { 25 if let Ok(value) = real_ip.to_str() { 26 return value.trim().to_string(); 27 } 28 } 29 "unknown".to_string() 30} 31 32#[derive(Deserialize)] 33pub struct CreateSessionInput { 34 pub identifier: String, 35 pub password: String, 36} 37 38#[derive(Serialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateSessionOutput { 41 pub access_jwt: String, 42 pub refresh_jwt: String, 43 pub handle: String, 44 pub did: String, 45} 46 47pub async fn create_session( 48 State(state): State<AppState>, 49 headers: HeaderMap, 50 Json(input): Json<CreateSessionInput>, 51) -> Response { 52 info!("create_session called"); 53 54 let client_ip = extract_client_ip(&headers); 55 if state.rate_limiters.login.check_key(&client_ip).is_err() { 56 warn!(ip = %client_ip, "Login rate limit exceeded"); 57 return ( 58 StatusCode::TOO_MANY_REQUESTS, 59 Json(json!({ 60 "error": "RateLimitExceeded", 61 "message": "Too many login attempts. Please try again later." 62 })), 63 ) 64 .into_response(); 65 } 66 67 let row = match sqlx::query!( 68 r#"SELECT 69 u.id, u.did, u.handle, u.password_hash, 70 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified, 71 k.key_bytes, k.encryption_version 72 FROM users u 73 JOIN user_keys k ON u.id = k.user_id 74 WHERE u.handle = $1 OR u.email = $1"#, 75 input.identifier 76 ) 77 .fetch_optional(&state.db) 78 .await 79 { 80 Ok(Some(row)) => row, 81 Ok(None) => { 82 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK"); 83 warn!("User not found for login attempt"); 84 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 85 } 86 Err(e) => { 87 error!("Database error fetching user: {:?}", e); 88 return ApiError::InternalError.into_response(); 89 } 90 }; 91 92 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 93 Ok(k) => k, 94 Err(e) => { 95 error!("Failed to decrypt user key: {:?}", e); 96 return ApiError::InternalError.into_response(); 97 } 98 }; 99 100 let password_valid = verify(&input.password, &row.password_hash).unwrap_or(false) 101 || sqlx::query!("SELECT password_hash FROM app_passwords WHERE user_id = $1", row.id) 102 .fetch_all(&state.db) 103 .await 104 .unwrap_or_default() 105 .iter() 106 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 107 108 if !password_valid { 109 warn!("Password verification failed for login attempt"); 110 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 111 } 112 113 let is_verified = row.email_confirmed 114 || row.discord_verified 115 || row.telegram_verified 116 || row.signal_verified; 117 118 if !is_verified { 119 warn!("Login attempt for unverified account: {}", row.did); 120 return ( 121 StatusCode::FORBIDDEN, 122 Json(json!({ 123 "error": "AccountNotVerified", 124 "message": "Please verify your account before logging in", 125 "did": row.did 126 })), 127 ).into_response(); 128 } 129 130 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 131 Ok(m) => m, 132 Err(e) => { 133 error!("Failed to create access token: {:?}", e); 134 return ApiError::InternalError.into_response(); 135 } 136 }; 137 138 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 139 Ok(m) => m, 140 Err(e) => { 141 error!("Failed to create refresh token: {:?}", e); 142 return ApiError::InternalError.into_response(); 143 } 144 }; 145 146 if let Err(e) = sqlx::query!( 147 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 148 row.did, 149 access_meta.jti, 150 refresh_meta.jti, 151 access_meta.expires_at, 152 refresh_meta.expires_at 153 ) 154 .execute(&state.db) 155 .await 156 { 157 error!("Failed to insert session: {:?}", e); 158 return ApiError::InternalError.into_response(); 159 } 160 161 Json(CreateSessionOutput { 162 access_jwt: access_meta.token, 163 refresh_jwt: refresh_meta.token, 164 handle: row.handle, 165 did: row.did, 166 }).into_response() 167} 168 169pub async fn get_session( 170 State(state): State<AppState>, 171 BearerAuth(auth_user): BearerAuth, 172) -> Response { 173 match sqlx::query!("SELECT handle, email FROM users WHERE did = $1", auth_user.did) 174 .fetch_optional(&state.db) 175 .await 176 { 177 Ok(Some(row)) => Json(json!({ 178 "handle": row.handle, 179 "did": auth_user.did, 180 "email": row.email, 181 "didDoc": {} 182 })).into_response(), 183 Ok(None) => ApiError::AuthenticationFailed.into_response(), 184 Err(e) => { 185 error!("Database error in get_session: {:?}", e); 186 ApiError::InternalError.into_response() 187 } 188 } 189} 190 191pub async fn delete_session( 192 State(state): State<AppState>, 193 headers: axum::http::HeaderMap, 194) -> Response { 195 let token = match crate::auth::extract_bearer_token_from_header( 196 headers.get("Authorization").and_then(|h| h.to_str().ok()) 197 ) { 198 Some(t) => t, 199 None => return ApiError::AuthenticationRequired.into_response(), 200 }; 201 202 let jti = match crate::auth::get_jti_from_token(&token) { 203 Ok(jti) => jti, 204 Err(_) => return ApiError::AuthenticationFailed.into_response(), 205 }; 206 207 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 208 .execute(&state.db) 209 .await 210 { 211 Ok(res) if res.rows_affected() > 0 => Json(json!({})).into_response(), 212 Ok(_) => ApiError::AuthenticationFailed.into_response(), 213 Err(e) => { 214 error!("Database error in delete_session: {:?}", e); 215 ApiError::AuthenticationFailed.into_response() 216 } 217 } 218} 219 220pub async fn refresh_session( 221 State(state): State<AppState>, 222 headers: axum::http::HeaderMap, 223) -> Response { 224 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 225 if !state.distributed_rate_limiter.check_rate_limit( 226 &format!("refresh_session:{}", client_ip), 227 60, 228 60_000, 229 ).await { 230 if state.rate_limiters.refresh_session.check_key(&client_ip).is_err() { 231 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 232 return ( 233 axum::http::StatusCode::TOO_MANY_REQUESTS, 234 axum::Json(serde_json::json!({ 235 "error": "RateLimitExceeded", 236 "message": "Too many requests. Please try again later." 237 })), 238 ).into_response(); 239 } 240 } 241 242 let refresh_token = match crate::auth::extract_bearer_token_from_header( 243 headers.get("Authorization").and_then(|h| h.to_str().ok()) 244 ) { 245 Some(t) => t, 246 None => return ApiError::AuthenticationRequired.into_response(), 247 }; 248 249 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 250 Ok(jti) => jti, 251 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(), 252 }; 253 254 let mut tx = match state.db.begin().await { 255 Ok(tx) => tx, 256 Err(e) => { 257 error!("Failed to begin transaction: {:?}", e); 258 return ApiError::InternalError.into_response(); 259 } 260 }; 261 262 if let Ok(Some(session_id)) = sqlx::query_scalar!( 263 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 264 refresh_jti 265 ) 266 .fetch_optional(&mut *tx) 267 .await 268 { 269 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id); 270 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 271 .execute(&mut *tx) 272 .await; 273 let _ = tx.commit().await; 274 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 275 } 276 277 let session_row = match sqlx::query!( 278 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 279 FROM session_tokens st 280 JOIN users u ON st.did = u.did 281 JOIN user_keys k ON u.id = k.user_id 282 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 283 FOR UPDATE OF st"#, 284 refresh_jti 285 ) 286 .fetch_optional(&mut *tx) 287 .await 288 { 289 Ok(Some(row)) => row, 290 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(), 291 Err(e) => { 292 error!("Database error fetching session: {:?}", e); 293 return ApiError::InternalError.into_response(); 294 } 295 }; 296 297 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 298 Ok(k) => k, 299 Err(e) => { 300 error!("Failed to decrypt user key: {:?}", e); 301 return ApiError::InternalError.into_response(); 302 } 303 }; 304 305 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 306 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 307 } 308 309 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 310 Ok(m) => m, 311 Err(e) => { 312 error!("Failed to create access token: {:?}", e); 313 return ApiError::InternalError.into_response(); 314 } 315 }; 316 317 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 318 Ok(m) => m, 319 Err(e) => { 320 error!("Failed to create refresh token: {:?}", e); 321 return ApiError::InternalError.into_response(); 322 } 323 }; 324 325 match sqlx::query!( 326 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 327 refresh_jti, 328 session_row.id 329 ) 330 .execute(&mut *tx) 331 .await 332 { 333 Ok(result) if result.rows_affected() == 0 => { 334 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 335 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 336 .execute(&mut *tx) 337 .await; 338 let _ = tx.commit().await; 339 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 340 } 341 Err(e) => { 342 error!("Failed to record used refresh token: {:?}", e); 343 return ApiError::InternalError.into_response(); 344 } 345 Ok(_) => {} 346 } 347 348 if let Err(e) = sqlx::query!( 349 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 350 new_access_meta.jti, 351 new_refresh_meta.jti, 352 new_access_meta.expires_at, 353 new_refresh_meta.expires_at, 354 session_row.id 355 ) 356 .execute(&mut *tx) 357 .await 358 { 359 error!("Database error updating session: {:?}", e); 360 return ApiError::InternalError.into_response(); 361 } 362 363 if let Err(e) = tx.commit().await { 364 error!("Failed to commit transaction: {:?}", e); 365 return ApiError::InternalError.into_response(); 366 } 367 368 match sqlx::query!("SELECT handle FROM users WHERE did = $1", session_row.did) 369 .fetch_optional(&state.db) 370 .await 371 { 372 Ok(Some(u)) => Json(json!({ 373 "accessJwt": new_access_meta.token, 374 "refreshJwt": new_refresh_meta.token, 375 "handle": u.handle, 376 "did": session_row.did 377 })).into_response(), 378 Ok(None) => { 379 error!("User not found for existing session: {}", session_row.did); 380 ApiError::InternalError.into_response() 381 } 382 Err(e) => { 383 error!("Database error fetching user: {:?}", e); 384 ApiError::InternalError.into_response() 385 } 386 } 387} 388 389#[derive(Deserialize)] 390#[serde(rename_all = "camelCase")] 391pub struct ConfirmSignupInput { 392 pub did: String, 393 pub verification_code: String, 394} 395 396#[derive(Serialize)] 397#[serde(rename_all = "camelCase")] 398pub struct ConfirmSignupOutput { 399 pub access_jwt: String, 400 pub refresh_jwt: String, 401 pub handle: String, 402 pub did: String, 403} 404 405pub async fn confirm_signup( 406 State(state): State<AppState>, 407 Json(input): Json<ConfirmSignupInput>, 408) -> Response { 409 info!("confirm_signup called for DID: {}", input.did); 410 411 let row = match sqlx::query!( 412 r#"SELECT 413 u.id, u.did, u.handle, 414 u.email_confirmation_code, 415 u.email_confirmation_code_expires_at, 416 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 417 k.key_bytes, k.encryption_version 418 FROM users u 419 JOIN user_keys k ON u.id = k.user_id 420 WHERE u.did = $1"#, 421 input.did 422 ) 423 .fetch_optional(&state.db) 424 .await 425 { 426 Ok(Some(row)) => row, 427 Ok(None) => { 428 warn!("User not found for confirm_signup: {}", input.did); 429 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response(); 430 } 431 Err(e) => { 432 error!("Database error in confirm_signup: {:?}", e); 433 return ApiError::InternalError.into_response(); 434 } 435 }; 436 437 let stored_code = match &row.email_confirmation_code { 438 Some(code) => code, 439 None => { 440 warn!("No verification code found for user: {}", input.did); 441 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 442 } 443 }; 444 445 if stored_code != &input.verification_code { 446 warn!("Invalid verification code for user: {}", input.did); 447 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 448 } 449 450 if let Some(expires_at) = row.email_confirmation_code_expires_at { 451 if expires_at < Utc::now() { 452 warn!("Verification code expired for user: {}", input.did); 453 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 454 } 455 } 456 457 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 458 Ok(k) => k, 459 Err(e) => { 460 error!("Failed to decrypt user key: {:?}", e); 461 return ApiError::InternalError.into_response(); 462 } 463 }; 464 465 let verified_column = match row.channel { 466 crate::notifications::NotificationChannel::Email => "email_confirmed", 467 crate::notifications::NotificationChannel::Discord => "discord_verified", 468 crate::notifications::NotificationChannel::Telegram => "telegram_verified", 469 crate::notifications::NotificationChannel::Signal => "signal_verified", 470 }; 471 472 let update_query = format!( 473 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1", 474 verified_column 475 ); 476 477 if let Err(e) = sqlx::query(&update_query) 478 .bind(&input.did) 479 .execute(&state.db) 480 .await 481 { 482 error!("Failed to update verification status: {:?}", e); 483 return ApiError::InternalError.into_response(); 484 } 485 486 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 487 Ok(m) => m, 488 Err(e) => { 489 error!("Failed to create access token: {:?}", e); 490 return ApiError::InternalError.into_response(); 491 } 492 }; 493 494 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 495 Ok(m) => m, 496 Err(e) => { 497 error!("Failed to create refresh token: {:?}", e); 498 return ApiError::InternalError.into_response(); 499 } 500 }; 501 502 if let Err(e) = sqlx::query!( 503 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 504 row.did, 505 access_meta.jti, 506 refresh_meta.jti, 507 access_meta.expires_at, 508 refresh_meta.expires_at 509 ) 510 .execute(&state.db) 511 .await 512 { 513 error!("Failed to insert session: {:?}", e); 514 return ApiError::InternalError.into_response(); 515 } 516 517 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 518 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await { 519 warn!("Failed to enqueue welcome notification: {:?}", e); 520 } 521 522 Json(ConfirmSignupOutput { 523 access_jwt: access_meta.token, 524 refresh_jwt: refresh_meta.token, 525 handle: row.handle, 526 did: row.did, 527 }).into_response() 528} 529 530#[derive(Deserialize)] 531#[serde(rename_all = "camelCase")] 532pub struct ResendVerificationInput { 533 pub did: String, 534} 535 536pub async fn resend_verification( 537 State(state): State<AppState>, 538 Json(input): Json<ResendVerificationInput>, 539) -> Response { 540 info!("resend_verification called for DID: {}", input.did); 541 542 let row = match sqlx::query!( 543 r#"SELECT 544 id, handle, email, 545 preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 546 discord_id, telegram_username, signal_number, 547 email_confirmed, discord_verified, telegram_verified, signal_verified 548 FROM users 549 WHERE did = $1"#, 550 input.did 551 ) 552 .fetch_optional(&state.db) 553 .await 554 { 555 Ok(Some(row)) => row, 556 Ok(None) => { 557 return ApiError::InvalidRequest("User not found".into()).into_response(); 558 } 559 Err(e) => { 560 error!("Database error in resend_verification: {:?}", e); 561 return ApiError::InternalError.into_response(); 562 } 563 }; 564 565 let is_verified = row.email_confirmed 566 || row.discord_verified 567 || row.telegram_verified 568 || row.signal_verified; 569 570 if is_verified { 571 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 572 } 573 574 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 575 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 576 577 if let Err(e) = sqlx::query!( 578 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3", 579 verification_code, 580 code_expires_at, 581 input.did 582 ) 583 .execute(&state.db) 584 .await 585 { 586 error!("Failed to update verification code: {:?}", e); 587 return ApiError::InternalError.into_response(); 588 } 589 590 let (channel_str, recipient) = match row.channel { 591 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()), 592 crate::notifications::NotificationChannel::Discord => { 593 ("discord", row.discord_id.unwrap_or_default()) 594 } 595 crate::notifications::NotificationChannel::Telegram => { 596 ("telegram", row.telegram_username.unwrap_or_default()) 597 } 598 crate::notifications::NotificationChannel::Signal => { 599 ("signal", row.signal_number.unwrap_or_default()) 600 } 601 }; 602 603 if let Err(e) = crate::notifications::enqueue_signup_verification( 604 &state.db, 605 row.id, 606 channel_str, 607 &recipient, 608 &verification_code, 609 ).await { 610 warn!("Failed to enqueue verification notification: {:?}", e); 611 } 612 613 Json(json!({"success": true})).into_response() 614}