this repo has no description
1use crate::api::ApiError; 2use crate::auth::BearerAuth; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15fn extract_client_ip(headers: &HeaderMap) -> String { 16 if let Some(forwarded) = headers.get("x-forwarded-for") { 17 if let Ok(value) = forwarded.to_str() { 18 if let Some(first_ip) = value.split(',').next() { 19 return first_ip.trim().to_string(); 20 } 21 } 22 } 23 if let Some(real_ip) = headers.get("x-real-ip") { 24 if let Ok(value) = real_ip.to_str() { 25 return value.trim().to_string(); 26 } 27 } 28 "unknown".to_string() 29} 30#[derive(Deserialize)] 31pub struct CreateSessionInput { 32 pub identifier: String, 33 pub password: String, 34} 35#[derive(Serialize)] 36#[serde(rename_all = "camelCase")] 37pub struct CreateSessionOutput { 38 pub access_jwt: String, 39 pub refresh_jwt: String, 40 pub handle: String, 41 pub did: String, 42} 43pub async fn create_session( 44 State(state): State<AppState>, 45 headers: HeaderMap, 46 Json(input): Json<CreateSessionInput>, 47) -> Response { 48 info!("create_session called"); 49 let client_ip = extract_client_ip(&headers); 50 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await { 51 warn!(ip = %client_ip, "Login rate limit exceeded"); 52 return ( 53 StatusCode::TOO_MANY_REQUESTS, 54 Json(json!({ 55 "error": "RateLimitExceeded", 56 "message": "Too many login attempts. Please try again later." 57 })), 58 ) 59 .into_response(); 60 } 61 let row = match sqlx::query!( 62 r#"SELECT 63 u.id, u.did, u.handle, u.password_hash, 64 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified, 65 k.key_bytes, k.encryption_version 66 FROM users u 67 JOIN user_keys k ON u.id = k.user_id 68 WHERE u.handle = $1 OR u.email = $1"#, 69 input.identifier 70 ) 71 .fetch_optional(&state.db) 72 .await 73 { 74 Ok(Some(row)) => row, 75 Ok(None) => { 76 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK"); 77 warn!("User not found for login attempt"); 78 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 79 } 80 Err(e) => { 81 error!("Database error fetching user: {:?}", e); 82 return ApiError::InternalError.into_response(); 83 } 84 }; 85 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 86 Ok(k) => k, 87 Err(e) => { 88 error!("Failed to decrypt user key: {:?}", e); 89 return ApiError::InternalError.into_response(); 90 } 91 }; 92 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 93 true 94 } else { 95 let app_passwords = sqlx::query!( 96 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 97 row.id 98 ) 99 .fetch_all(&state.db) 100 .await 101 .unwrap_or_default(); 102 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 103 }; 104 if !password_valid { 105 warn!("Password verification failed for login attempt"); 106 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response(); 107 } 108 let is_verified = row.email_confirmed 109 || row.discord_verified 110 || row.telegram_verified 111 || row.signal_verified; 112 if !is_verified { 113 warn!("Login attempt for unverified account: {}", row.did); 114 return ( 115 StatusCode::FORBIDDEN, 116 Json(json!({ 117 "error": "AccountNotVerified", 118 "message": "Please verify your account before logging in", 119 "did": row.did 120 })), 121 ).into_response(); 122 } 123 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 124 Ok(m) => m, 125 Err(e) => { 126 error!("Failed to create access token: {:?}", e); 127 return ApiError::InternalError.into_response(); 128 } 129 }; 130 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 131 Ok(m) => m, 132 Err(e) => { 133 error!("Failed to create refresh token: {:?}", e); 134 return ApiError::InternalError.into_response(); 135 } 136 }; 137 if let Err(e) = sqlx::query!( 138 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 139 row.did, 140 access_meta.jti, 141 refresh_meta.jti, 142 access_meta.expires_at, 143 refresh_meta.expires_at 144 ) 145 .execute(&state.db) 146 .await 147 { 148 error!("Failed to insert session: {:?}", e); 149 return ApiError::InternalError.into_response(); 150 } 151 Json(CreateSessionOutput { 152 access_jwt: access_meta.token, 153 refresh_jwt: refresh_meta.token, 154 handle: row.handle, 155 did: row.did, 156 }).into_response() 157} 158pub async fn get_session( 159 State(state): State<AppState>, 160 BearerAuth(auth_user): BearerAuth, 161) -> Response { 162 match sqlx::query!( 163 r#"SELECT 164 handle, email, email_confirmed, 165 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel", 166 discord_verified, telegram_verified, signal_verified 167 FROM users WHERE did = $1"#, 168 auth_user.did 169 ) 170 .fetch_optional(&state.db) 171 .await 172 { 173 Ok(Some(row)) => { 174 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 175 crate::notifications::NotificationChannel::Email => ("email", row.email_confirmed), 176 crate::notifications::NotificationChannel::Discord => ("discord", row.discord_verified), 177 crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified), 178 crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified), 179 }; 180 Json(json!({ 181 "handle": row.handle, 182 "did": auth_user.did, 183 "email": row.email, 184 "emailConfirmed": row.email_confirmed, 185 "preferredChannel": preferred_channel, 186 "preferredChannelVerified": preferred_channel_verified, 187 "didDoc": {} 188 })).into_response() 189 } 190 Ok(None) => ApiError::AuthenticationFailed.into_response(), 191 Err(e) => { 192 error!("Database error in get_session: {:?}", e); 193 ApiError::InternalError.into_response() 194 } 195 } 196} 197pub async fn delete_session( 198 State(state): State<AppState>, 199 headers: axum::http::HeaderMap, 200) -> Response { 201 let token = match crate::auth::extract_bearer_token_from_header( 202 headers.get("Authorization").and_then(|h| h.to_str().ok()) 203 ) { 204 Some(t) => t, 205 None => return ApiError::AuthenticationRequired.into_response(), 206 }; 207 let jti = match crate::auth::get_jti_from_token(&token) { 208 Ok(jti) => jti, 209 Err(_) => return ApiError::AuthenticationFailed.into_response(), 210 }; 211 let did = crate::auth::get_did_from_token(&token).ok(); 212 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 213 .execute(&state.db) 214 .await 215 { 216 Ok(res) if res.rows_affected() > 0 => { 217 if let Some(did) = did { 218 let session_cache_key = format!("auth:session:{}:{}", did, jti); 219 let _ = state.cache.delete(&session_cache_key).await; 220 } 221 Json(json!({})).into_response() 222 } 223 Ok(_) => ApiError::AuthenticationFailed.into_response(), 224 Err(e) => { 225 error!("Database error in delete_session: {:?}", e); 226 ApiError::AuthenticationFailed.into_response() 227 } 228 } 229} 230pub async fn refresh_session( 231 State(state): State<AppState>, 232 headers: axum::http::HeaderMap, 233) -> Response { 234 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 235 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await { 236 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 237 return ( 238 axum::http::StatusCode::TOO_MANY_REQUESTS, 239 axum::Json(serde_json::json!({ 240 "error": "RateLimitExceeded", 241 "message": "Too many requests. Please try again later." 242 })), 243 ).into_response(); 244 } 245 let refresh_token = match crate::auth::extract_bearer_token_from_header( 246 headers.get("Authorization").and_then(|h| h.to_str().ok()) 247 ) { 248 Some(t) => t, 249 None => return ApiError::AuthenticationRequired.into_response(), 250 }; 251 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 252 Ok(jti) => jti, 253 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(), 254 }; 255 let mut tx = match state.db.begin().await { 256 Ok(tx) => tx, 257 Err(e) => { 258 error!("Failed to begin transaction: {:?}", e); 259 return ApiError::InternalError.into_response(); 260 } 261 }; 262 if let Ok(Some(session_id)) = sqlx::query_scalar!( 263 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 264 refresh_jti 265 ) 266 .fetch_optional(&mut *tx) 267 .await 268 { 269 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id); 270 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 271 .execute(&mut *tx) 272 .await; 273 let _ = tx.commit().await; 274 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 275 } 276 let session_row = match sqlx::query!( 277 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 278 FROM session_tokens st 279 JOIN users u ON st.did = u.did 280 JOIN user_keys k ON u.id = k.user_id 281 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 282 FOR UPDATE OF st"#, 283 refresh_jti 284 ) 285 .fetch_optional(&mut *tx) 286 .await 287 { 288 Ok(Some(row)) => row, 289 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(), 290 Err(e) => { 291 error!("Database error fetching session: {:?}", e); 292 return ApiError::InternalError.into_response(); 293 } 294 }; 295 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 296 Ok(k) => k, 297 Err(e) => { 298 error!("Failed to decrypt user key: {:?}", e); 299 return ApiError::InternalError.into_response(); 300 } 301 }; 302 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 303 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 304 } 305 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 306 Ok(m) => m, 307 Err(e) => { 308 error!("Failed to create access token: {:?}", e); 309 return ApiError::InternalError.into_response(); 310 } 311 }; 312 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 313 Ok(m) => m, 314 Err(e) => { 315 error!("Failed to create refresh token: {:?}", e); 316 return ApiError::InternalError.into_response(); 317 } 318 }; 319 match sqlx::query!( 320 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 321 refresh_jti, 322 session_row.id 323 ) 324 .execute(&mut *tx) 325 .await 326 { 327 Ok(result) if result.rows_affected() == 0 => { 328 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 329 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 330 .execute(&mut *tx) 331 .await; 332 let _ = tx.commit().await; 333 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 334 } 335 Err(e) => { 336 error!("Failed to record used refresh token: {:?}", e); 337 return ApiError::InternalError.into_response(); 338 } 339 Ok(_) => {} 340 } 341 if let Err(e) = sqlx::query!( 342 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 343 new_access_meta.jti, 344 new_refresh_meta.jti, 345 new_access_meta.expires_at, 346 new_refresh_meta.expires_at, 347 session_row.id 348 ) 349 .execute(&mut *tx) 350 .await 351 { 352 error!("Database error updating session: {:?}", e); 353 return ApiError::InternalError.into_response(); 354 } 355 if let Err(e) = tx.commit().await { 356 error!("Failed to commit transaction: {:?}", e); 357 return ApiError::InternalError.into_response(); 358 } 359 match sqlx::query!( 360 r#"SELECT 361 handle, email, email_confirmed, 362 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel", 363 discord_verified, telegram_verified, signal_verified 364 FROM users WHERE did = $1"#, 365 session_row.did 366 ) 367 .fetch_optional(&state.db) 368 .await 369 { 370 Ok(Some(u)) => { 371 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 372 crate::notifications::NotificationChannel::Email => ("email", u.email_confirmed), 373 crate::notifications::NotificationChannel::Discord => ("discord", u.discord_verified), 374 crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified), 375 crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified), 376 }; 377 Json(json!({ 378 "accessJwt": new_access_meta.token, 379 "refreshJwt": new_refresh_meta.token, 380 "handle": u.handle, 381 "did": session_row.did, 382 "email": u.email, 383 "emailConfirmed": u.email_confirmed, 384 "preferredChannel": preferred_channel, 385 "preferredChannelVerified": preferred_channel_verified 386 })).into_response() 387 } 388 Ok(None) => { 389 error!("User not found for existing session: {}", session_row.did); 390 ApiError::InternalError.into_response() 391 } 392 Err(e) => { 393 error!("Database error fetching user: {:?}", e); 394 ApiError::InternalError.into_response() 395 } 396 } 397} 398#[derive(Deserialize)] 399#[serde(rename_all = "camelCase")] 400pub struct ConfirmSignupInput { 401 pub did: String, 402 pub verification_code: String, 403} 404#[derive(Serialize)] 405#[serde(rename_all = "camelCase")] 406pub struct ConfirmSignupOutput { 407 pub access_jwt: String, 408 pub refresh_jwt: String, 409 pub handle: String, 410 pub did: String, 411 pub email: Option<String>, 412 pub email_confirmed: bool, 413 pub preferred_channel: String, 414 pub preferred_channel_verified: bool, 415} 416pub async fn confirm_signup( 417 State(state): State<AppState>, 418 Json(input): Json<ConfirmSignupInput>, 419) -> Response { 420 info!("confirm_signup called for DID: {}", input.did); 421 let row = match sqlx::query!( 422 r#"SELECT 423 u.id, u.did, u.handle, u.email, 424 u.email_confirmation_code, 425 u.email_confirmation_code_expires_at, 426 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 427 k.key_bytes, k.encryption_version 428 FROM users u 429 JOIN user_keys k ON u.id = k.user_id 430 WHERE u.did = $1"#, 431 input.did 432 ) 433 .fetch_optional(&state.db) 434 .await 435 { 436 Ok(Some(row)) => row, 437 Ok(None) => { 438 warn!("User not found for confirm_signup: {}", input.did); 439 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response(); 440 } 441 Err(e) => { 442 error!("Database error in confirm_signup: {:?}", e); 443 return ApiError::InternalError.into_response(); 444 } 445 }; 446 let stored_code = match &row.email_confirmation_code { 447 Some(code) => code, 448 None => { 449 warn!("No verification code found for user: {}", input.did); 450 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 451 } 452 }; 453 if stored_code != &input.verification_code { 454 warn!("Invalid verification code for user: {}", input.did); 455 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 456 } 457 if let Some(expires_at) = row.email_confirmation_code_expires_at { 458 if expires_at < Utc::now() { 459 warn!("Verification code expired for user: {}", input.did); 460 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 461 } 462 } 463 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 464 Ok(k) => k, 465 Err(e) => { 466 error!("Failed to decrypt user key: {:?}", e); 467 return ApiError::InternalError.into_response(); 468 } 469 }; 470 let verified_column = match row.channel { 471 crate::notifications::NotificationChannel::Email => "email_confirmed", 472 crate::notifications::NotificationChannel::Discord => "discord_verified", 473 crate::notifications::NotificationChannel::Telegram => "telegram_verified", 474 crate::notifications::NotificationChannel::Signal => "signal_verified", 475 }; 476 let update_query = format!( 477 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1", 478 verified_column 479 ); 480 if let Err(e) = sqlx::query(&update_query) 481 .bind(&input.did) 482 .execute(&state.db) 483 .await 484 { 485 error!("Failed to update verification status: {:?}", e); 486 return ApiError::InternalError.into_response(); 487 } 488 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 489 Ok(m) => m, 490 Err(e) => { 491 error!("Failed to create access token: {:?}", e); 492 return ApiError::InternalError.into_response(); 493 } 494 }; 495 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 496 Ok(m) => m, 497 Err(e) => { 498 error!("Failed to create refresh token: {:?}", e); 499 return ApiError::InternalError.into_response(); 500 } 501 }; 502 if let Err(e) = sqlx::query!( 503 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 504 row.did, 505 access_meta.jti, 506 refresh_meta.jti, 507 access_meta.expires_at, 508 refresh_meta.expires_at 509 ) 510 .execute(&state.db) 511 .await 512 { 513 error!("Failed to insert session: {:?}", e); 514 return ApiError::InternalError.into_response(); 515 } 516 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 517 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await { 518 warn!("Failed to enqueue welcome notification: {:?}", e); 519 } 520 let email_confirmed = matches!(row.channel, crate::notifications::NotificationChannel::Email); 521 let preferred_channel = match row.channel { 522 crate::notifications::NotificationChannel::Email => "email", 523 crate::notifications::NotificationChannel::Discord => "discord", 524 crate::notifications::NotificationChannel::Telegram => "telegram", 525 crate::notifications::NotificationChannel::Signal => "signal", 526 }; 527 Json(ConfirmSignupOutput { 528 access_jwt: access_meta.token, 529 refresh_jwt: refresh_meta.token, 530 handle: row.handle, 531 did: row.did, 532 email: row.email, 533 email_confirmed, 534 preferred_channel: preferred_channel.to_string(), 535 preferred_channel_verified: true, 536 }).into_response() 537} 538#[derive(Deserialize)] 539#[serde(rename_all = "camelCase")] 540pub struct ResendVerificationInput { 541 pub did: String, 542} 543pub async fn resend_verification( 544 State(state): State<AppState>, 545 Json(input): Json<ResendVerificationInput>, 546) -> Response { 547 info!("resend_verification called for DID: {}", input.did); 548 let row = match sqlx::query!( 549 r#"SELECT 550 id, handle, email, 551 preferred_notification_channel as "channel: crate::notifications::NotificationChannel", 552 discord_id, telegram_username, signal_number, 553 email_confirmed, discord_verified, telegram_verified, signal_verified 554 FROM users 555 WHERE did = $1"#, 556 input.did 557 ) 558 .fetch_optional(&state.db) 559 .await 560 { 561 Ok(Some(row)) => row, 562 Ok(None) => { 563 return ApiError::InvalidRequest("User not found".into()).into_response(); 564 } 565 Err(e) => { 566 error!("Database error in resend_verification: {:?}", e); 567 return ApiError::InternalError.into_response(); 568 } 569 }; 570 let is_verified = row.email_confirmed 571 || row.discord_verified 572 || row.telegram_verified 573 || row.signal_verified; 574 if is_verified { 575 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 576 } 577 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 578 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 579 if let Err(e) = sqlx::query!( 580 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3", 581 verification_code, 582 code_expires_at, 583 input.did 584 ) 585 .execute(&state.db) 586 .await 587 { 588 error!("Failed to update verification code: {:?}", e); 589 return ApiError::InternalError.into_response(); 590 } 591 let (channel_str, recipient) = match row.channel { 592 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()), 593 crate::notifications::NotificationChannel::Discord => { 594 ("discord", row.discord_id.unwrap_or_default()) 595 } 596 crate::notifications::NotificationChannel::Telegram => { 597 ("telegram", row.telegram_username.unwrap_or_default()) 598 } 599 crate::notifications::NotificationChannel::Signal => { 600 ("signal", row.signal_number.unwrap_or_default()) 601 } 602 }; 603 if let Err(e) = crate::notifications::enqueue_signup_verification( 604 &state.db, 605 row.id, 606 channel_str, 607 &recipient, 608 &verification_code, 609 ).await { 610 warn!("Failed to enqueue verification notification: {:?}", e); 611 } 612 Json(json!({"success": true})).into_response() 613}