this repo has no description
1use crate::api::ApiError; 2use crate::auth::BearerAuth; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") { 18 if let Ok(value) = forwarded.to_str() { 19 if let Some(first_ip) = value.split(',').next() { 20 return first_ip.trim().to_string(); 21 } 22 } 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") { 25 if let Ok(value) = real_ip.to_str() { 26 return value.trim().to_string(); 27 } 28 } 29 "unknown".to_string() 30} 31 32#[derive(Deserialize)] 33pub struct CreateSessionInput { 34 pub identifier: String, 35 pub password: String, 36} 37 38#[derive(Serialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateSessionOutput { 41 pub access_jwt: String, 42 pub refresh_jwt: String, 43 pub handle: String, 44 pub did: String, 45} 46 47pub async fn create_session( 48 State(state): State<AppState>, 49 headers: HeaderMap, 50 Json(input): Json<CreateSessionInput>, 51) -> Response { 52 info!("create_session called"); 53 54 let client_ip = extract_client_ip(&headers); 55 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await { 56 warn!(ip = %client_ip, "Login rate limit exceeded"); 57 return ( 58 StatusCode::TOO_MANY_REQUESTS, 59 Json(json!({ 60 "error": "RateLimitExceeded", 61 "message": "Too many login attempts. Please try again later." 62 })), 63 ) 64 .into_response(); 65 } 66 67 let row = match sqlx::query!( 68 r#"SELECT 69 u.id, u.did, u.handle, u.password_hash, 70 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified, 71 k.key_bytes, k.encryption_version 72 FROM users u 73 JOIN user_keys k ON u.id = k.user_id 74 WHERE u.handle = $1 OR u.email = $1"#, 75 input.identifier 76 ) 77 .fetch_optional(&state.db) 78 .await 79 { 80 Ok(Some(row)) => row, 81 Ok(None) => { 82 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK"); 83 warn!("User not found for login attempt"); 84 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 85 } 86 Err(e) => { 87 error!("Database error fetching user: {:?}", e); 88 return ApiError::InternalError.into_response(); 89 } 90 }; 91 92 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 93 Ok(k) => k, 94 Err(e) => { 95 error!("Failed to decrypt user key: {:?}", e); 96 return ApiError::InternalError.into_response(); 97 } 98 }; 99 100 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 101 true 102 } else { 103 let app_passwords = sqlx::query!( 104 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 105 row.id 106 ) 107 .fetch_all(&state.db) 108 .await 109 .unwrap_or_default(); 110 111 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 112 }; 113 114 if !password_valid { 115 warn!("Password verification failed for login attempt"); 116 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 117 } 118 119 let is_verified = row.email_confirmed 120 || row.discord_verified 121 || row.telegram_verified 122 || row.signal_verified; 123 124 if !is_verified { 125 warn!("Login attempt for unverified account: {}", row.did); 126 return ( 127 StatusCode::FORBIDDEN, 128 Json(json!({ 129 "error": "AccountNotVerified", 130 "message": "Please verify your account before logging in", 131 "did": row.did 132 })), 133 ).into_response(); 134 } 135 136 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 137 Ok(m) => m, 138 Err(e) => { 139 error!("Failed to create access token: {:?}", e); 140 return ApiError::InternalError.into_response(); 141 } 142 }; 143 144 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 145 Ok(m) => m, 146 Err(e) => { 147 error!("Failed to create refresh token: {:?}", e); 148 return ApiError::InternalError.into_response(); 149 } 150 }; 151 152 if let Err(e) = sqlx::query!( 153 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 154 row.did, 155 access_meta.jti, 156 refresh_meta.jti, 157 access_meta.expires_at, 158 refresh_meta.expires_at 159 ) 160 .execute(&state.db) 161 .await 162 { 163 error!("Failed to insert session: {:?}", e); 164 return ApiError::InternalError.into_response(); 165 } 166 167 Json(CreateSessionOutput { 168 access_jwt: access_meta.token, 169 refresh_jwt: refresh_meta.token, 170 handle: row.handle, 171 did: row.did, 172 }).into_response() 173} 174 175pub async fn get_session( 176 State(state): State<AppState>, 177 BearerAuth(auth_user): BearerAuth, 178) -> Response { 179 match sqlx::query!( 180 r#"SELECT 181 handle, email, email_confirmed, 182 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel", 183 discord_verified, telegram_verified, signal_verified 184 FROM users WHERE did = $1"#, 185 auth_user.did 186 ) 187 .fetch_optional(&state.db) 188 .await 189 { 190 Ok(Some(row)) => { 191 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 192 crate::notifications::NotificationChannel::Email => ("email", row.email_confirmed), 193 crate::notifications::NotificationChannel::Discord => ("discord", row.discord_verified), 194 crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified), 195 crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified), 196 }; 197 Json(json!({ 198 "handle": row.handle, 199 "did": auth_user.did, 200 "email": row.email, 201 "emailConfirmed": row.email_confirmed, 202 "preferredChannel": preferred_channel, 203 "preferredChannelVerified": preferred_channel_verified, 204 "didDoc": {} 205 })).into_response() 206 } 207 Ok(None) => ApiError::AuthenticationFailed.into_response(), 208 Err(e) => { 209 error!("Database error in get_session: {:?}", e); 210 ApiError::InternalError.into_response() 211 } 212 } 213} 214 215pub async fn delete_session( 216 State(state): State<AppState>, 217 headers: axum::http::HeaderMap, 218) -> Response { 219 let token = match crate::auth::extract_bearer_token_from_header( 220 headers.get("Authorization").and_then(|h| h.to_str().ok()) 221 ) { 222 Some(t) => t, 223 None => return ApiError::AuthenticationRequired.into_response(), 224 }; 225 226 let jti = match crate::auth::get_jti_from_token(&token) { 227 Ok(jti) => jti, 228 Err(_) => return ApiError::AuthenticationFailed.into_response(), 229 }; 230 231 let did = crate::auth::get_did_from_token(&token).ok(); 232 233 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 234 .execute(&state.db) 235 .await 236 { 237 Ok(res) if res.rows_affected() > 0 => { 238 if let Some(did) = did { 239 let session_cache_key = format!("auth:session:{}:{}", did, jti); 240 let _ = state.cache.delete(&session_cache_key).await; 241 } 242 Json(json!({})).into_response() 243 } 244 Ok(_) => ApiError::AuthenticationFailed.into_response(), 245 Err(e) => { 246 error!("Database error in delete_session: {:?}", e); 247 ApiError::AuthenticationFailed.into_response() 248 } 249 } 250} 251 252pub async fn refresh_session( 253 State(state): State<AppState>, 254 headers: axum::http::HeaderMap, 255) -> Response { 256 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 257 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await { 258 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 259 return ( 260 axum::http::StatusCode::TOO_MANY_REQUESTS, 261 axum::Json(serde_json::json!({ 262 "error": "RateLimitExceeded", 263 "message": "Too many requests. Please try again later." 264 })), 265 ).into_response(); 266 } 267 268 let refresh_token = match crate::auth::extract_bearer_token_from_header( 269 headers.get("Authorization").and_then(|h| h.to_str().ok()) 270 ) { 271 Some(t) => t, 272 None => return ApiError::AuthenticationRequired.into_response(), 273 }; 274 275 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 276 Ok(jti) => jti, 277 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(), 278 }; 279 280 let mut tx = match state.db.begin().await { 281 Ok(tx) => tx, 282 Err(e) => { 283 error!("Failed to begin transaction: {:?}", e); 284 return ApiError::InternalError.into_response(); 285 } 286 }; 287 288 if let Ok(Some(session_id)) = sqlx::query_scalar!( 289 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 290 refresh_jti 291 ) 292 .fetch_optional(&mut *tx) 293 .await 294 { 295 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id); 296 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 297 .execute(&mut *tx) 298 .await; 299 let _ = tx.commit().await; 300 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 301 } 302 303 let session_row = match sqlx::query!( 304 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 305 FROM session_tokens st 306 JOIN users u ON st.did = u.did 307 JOIN user_keys k ON u.id = k.user_id 308 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 309 FOR UPDATE OF st"#, 310 refresh_jti 311 ) 312 .fetch_optional(&mut *tx) 313 .await 314 { 315 Ok(Some(row)) => row, 316 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(), 317 Err(e) => { 318 error!("Database error fetching session: {:?}", e); 319 return ApiError::InternalError.into_response(); 320 } 321 }; 322 323 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 324 Ok(k) => k, 325 Err(e) => { 326 error!("Failed to decrypt user key: {:?}", e); 327 return ApiError::InternalError.into_response(); 328 } 329 }; 330 331 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 332 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 333 } 334 335 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 336 Ok(m) => m, 337 Err(e) => { 338 error!("Failed to create access token: {:?}", e); 339 return ApiError::InternalError.into_response(); 340 } 341 }; 342 343 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 344 Ok(m) => m, 345 Err(e) => { 346 error!("Failed to create refresh token: {:?}", e); 347 return ApiError::InternalError.into_response(); 348 } 349 }; 350 351 match sqlx::query!( 352 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 353 refresh_jti, 354 session_row.id 355 ) 356 .execute(&mut *tx) 357 .await 358 { 359 Ok(result) if result.rows_affected() == 0 => { 360 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 361 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 362 .execute(&mut *tx) 363 .await; 364 let _ = tx.commit().await; 365 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 366 } 367 Err(e) => { 368 error!("Failed to record used refresh token: {:?}", e); 369 return ApiError::InternalError.into_response(); 370 } 371 Ok(_) => {} 372 } 373 374 if let Err(e) = sqlx::query!( 375 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 376 new_access_meta.jti, 377 new_refresh_meta.jti, 378 new_access_meta.expires_at, 379 new_refresh_meta.expires_at, 380 session_row.id 381 ) 382 .execute(&mut *tx) 383 .await 384 { 385 error!("Database error updating session: {:?}", e); 386 return ApiError::InternalError.into_response(); 387 } 388 389 if let Err(e) = tx.commit().await { 390 error!("Failed to commit transaction: {:?}", e); 391 return ApiError::InternalError.into_response(); 392 } 393 394 match sqlx::query!( 395 r#"SELECT 396 handle, email, email_confirmed, 397 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel", 398 discord_verified, telegram_verified, signal_verified 399 FROM users WHERE did = $1"#, 400 session_row.did 401 ) 402 .fetch_optional(&state.db) 403 .await 404 { 405 Ok(Some(u)) => { 406 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 407 crate::notifications::NotificationChannel::Email => ("email", u.email_confirmed), 408 crate::notifications::NotificationChannel::Discord => ("discord", u.discord_verified), 409 crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified), 410 crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified), 411 }; 412 Json(json!({ 413 "accessJwt": new_access_meta.token, 414 "refreshJwt": new_refresh_meta.token, 415 "handle": u.handle, 416 "did": session_row.did, 417 "email": u.email, 418 "emailConfirmed": u.email_confirmed, 419 "preferredChannel": preferred_channel, 420 "preferredChannelVerified": preferred_channel_verified 421 })).into_response() 422 } 423 Ok(None) => { 424 error!("User not found for existing session: {}", session_row.did); 425 ApiError::InternalError.into_response() 426 } 427 Err(e) => { 428 error!("Database error fetching user: {:?}", e); 429 ApiError::InternalError.into_response() 430 } 431 } 432} 433 434#[derive(Deserialize)] 435#[serde(rename_all = "camelCase")] 436pub struct ConfirmSignupInput { 437 pub did: String, 438 pub verification_code: String, 439} 440 441#[derive(Serialize)] 442#[serde(rename_all = "camelCase")] 443pub struct ConfirmSignupOutput { 444 pub access_jwt: String, 445 pub refresh_jwt: String, 446 pub handle: String, 447 pub did: String, 448 pub email: Option<String>, 449 pub email_confirmed: bool, 450 pub preferred_channel: String, 451 pub preferred_channel_verified: bool, 452} 453 454pub async fn confirm_signup( 455 State(state): State<AppState>, 456 Json(input): Json<ConfirmSignupInput>, 457) -> Response { 458 info!("confirm_signup called for DID: {}", input.did); 459 460 let row = match sqlx::query!( 461 r#"SELECT 462 u.id, u.did, u.handle, u.email, 463 u.email_confirmation_code, 464 u.email_confirmation_code_expires_at, 465 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 466 k.key_bytes, k.encryption_version 467 FROM users u 468 JOIN user_keys k ON u.id = k.user_id 469 WHERE u.did = $1"#, 470 input.did 471 ) 472 .fetch_optional(&state.db) 473 .await 474 { 475 Ok(Some(row)) => row, 476 Ok(None) => { 477 warn!("User not found for confirm_signup: {}", input.did); 478 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response(); 479 } 480 Err(e) => { 481 error!("Database error in confirm_signup: {:?}", e); 482 return ApiError::InternalError.into_response(); 483 } 484 }; 485 486 let stored_code = match &row.email_confirmation_code { 487 Some(code) => code, 488 None => { 489 warn!("No verification code found for user: {}", input.did); 490 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 491 } 492 }; 493 494 if stored_code != &input.verification_code { 495 warn!("Invalid verification code for user: {}", input.did); 496 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 497 } 498 499 if let Some(expires_at) = row.email_confirmation_code_expires_at { 500 if expires_at < Utc::now() { 501 warn!("Verification code expired for user: {}", input.did); 502 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 503 } 504 } 505 506 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 507 Ok(k) => k, 508 Err(e) => { 509 error!("Failed to decrypt user key: {:?}", e); 510 return ApiError::InternalError.into_response(); 511 } 512 }; 513 514 let verified_column = match row.channel { 515 crate::notifications::NotificationChannel::Email => "email_confirmed", 516 crate::notifications::NotificationChannel::Discord => "discord_verified", 517 crate::notifications::NotificationChannel::Telegram => "telegram_verified", 518 crate::notifications::NotificationChannel::Signal => "signal_verified", 519 }; 520 521 let update_query = format!( 522 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1", 523 verified_column 524 ); 525 526 if let Err(e) = sqlx::query(&update_query) 527 .bind(&input.did) 528 .execute(&state.db) 529 .await 530 { 531 error!("Failed to update verification status: {:?}", e); 532 return ApiError::InternalError.into_response(); 533 } 534 535 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 536 Ok(m) => m, 537 Err(e) => { 538 error!("Failed to create access token: {:?}", e); 539 return ApiError::InternalError.into_response(); 540 } 541 }; 542 543 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 544 Ok(m) => m, 545 Err(e) => { 546 error!("Failed to create refresh token: {:?}", e); 547 return ApiError::InternalError.into_response(); 548 } 549 }; 550 551 if let Err(e) = sqlx::query!( 552 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 553 row.did, 554 access_meta.jti, 555 refresh_meta.jti, 556 access_meta.expires_at, 557 refresh_meta.expires_at 558 ) 559 .execute(&state.db) 560 .await 561 { 562 error!("Failed to insert session: {:?}", e); 563 return ApiError::InternalError.into_response(); 564 } 565 566 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 567 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await { 568 warn!("Failed to enqueue welcome notification: {:?}", e); 569 } 570 571 let email_confirmed = matches!(row.channel, crate::notifications::NotificationChannel::Email); 572 let preferred_channel = match row.channel { 573 crate::notifications::NotificationChannel::Email => "email", 574 crate::notifications::NotificationChannel::Discord => "discord", 575 crate::notifications::NotificationChannel::Telegram => "telegram", 576 crate::notifications::NotificationChannel::Signal => "signal", 577 }; 578 579 Json(ConfirmSignupOutput { 580 access_jwt: access_meta.token, 581 refresh_jwt: refresh_meta.token, 582 handle: row.handle, 583 did: row.did, 584 email: row.email, 585 email_confirmed, 586 preferred_channel: preferred_channel.to_string(), 587 preferred_channel_verified: true, 588 }).into_response() 589} 590 591#[derive(Deserialize)] 592#[serde(rename_all = "camelCase")] 593pub struct ResendVerificationInput { 594 pub did: String, 595} 596 597pub async fn resend_verification( 598 State(state): State<AppState>, 599 Json(input): Json<ResendVerificationInput>, 600) -> Response { 601 info!("resend_verification called for DID: {}", input.did); 602 603 let row = match sqlx::query!( 604 r#"SELECT 605 id, handle, email, 606 preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 607 discord_id, telegram_username, signal_number, 608 email_confirmed, discord_verified, telegram_verified, signal_verified 609 FROM users 610 WHERE did = $1"#, 611 input.did 612 ) 613 .fetch_optional(&state.db) 614 .await 615 { 616 Ok(Some(row)) => row, 617 Ok(None) => { 618 return ApiError::InvalidRequest("User not found".into()).into_response(); 619 } 620 Err(e) => { 621 error!("Database error in resend_verification: {:?}", e); 622 return ApiError::InternalError.into_response(); 623 } 624 }; 625 626 let is_verified = row.email_confirmed 627 || row.discord_verified 628 || row.telegram_verified 629 || row.signal_verified; 630 631 if is_verified { 632 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 633 } 634 635 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 636 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 637 638 if let Err(e) = sqlx::query!( 639 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3", 640 verification_code, 641 code_expires_at, 642 input.did 643 ) 644 .execute(&state.db) 645 .await 646 { 647 error!("Failed to update verification code: {:?}", e); 648 return ApiError::InternalError.into_response(); 649 } 650 651 let (channel_str, recipient) = match row.channel { 652 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()), 653 crate::notifications::NotificationChannel::Discord => { 654 ("discord", row.discord_id.unwrap_or_default()) 655 } 656 crate::notifications::NotificationChannel::Telegram => { 657 ("telegram", row.telegram_username.unwrap_or_default()) 658 } 659 crate::notifications::NotificationChannel::Signal => { 660 ("signal", row.signal_number.unwrap_or_default()) 661 } 662 }; 663 664 if let Err(e) = crate::notifications::enqueue_signup_verification( 665 &state.db, 666 row.id, 667 channel_str, 668 &recipient, 669 &verification_code, 670 ).await { 671 warn!("Failed to enqueue verification notification: {:?}", e); 672 } 673 674 Json(json!({"success": true})).into_response() 675}