this repo has no description
1use crate::api::ApiError; 2use crate::auth::BearerAuth; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") { 18 if let Ok(value) = forwarded.to_str() { 19 if let Some(first_ip) = value.split(',').next() { 20 return first_ip.trim().to_string(); 21 } 22 } 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") { 25 if let Ok(value) = real_ip.to_str() { 26 return value.trim().to_string(); 27 } 28 } 29 "unknown".to_string() 30} 31 32#[derive(Deserialize)] 33pub struct CreateSessionInput { 34 pub identifier: String, 35 pub password: String, 36} 37 38#[derive(Serialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateSessionOutput { 41 pub access_jwt: String, 42 pub refresh_jwt: String, 43 pub handle: String, 44 pub did: String, 45} 46 47pub async fn create_session( 48 State(state): State<AppState>, 49 headers: HeaderMap, 50 Json(input): Json<CreateSessionInput>, 51) -> Response { 52 info!("create_session called"); 53 54 let client_ip = extract_client_ip(&headers); 55 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await { 56 warn!(ip = %client_ip, "Login rate limit exceeded"); 57 return ( 58 StatusCode::TOO_MANY_REQUESTS, 59 Json(json!({ 60 "error": "RateLimitExceeded", 61 "message": "Too many login attempts. Please try again later." 62 })), 63 ) 64 .into_response(); 65 } 66 67 let row = match sqlx::query!( 68 r#"SELECT 69 u.id, u.did, u.handle, u.password_hash, 70 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified, 71 k.key_bytes, k.encryption_version 72 FROM users u 73 JOIN user_keys k ON u.id = k.user_id 74 WHERE u.handle = $1 OR u.email = $1"#, 75 input.identifier 76 ) 77 .fetch_optional(&state.db) 78 .await 79 { 80 Ok(Some(row)) => row, 81 Ok(None) => { 82 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK"); 83 warn!("User not found for login attempt"); 84 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 85 } 86 Err(e) => { 87 error!("Database error fetching user: {:?}", e); 88 return ApiError::InternalError.into_response(); 89 } 90 }; 91 92 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 93 Ok(k) => k, 94 Err(e) => { 95 error!("Failed to decrypt user key: {:?}", e); 96 return ApiError::InternalError.into_response(); 97 } 98 }; 99 100 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 101 true 102 } else { 103 let app_passwords = sqlx::query!( 104 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 105 row.id 106 ) 107 .fetch_all(&state.db) 108 .await 109 .unwrap_or_default(); 110 111 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 112 }; 113 114 if !password_valid { 115 warn!("Password verification failed for login attempt"); 116 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 117 } 118 119 let is_verified = row.email_confirmed 120 || row.discord_verified 121 || row.telegram_verified 122 || row.signal_verified; 123 124 if !is_verified { 125 warn!("Login attempt for unverified account: {}", row.did); 126 return ( 127 StatusCode::FORBIDDEN, 128 Json(json!({ 129 "error": "AccountNotVerified", 130 "message": "Please verify your account before logging in", 131 "did": row.did 132 })), 133 ).into_response(); 134 } 135 136 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 137 Ok(m) => m, 138 Err(e) => { 139 error!("Failed to create access token: {:?}", e); 140 return ApiError::InternalError.into_response(); 141 } 142 }; 143 144 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 145 Ok(m) => m, 146 Err(e) => { 147 error!("Failed to create refresh token: {:?}", e); 148 return ApiError::InternalError.into_response(); 149 } 150 }; 151 152 if let Err(e) = sqlx::query!( 153 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 154 row.did, 155 access_meta.jti, 156 refresh_meta.jti, 157 access_meta.expires_at, 158 refresh_meta.expires_at 159 ) 160 .execute(&state.db) 161 .await 162 { 163 error!("Failed to insert session: {:?}", e); 164 return ApiError::InternalError.into_response(); 165 } 166 167 Json(CreateSessionOutput { 168 access_jwt: access_meta.token, 169 refresh_jwt: refresh_meta.token, 170 handle: row.handle, 171 did: row.did, 172 }).into_response() 173} 174 175pub async fn get_session( 176 State(state): State<AppState>, 177 BearerAuth(auth_user): BearerAuth, 178) -> Response { 179 match sqlx::query!("SELECT handle, email FROM users WHERE did = $1", auth_user.did) 180 .fetch_optional(&state.db) 181 .await 182 { 183 Ok(Some(row)) => Json(json!({ 184 "handle": row.handle, 185 "did": auth_user.did, 186 "email": row.email, 187 "didDoc": {} 188 })).into_response(), 189 Ok(None) => ApiError::AuthenticationFailed.into_response(), 190 Err(e) => { 191 error!("Database error in get_session: {:?}", e); 192 ApiError::InternalError.into_response() 193 } 194 } 195} 196 197pub async fn delete_session( 198 State(state): State<AppState>, 199 headers: axum::http::HeaderMap, 200) -> Response { 201 let token = match crate::auth::extract_bearer_token_from_header( 202 headers.get("Authorization").and_then(|h| h.to_str().ok()) 203 ) { 204 Some(t) => t, 205 None => return ApiError::AuthenticationRequired.into_response(), 206 }; 207 208 let jti = match crate::auth::get_jti_from_token(&token) { 209 Ok(jti) => jti, 210 Err(_) => return ApiError::AuthenticationFailed.into_response(), 211 }; 212 213 let did = crate::auth::get_did_from_token(&token).ok(); 214 215 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 216 .execute(&state.db) 217 .await 218 { 219 Ok(res) if res.rows_affected() > 0 => { 220 if let Some(did) = did { 221 let session_cache_key = format!("auth:session:{}:{}", did, jti); 222 let _ = state.cache.delete(&session_cache_key).await; 223 } 224 Json(json!({})).into_response() 225 } 226 Ok(_) => ApiError::AuthenticationFailed.into_response(), 227 Err(e) => { 228 error!("Database error in delete_session: {:?}", e); 229 ApiError::AuthenticationFailed.into_response() 230 } 231 } 232} 233 234pub async fn refresh_session( 235 State(state): State<AppState>, 236 headers: axum::http::HeaderMap, 237) -> Response { 238 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 239 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await { 240 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 241 return ( 242 axum::http::StatusCode::TOO_MANY_REQUESTS, 243 axum::Json(serde_json::json!({ 244 "error": "RateLimitExceeded", 245 "message": "Too many requests. Please try again later." 246 })), 247 ).into_response(); 248 } 249 250 let refresh_token = match crate::auth::extract_bearer_token_from_header( 251 headers.get("Authorization").and_then(|h| h.to_str().ok()) 252 ) { 253 Some(t) => t, 254 None => return ApiError::AuthenticationRequired.into_response(), 255 }; 256 257 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 258 Ok(jti) => jti, 259 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(), 260 }; 261 262 let mut tx = match state.db.begin().await { 263 Ok(tx) => tx, 264 Err(e) => { 265 error!("Failed to begin transaction: {:?}", e); 266 return ApiError::InternalError.into_response(); 267 } 268 }; 269 270 if let Ok(Some(session_id)) = sqlx::query_scalar!( 271 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 272 refresh_jti 273 ) 274 .fetch_optional(&mut *tx) 275 .await 276 { 277 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id); 278 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 279 .execute(&mut *tx) 280 .await; 281 let _ = tx.commit().await; 282 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 283 } 284 285 let session_row = match sqlx::query!( 286 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 287 FROM session_tokens st 288 JOIN users u ON st.did = u.did 289 JOIN user_keys k ON u.id = k.user_id 290 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 291 FOR UPDATE OF st"#, 292 refresh_jti 293 ) 294 .fetch_optional(&mut *tx) 295 .await 296 { 297 Ok(Some(row)) => row, 298 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(), 299 Err(e) => { 300 error!("Database error fetching session: {:?}", e); 301 return ApiError::InternalError.into_response(); 302 } 303 }; 304 305 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 306 Ok(k) => k, 307 Err(e) => { 308 error!("Failed to decrypt user key: {:?}", e); 309 return ApiError::InternalError.into_response(); 310 } 311 }; 312 313 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 314 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 315 } 316 317 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 318 Ok(m) => m, 319 Err(e) => { 320 error!("Failed to create access token: {:?}", e); 321 return ApiError::InternalError.into_response(); 322 } 323 }; 324 325 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 326 Ok(m) => m, 327 Err(e) => { 328 error!("Failed to create refresh token: {:?}", e); 329 return ApiError::InternalError.into_response(); 330 } 331 }; 332 333 match sqlx::query!( 334 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 335 refresh_jti, 336 session_row.id 337 ) 338 .execute(&mut *tx) 339 .await 340 { 341 Ok(result) if result.rows_affected() == 0 => { 342 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 343 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 344 .execute(&mut *tx) 345 .await; 346 let _ = tx.commit().await; 347 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 348 } 349 Err(e) => { 350 error!("Failed to record used refresh token: {:?}", e); 351 return ApiError::InternalError.into_response(); 352 } 353 Ok(_) => {} 354 } 355 356 if let Err(e) = sqlx::query!( 357 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 358 new_access_meta.jti, 359 new_refresh_meta.jti, 360 new_access_meta.expires_at, 361 new_refresh_meta.expires_at, 362 session_row.id 363 ) 364 .execute(&mut *tx) 365 .await 366 { 367 error!("Database error updating session: {:?}", e); 368 return ApiError::InternalError.into_response(); 369 } 370 371 if let Err(e) = tx.commit().await { 372 error!("Failed to commit transaction: {:?}", e); 373 return ApiError::InternalError.into_response(); 374 } 375 376 match sqlx::query!("SELECT handle FROM users WHERE did = $1", session_row.did) 377 .fetch_optional(&state.db) 378 .await 379 { 380 Ok(Some(u)) => Json(json!({ 381 "accessJwt": new_access_meta.token, 382 "refreshJwt": new_refresh_meta.token, 383 "handle": u.handle, 384 "did": session_row.did 385 })).into_response(), 386 Ok(None) => { 387 error!("User not found for existing session: {}", session_row.did); 388 ApiError::InternalError.into_response() 389 } 390 Err(e) => { 391 error!("Database error fetching user: {:?}", e); 392 ApiError::InternalError.into_response() 393 } 394 } 395} 396 397#[derive(Deserialize)] 398#[serde(rename_all = "camelCase")] 399pub struct ConfirmSignupInput { 400 pub did: String, 401 pub verification_code: String, 402} 403 404#[derive(Serialize)] 405#[serde(rename_all = "camelCase")] 406pub struct ConfirmSignupOutput { 407 pub access_jwt: String, 408 pub refresh_jwt: String, 409 pub handle: String, 410 pub did: String, 411} 412 413pub async fn confirm_signup( 414 State(state): State<AppState>, 415 Json(input): Json<ConfirmSignupInput>, 416) -> Response { 417 info!("confirm_signup called for DID: {}", input.did); 418 419 let row = match sqlx::query!( 420 r#"SELECT 421 u.id, u.did, u.handle, 422 u.email_confirmation_code, 423 u.email_confirmation_code_expires_at, 424 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 425 k.key_bytes, k.encryption_version 426 FROM users u 427 JOIN user_keys k ON u.id = k.user_id 428 WHERE u.did = $1"#, 429 input.did 430 ) 431 .fetch_optional(&state.db) 432 .await 433 { 434 Ok(Some(row)) => row, 435 Ok(None) => { 436 warn!("User not found for confirm_signup: {}", input.did); 437 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response(); 438 } 439 Err(e) => { 440 error!("Database error in confirm_signup: {:?}", e); 441 return ApiError::InternalError.into_response(); 442 } 443 }; 444 445 let stored_code = match &row.email_confirmation_code { 446 Some(code) => code, 447 None => { 448 warn!("No verification code found for user: {}", input.did); 449 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 450 } 451 }; 452 453 if stored_code != &input.verification_code { 454 warn!("Invalid verification code for user: {}", input.did); 455 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 456 } 457 458 if let Some(expires_at) = row.email_confirmation_code_expires_at { 459 if expires_at < Utc::now() { 460 warn!("Verification code expired for user: {}", input.did); 461 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 462 } 463 } 464 465 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 466 Ok(k) => k, 467 Err(e) => { 468 error!("Failed to decrypt user key: {:?}", e); 469 return ApiError::InternalError.into_response(); 470 } 471 }; 472 473 let verified_column = match row.channel { 474 crate::notifications::NotificationChannel::Email => "email_confirmed", 475 crate::notifications::NotificationChannel::Discord => "discord_verified", 476 crate::notifications::NotificationChannel::Telegram => "telegram_verified", 477 crate::notifications::NotificationChannel::Signal => "signal_verified", 478 }; 479 480 let update_query = format!( 481 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1", 482 verified_column 483 ); 484 485 if let Err(e) = sqlx::query(&update_query) 486 .bind(&input.did) 487 .execute(&state.db) 488 .await 489 { 490 error!("Failed to update verification status: {:?}", e); 491 return ApiError::InternalError.into_response(); 492 } 493 494 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 495 Ok(m) => m, 496 Err(e) => { 497 error!("Failed to create access token: {:?}", e); 498 return ApiError::InternalError.into_response(); 499 } 500 }; 501 502 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 503 Ok(m) => m, 504 Err(e) => { 505 error!("Failed to create refresh token: {:?}", e); 506 return ApiError::InternalError.into_response(); 507 } 508 }; 509 510 if let Err(e) = sqlx::query!( 511 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 512 row.did, 513 access_meta.jti, 514 refresh_meta.jti, 515 access_meta.expires_at, 516 refresh_meta.expires_at 517 ) 518 .execute(&state.db) 519 .await 520 { 521 error!("Failed to insert session: {:?}", e); 522 return ApiError::InternalError.into_response(); 523 } 524 525 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 526 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await { 527 warn!("Failed to enqueue welcome notification: {:?}", e); 528 } 529 530 Json(ConfirmSignupOutput { 531 access_jwt: access_meta.token, 532 refresh_jwt: refresh_meta.token, 533 handle: row.handle, 534 did: row.did, 535 }).into_response() 536} 537 538#[derive(Deserialize)] 539#[serde(rename_all = "camelCase")] 540pub struct ResendVerificationInput { 541 pub did: String, 542} 543 544pub async fn resend_verification( 545 State(state): State<AppState>, 546 Json(input): Json<ResendVerificationInput>, 547) -> Response { 548 info!("resend_verification called for DID: {}", input.did); 549 550 let row = match sqlx::query!( 551 r#"SELECT 552 id, handle, email, 553 preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 554 discord_id, telegram_username, signal_number, 555 email_confirmed, discord_verified, telegram_verified, signal_verified 556 FROM users 557 WHERE did = $1"#, 558 input.did 559 ) 560 .fetch_optional(&state.db) 561 .await 562 { 563 Ok(Some(row)) => row, 564 Ok(None) => { 565 return ApiError::InvalidRequest("User not found".into()).into_response(); 566 } 567 Err(e) => { 568 error!("Database error in resend_verification: {:?}", e); 569 return ApiError::InternalError.into_response(); 570 } 571 }; 572 573 let is_verified = row.email_confirmed 574 || row.discord_verified 575 || row.telegram_verified 576 || row.signal_verified; 577 578 if is_verified { 579 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 580 } 581 582 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 583 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 584 585 if let Err(e) = sqlx::query!( 586 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3", 587 verification_code, 588 code_expires_at, 589 input.did 590 ) 591 .execute(&state.db) 592 .await 593 { 594 error!("Failed to update verification code: {:?}", e); 595 return ApiError::InternalError.into_response(); 596 } 597 598 let (channel_str, recipient) = match row.channel { 599 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()), 600 crate::notifications::NotificationChannel::Discord => { 601 ("discord", row.discord_id.unwrap_or_default()) 602 } 603 crate::notifications::NotificationChannel::Telegram => { 604 ("telegram", row.telegram_username.unwrap_or_default()) 605 } 606 crate::notifications::NotificationChannel::Signal => { 607 ("signal", row.signal_number.unwrap_or_default()) 608 } 609 }; 610 611 if let Err(e) = crate::notifications::enqueue_signup_verification( 612 &state.db, 613 row.id, 614 channel_str, 615 &recipient, 616 &verification_code, 617 ).await { 618 warn!("Failed to enqueue verification notification: {:?}", e); 619 } 620 621 Json(json!({"success": true})).into_response() 622}