this repo has no description
1use crate::api::ApiError; 2use crate::auth::BearerAuth; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") { 18 if let Ok(value) = forwarded.to_str() { 19 if let Some(first_ip) = value.split(',').next() { 20 return first_ip.trim().to_string(); 21 } 22 } 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") { 25 if let Ok(value) = real_ip.to_str() { 26 return value.trim().to_string(); 27 } 28 } 29 "unknown".to_string() 30} 31 32#[derive(Deserialize)] 33pub struct CreateSessionInput { 34 pub identifier: String, 35 pub password: String, 36} 37 38#[derive(Serialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateSessionOutput { 41 pub access_jwt: String, 42 pub refresh_jwt: String, 43 pub handle: String, 44 pub did: String, 45} 46 47pub async fn create_session( 48 State(state): State<AppState>, 49 headers: HeaderMap, 50 Json(input): Json<CreateSessionInput>, 51) -> Response { 52 info!("create_session called"); 53 let client_ip = extract_client_ip(&headers); 54 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await { 55 warn!(ip = %client_ip, "Login rate limit exceeded"); 56 return ( 57 StatusCode::TOO_MANY_REQUESTS, 58 Json(json!({ 59 "error": "RateLimitExceeded", 60 "message": "Too many login attempts. Please try again later." 61 })), 62 ) 63 .into_response(); 64 } 65 let row = match sqlx::query!( 66 r#"SELECT 67 u.id, u.did, u.handle, u.password_hash, 68 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified, 69 k.key_bytes, k.encryption_version 70 FROM users u 71 JOIN user_keys k ON u.id = k.user_id 72 WHERE u.handle = $1 OR u.email = $1"#, 73 input.identifier 74 ) 75 .fetch_optional(&state.db) 76 .await 77 { 78 Ok(Some(row)) => row, 79 Ok(None) => { 80 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK"); 81 warn!("User not found for login attempt"); 82 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 83 } 84 Err(e) => { 85 error!("Database error fetching user: {:?}", e); 86 return ApiError::InternalError.into_response(); 87 } 88 }; 89 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 90 Ok(k) => k, 91 Err(e) => { 92 error!("Failed to decrypt user key: {:?}", e); 93 return ApiError::InternalError.into_response(); 94 } 95 }; 96 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 97 true 98 } else { 99 let app_passwords = sqlx::query!( 100 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 101 row.id 102 ) 103 .fetch_all(&state.db) 104 .await 105 .unwrap_or_default(); 106 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 107 }; 108 if !password_valid { 109 warn!("Password verification failed for login attempt"); 110 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 111 } 112 let is_verified = row.email_confirmed 113 || row.discord_verified 114 || row.telegram_verified 115 || row.signal_verified; 116 if !is_verified { 117 warn!("Login attempt for unverified account: {}", row.did); 118 return ( 119 StatusCode::FORBIDDEN, 120 Json(json!({ 121 "error": "AccountNotVerified", 122 "message": "Please verify your account before logging in", 123 "did": row.did 124 })), 125 ).into_response(); 126 } 127 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 128 Ok(m) => m, 129 Err(e) => { 130 error!("Failed to create access token: {:?}", e); 131 return ApiError::InternalError.into_response(); 132 } 133 }; 134 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 135 Ok(m) => m, 136 Err(e) => { 137 error!("Failed to create refresh token: {:?}", e); 138 return ApiError::InternalError.into_response(); 139 } 140 }; 141 if let Err(e) = sqlx::query!( 142 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 143 row.did, 144 access_meta.jti, 145 refresh_meta.jti, 146 access_meta.expires_at, 147 refresh_meta.expires_at 148 ) 149 .execute(&state.db) 150 .await 151 { 152 error!("Failed to insert session: {:?}", e); 153 return ApiError::InternalError.into_response(); 154 } 155 Json(CreateSessionOutput { 156 access_jwt: access_meta.token, 157 refresh_jwt: refresh_meta.token, 158 handle: row.handle, 159 did: row.did, 160 }).into_response() 161} 162 163pub async fn get_session( 164 State(state): State<AppState>, 165 BearerAuth(auth_user): BearerAuth, 166) -> Response { 167 match sqlx::query!( 168 r#"SELECT 169 handle, email, email_confirmed, 170 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel", 171 discord_verified, telegram_verified, signal_verified 172 FROM users WHERE did = $1"#, 173 auth_user.did 174 ) 175 .fetch_optional(&state.db) 176 .await 177 { 178 Ok(Some(row)) => { 179 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 180 crate::notifications::NotificationChannel::Email => ("email", row.email_confirmed), 181 crate::notifications::NotificationChannel::Discord => ("discord", row.discord_verified), 182 crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified), 183 crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified), 184 }; 185 Json(json!({ 186 "handle": row.handle, 187 "did": auth_user.did, 188 "email": row.email, 189 "emailConfirmed": row.email_confirmed, 190 "preferredChannel": preferred_channel, 191 "preferredChannelVerified": preferred_channel_verified, 192 "didDoc": {} 193 })).into_response() 194 } 195 Ok(None) => ApiError::AuthenticationFailed.into_response(), 196 Err(e) => { 197 error!("Database error in get_session: {:?}", e); 198 ApiError::InternalError.into_response() 199 } 200 } 201} 202 203pub async fn delete_session( 204 State(state): State<AppState>, 205 headers: axum::http::HeaderMap, 206) -> Response { 207 let token = match crate::auth::extract_bearer_token_from_header( 208 headers.get("Authorization").and_then(|h| h.to_str().ok()) 209 ) { 210 Some(t) => t, 211 None => return ApiError::AuthenticationRequired.into_response(), 212 }; 213 let jti = match crate::auth::get_jti_from_token(&token) { 214 Ok(jti) => jti, 215 Err(_) => return ApiError::AuthenticationFailed.into_response(), 216 }; 217 let did = crate::auth::get_did_from_token(&token).ok(); 218 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 219 .execute(&state.db) 220 .await 221 { 222 Ok(res) if res.rows_affected() > 0 => { 223 if let Some(did) = did { 224 let session_cache_key = format!("auth:session:{}:{}", did, jti); 225 let _ = state.cache.delete(&session_cache_key).await; 226 } 227 Json(json!({})).into_response() 228 } 229 Ok(_) => ApiError::AuthenticationFailed.into_response(), 230 Err(e) => { 231 error!("Database error in delete_session: {:?}", e); 232 ApiError::AuthenticationFailed.into_response() 233 } 234 } 235} 236 237pub async fn refresh_session( 238 State(state): State<AppState>, 239 headers: axum::http::HeaderMap, 240) -> Response { 241 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 242 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await { 243 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 244 return ( 245 axum::http::StatusCode::TOO_MANY_REQUESTS, 246 axum::Json(serde_json::json!({ 247 "error": "RateLimitExceeded", 248 "message": "Too many requests. Please try again later." 249 })), 250 ).into_response(); 251 } 252 let refresh_token = match crate::auth::extract_bearer_token_from_header( 253 headers.get("Authorization").and_then(|h| h.to_str().ok()) 254 ) { 255 Some(t) => t, 256 None => return ApiError::AuthenticationRequired.into_response(), 257 }; 258 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 259 Ok(jti) => jti, 260 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(), 261 }; 262 let mut tx = match state.db.begin().await { 263 Ok(tx) => tx, 264 Err(e) => { 265 error!("Failed to begin transaction: {:?}", e); 266 return ApiError::InternalError.into_response(); 267 } 268 }; 269 if let Ok(Some(session_id)) = sqlx::query_scalar!( 270 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 271 refresh_jti 272 ) 273 .fetch_optional(&mut *tx) 274 .await 275 { 276 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id); 277 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 278 .execute(&mut *tx) 279 .await; 280 let _ = tx.commit().await; 281 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 282 } 283 let session_row = match sqlx::query!( 284 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 285 FROM session_tokens st 286 JOIN users u ON st.did = u.did 287 JOIN user_keys k ON u.id = k.user_id 288 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 289 FOR UPDATE OF st"#, 290 refresh_jti 291 ) 292 .fetch_optional(&mut *tx) 293 .await 294 { 295 Ok(Some(row)) => row, 296 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(), 297 Err(e) => { 298 error!("Database error fetching session: {:?}", e); 299 return ApiError::InternalError.into_response(); 300 } 301 }; 302 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 303 Ok(k) => k, 304 Err(e) => { 305 error!("Failed to decrypt user key: {:?}", e); 306 return ApiError::InternalError.into_response(); 307 } 308 }; 309 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 310 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 311 } 312 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 313 Ok(m) => m, 314 Err(e) => { 315 error!("Failed to create access token: {:?}", e); 316 return ApiError::InternalError.into_response(); 317 } 318 }; 319 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 320 Ok(m) => m, 321 Err(e) => { 322 error!("Failed to create refresh token: {:?}", e); 323 return ApiError::InternalError.into_response(); 324 } 325 }; 326 match sqlx::query!( 327 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 328 refresh_jti, 329 session_row.id 330 ) 331 .execute(&mut *tx) 332 .await 333 { 334 Ok(result) if result.rows_affected() == 0 => { 335 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 336 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 337 .execute(&mut *tx) 338 .await; 339 let _ = tx.commit().await; 340 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 341 } 342 Err(e) => { 343 error!("Failed to record used refresh token: {:?}", e); 344 return ApiError::InternalError.into_response(); 345 } 346 Ok(_) => {} 347 } 348 if let Err(e) = sqlx::query!( 349 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 350 new_access_meta.jti, 351 new_refresh_meta.jti, 352 new_access_meta.expires_at, 353 new_refresh_meta.expires_at, 354 session_row.id 355 ) 356 .execute(&mut *tx) 357 .await 358 { 359 error!("Database error updating session: {:?}", e); 360 return ApiError::InternalError.into_response(); 361 } 362 if let Err(e) = tx.commit().await { 363 error!("Failed to commit transaction: {:?}", e); 364 return ApiError::InternalError.into_response(); 365 } 366 match sqlx::query!( 367 r#"SELECT 368 handle, email, email_confirmed, 369 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel", 370 discord_verified, telegram_verified, signal_verified 371 FROM users WHERE did = $1"#, 372 session_row.did 373 ) 374 .fetch_optional(&state.db) 375 .await 376 { 377 Ok(Some(u)) => { 378 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 379 crate::notifications::NotificationChannel::Email => ("email", u.email_confirmed), 380 crate::notifications::NotificationChannel::Discord => ("discord", u.discord_verified), 381 crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified), 382 crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified), 383 }; 384 Json(json!({ 385 "accessJwt": new_access_meta.token, 386 "refreshJwt": new_refresh_meta.token, 387 "handle": u.handle, 388 "did": session_row.did, 389 "email": u.email, 390 "emailConfirmed": u.email_confirmed, 391 "preferredChannel": preferred_channel, 392 "preferredChannelVerified": preferred_channel_verified 393 })).into_response() 394 } 395 Ok(None) => { 396 error!("User not found for existing session: {}", session_row.did); 397 ApiError::InternalError.into_response() 398 } 399 Err(e) => { 400 error!("Database error fetching user: {:?}", e); 401 ApiError::InternalError.into_response() 402 } 403 } 404} 405 406#[derive(Deserialize)] 407#[serde(rename_all = "camelCase")] 408pub struct ConfirmSignupInput { 409 pub did: String, 410 pub verification_code: String, 411} 412 413#[derive(Serialize)] 414#[serde(rename_all = "camelCase")] 415pub struct ConfirmSignupOutput { 416 pub access_jwt: String, 417 pub refresh_jwt: String, 418 pub handle: String, 419 pub did: String, 420 pub email: Option<String>, 421 pub email_confirmed: bool, 422 pub preferred_channel: String, 423 pub preferred_channel_verified: bool, 424} 425 426pub async fn confirm_signup( 427 State(state): State<AppState>, 428 Json(input): Json<ConfirmSignupInput>, 429) -> Response { 430 info!("confirm_signup called for DID: {}", input.did); 431 let row = match sqlx::query!( 432 r#"SELECT 433 u.id, u.did, u.handle, u.email, 434 u.email_confirmation_code, 435 u.email_confirmation_code_expires_at, 436 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 437 k.key_bytes, k.encryption_version 438 FROM users u 439 JOIN user_keys k ON u.id = k.user_id 440 WHERE u.did = $1"#, 441 input.did 442 ) 443 .fetch_optional(&state.db) 444 .await 445 { 446 Ok(Some(row)) => row, 447 Ok(None) => { 448 warn!("User not found for confirm_signup: {}", input.did); 449 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response(); 450 } 451 Err(e) => { 452 error!("Database error in confirm_signup: {:?}", e); 453 return ApiError::InternalError.into_response(); 454 } 455 }; 456 let stored_code = match &row.email_confirmation_code { 457 Some(code) => code, 458 None => { 459 warn!("No verification code found for user: {}", input.did); 460 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 461 } 462 }; 463 if stored_code != &input.verification_code { 464 warn!("Invalid verification code for user: {}", input.did); 465 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 466 } 467 if let Some(expires_at) = row.email_confirmation_code_expires_at { 468 if expires_at < Utc::now() { 469 warn!("Verification code expired for user: {}", input.did); 470 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 471 } 472 } 473 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 474 Ok(k) => k, 475 Err(e) => { 476 error!("Failed to decrypt user key: {:?}", e); 477 return ApiError::InternalError.into_response(); 478 } 479 }; 480 let verified_column = match row.channel { 481 crate::notifications::NotificationChannel::Email => "email_confirmed", 482 crate::notifications::NotificationChannel::Discord => "discord_verified", 483 crate::notifications::NotificationChannel::Telegram => "telegram_verified", 484 crate::notifications::NotificationChannel::Signal => "signal_verified", 485 }; 486 let update_query = format!( 487 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1", 488 verified_column 489 ); 490 if let Err(e) = sqlx::query(&update_query) 491 .bind(&input.did) 492 .execute(&state.db) 493 .await 494 { 495 error!("Failed to update verification status: {:?}", e); 496 return ApiError::InternalError.into_response(); 497 } 498 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 499 Ok(m) => m, 500 Err(e) => { 501 error!("Failed to create access token: {:?}", e); 502 return ApiError::InternalError.into_response(); 503 } 504 }; 505 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 506 Ok(m) => m, 507 Err(e) => { 508 error!("Failed to create refresh token: {:?}", e); 509 return ApiError::InternalError.into_response(); 510 } 511 }; 512 if let Err(e) = sqlx::query!( 513 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 514 row.did, 515 access_meta.jti, 516 refresh_meta.jti, 517 access_meta.expires_at, 518 refresh_meta.expires_at 519 ) 520 .execute(&state.db) 521 .await 522 { 523 error!("Failed to insert session: {:?}", e); 524 return ApiError::InternalError.into_response(); 525 } 526 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 527 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await { 528 warn!("Failed to enqueue welcome notification: {:?}", e); 529 } 530 let email_confirmed = matches!(row.channel, crate::notifications::NotificationChannel::Email); 531 let preferred_channel = match row.channel { 532 crate::notifications::NotificationChannel::Email => "email", 533 crate::notifications::NotificationChannel::Discord => "discord", 534 crate::notifications::NotificationChannel::Telegram => "telegram", 535 crate::notifications::NotificationChannel::Signal => "signal", 536 }; 537 Json(ConfirmSignupOutput { 538 access_jwt: access_meta.token, 539 refresh_jwt: refresh_meta.token, 540 handle: row.handle, 541 did: row.did, 542 email: row.email, 543 email_confirmed, 544 preferred_channel: preferred_channel.to_string(), 545 preferred_channel_verified: true, 546 }).into_response() 547} 548 549#[derive(Deserialize)] 550#[serde(rename_all = "camelCase")] 551pub struct ResendVerificationInput { 552 pub did: String, 553} 554 555pub async fn resend_verification( 556 State(state): State<AppState>, 557 Json(input): Json<ResendVerificationInput>, 558) -> Response { 559 info!("resend_verification called for DID: {}", input.did); 560 let row = match sqlx::query!( 561 r#"SELECT 562 id, handle, email, 563 preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 564 discord_id, telegram_username, signal_number, 565 email_confirmed, discord_verified, telegram_verified, signal_verified 566 FROM users 567 WHERE did = $1"#, 568 input.did 569 ) 570 .fetch_optional(&state.db) 571 .await 572 { 573 Ok(Some(row)) => row, 574 Ok(None) => { 575 return ApiError::InvalidRequest("User not found".into()).into_response(); 576 } 577 Err(e) => { 578 error!("Database error in resend_verification: {:?}", e); 579 return ApiError::InternalError.into_response(); 580 } 581 }; 582 let is_verified = row.email_confirmed 583 || row.discord_verified 584 || row.telegram_verified 585 || row.signal_verified; 586 if is_verified { 587 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 588 } 589 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 590 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 591 if let Err(e) = sqlx::query!( 592 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3", 593 verification_code, 594 code_expires_at, 595 input.did 596 ) 597 .execute(&state.db) 598 .await 599 { 600 error!("Failed to update verification code: {:?}", e); 601 return ApiError::InternalError.into_response(); 602 } 603 let (channel_str, recipient) = match row.channel { 604 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()), 605 crate::notifications::NotificationChannel::Discord => { 606 ("discord", row.discord_id.unwrap_or_default()) 607 } 608 crate::notifications::NotificationChannel::Telegram => { 609 ("telegram", row.telegram_username.unwrap_or_default()) 610 } 611 crate::notifications::NotificationChannel::Signal => { 612 ("signal", row.signal_number.unwrap_or_default()) 613 } 614 }; 615 if let Err(e) = crate::notifications::enqueue_signup_verification( 616 &state.db, 617 row.id, 618 channel_str, 619 &recipient, 620 &verification_code, 621 ).await { 622 warn!("Failed to enqueue verification notification: {:?}", e); 623 } 624 Json(json!({"success": true})).into_response() 625}