this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14 15fn extract_client_ip(headers: &HeaderMap) -> String { 16 if let Some(forwarded) = headers.get("x-forwarded-for") 17 && let Ok(value) = forwarded.to_str() 18 && let Some(first_ip) = value.split(',').next() 19 { 20 return first_ip.trim().to_string(); 21 } 22 if let Some(real_ip) = headers.get("x-real-ip") 23 && let Ok(value) = real_ip.to_str() 24 { 25 return value.trim().to_string(); 26 } 27 "unknown".to_string() 28} 29 30fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 31 let identifier = identifier.trim(); 32 if identifier.contains('@') || identifier.starts_with("did:") { 33 identifier.to_string() 34 } else if !identifier.contains('.') { 35 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 36 } else { 37 identifier.to_lowercase() 38 } 39} 40 41fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 42 stored_handle.to_string() 43} 44 45#[derive(Deserialize)] 46pub struct CreateSessionInput { 47 pub identifier: String, 48 pub password: String, 49} 50 51#[derive(Serialize)] 52#[serde(rename_all = "camelCase")] 53pub struct CreateSessionOutput { 54 pub access_jwt: String, 55 pub refresh_jwt: String, 56 pub handle: String, 57 pub did: String, 58} 59 60pub async fn create_session( 61 State(state): State<AppState>, 62 headers: HeaderMap, 63 Json(input): Json<CreateSessionInput>, 64) -> Response { 65 info!( 66 "create_session called with identifier: {}", 67 input.identifier 68 ); 69 let client_ip = extract_client_ip(&headers); 70 if !state 71 .check_rate_limit(RateLimitKind::Login, &client_ip) 72 .await 73 { 74 warn!(ip = %client_ip, "Login rate limit exceeded"); 75 return ( 76 StatusCode::TOO_MANY_REQUESTS, 77 Json(json!({ 78 "error": "RateLimitExceeded", 79 "message": "Too many login attempts. Please try again later." 80 })), 81 ) 82 .into_response(); 83 } 84 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 85 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 86 info!( 87 "Normalized identifier: {} -> {}", 88 input.identifier, normalized_identifier 89 ); 90 let row = match sqlx::query!( 91 r#"SELECT 92 u.id, u.did, u.handle, u.password_hash, 93 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 94 u.allow_legacy_login, 95 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 96 k.key_bytes, k.encryption_version, 97 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 98 FROM users u 99 JOIN user_keys k ON u.id = k.user_id 100 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 101 normalized_identifier 102 ) 103 .fetch_optional(&state.db) 104 .await 105 { 106 Ok(Some(row)) => row, 107 Ok(None) => { 108 let _ = verify( 109 &input.password, 110 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 111 ); 112 warn!("User not found for login attempt"); 113 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 114 .into_response(); 115 } 116 Err(e) => { 117 error!("Database error fetching user: {:?}", e); 118 return ApiError::InternalError.into_response(); 119 } 120 }; 121 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 122 Ok(k) => k, 123 Err(e) => { 124 error!("Failed to decrypt user key: {:?}", e); 125 return ApiError::InternalError.into_response(); 126 } 127 }; 128 let password_valid = if row 129 .password_hash 130 .as_ref() 131 .map(|h| verify(&input.password, h).unwrap_or(false)) 132 .unwrap_or(false) 133 { 134 true 135 } else { 136 let app_passwords = sqlx::query!( 137 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 138 row.id 139 ) 140 .fetch_all(&state.db) 141 .await 142 .unwrap_or_default(); 143 app_passwords 144 .iter() 145 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 146 }; 147 if !password_valid { 148 warn!("Password verification failed for login attempt"); 149 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 150 .into_response(); 151 } 152 let is_verified = 153 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 154 if !is_verified { 155 warn!("Login attempt for unverified account: {}", row.did); 156 return ( 157 StatusCode::FORBIDDEN, 158 Json(json!({ 159 "error": "AccountNotVerified", 160 "message": "Please verify your account before logging in", 161 "did": row.did 162 })), 163 ) 164 .into_response(); 165 } 166 let has_totp = row.totp_enabled.unwrap_or(false); 167 let is_legacy_login = has_totp; 168 if has_totp && !row.allow_legacy_login { 169 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 170 return ( 171 StatusCode::FORBIDDEN, 172 Json(json!({ 173 "error": "MfaRequired", 174 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 175 "did": row.did 176 })), 177 ) 178 .into_response(); 179 } 180 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 181 Ok(m) => m, 182 Err(e) => { 183 error!("Failed to create access token: {:?}", e); 184 return ApiError::InternalError.into_response(); 185 } 186 }; 187 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 188 Ok(m) => m, 189 Err(e) => { 190 error!("Failed to create refresh token: {:?}", e); 191 return ApiError::InternalError.into_response(); 192 } 193 }; 194 if let Err(e) = sqlx::query!( 195 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)", 196 row.did, 197 access_meta.jti, 198 refresh_meta.jti, 199 access_meta.expires_at, 200 refresh_meta.expires_at, 201 is_legacy_login, 202 false 203 ) 204 .execute(&state.db) 205 .await 206 { 207 error!("Failed to insert session: {:?}", e); 208 return ApiError::InternalError.into_response(); 209 } 210 if is_legacy_login { 211 warn!( 212 did = %row.did, 213 ip = %client_ip, 214 "Legacy login on TOTP-enabled account - sending notification" 215 ); 216 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 217 if let Err(e) = crate::comms::queue_legacy_login_notification( 218 &state.db, 219 row.id, 220 &hostname, 221 &client_ip, 222 row.preferred_comms_channel, 223 ) 224 .await 225 { 226 error!("Failed to queue legacy login notification: {:?}", e); 227 } 228 } 229 let handle = full_handle(&row.handle, &pds_hostname); 230 Json(CreateSessionOutput { 231 access_jwt: access_meta.token, 232 refresh_jwt: refresh_meta.token, 233 handle, 234 did: row.did, 235 }) 236 .into_response() 237} 238 239pub async fn get_session( 240 State(state): State<AppState>, 241 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 242) -> Response { 243 let permissions = auth_user.permissions(); 244 let can_read_email = permissions.allows_email_read(); 245 246 match sqlx::query!( 247 r#"SELECT 248 handle, email, email_verified, is_admin, deactivated_at, preferred_locale, 249 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 250 discord_verified, telegram_verified, signal_verified 251 FROM users WHERE did = $1"#, 252 auth_user.did 253 ) 254 .fetch_optional(&state.db) 255 .await 256 { 257 Ok(Some(row)) => { 258 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 259 crate::comms::CommsChannel::Email => ("email", row.email_verified), 260 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 261 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 262 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 263 }; 264 let pds_hostname = 265 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 266 let handle = full_handle(&row.handle, &pds_hostname); 267 let is_active = row.deactivated_at.is_none(); 268 let email_value = if can_read_email { 269 row.email.clone() 270 } else { 271 None 272 }; 273 let email_verified_value = can_read_email && row.email_verified; 274 Json(json!({ 275 "handle": handle, 276 "did": auth_user.did, 277 "email": email_value, 278 "emailVerified": email_verified_value, 279 "preferredChannel": preferred_channel, 280 "preferredChannelVerified": preferred_channel_verified, 281 "preferredLocale": row.preferred_locale, 282 "isAdmin": row.is_admin, 283 "active": is_active, 284 "status": if is_active { "active" } else { "deactivated" }, 285 "didDoc": {} 286 })) 287 .into_response() 288 } 289 Ok(None) => ApiError::AuthenticationFailed.into_response(), 290 Err(e) => { 291 error!("Database error in get_session: {:?}", e); 292 ApiError::InternalError.into_response() 293 } 294 } 295} 296 297pub async fn delete_session( 298 State(state): State<AppState>, 299 headers: axum::http::HeaderMap, 300) -> Response { 301 let token = match crate::auth::extract_bearer_token_from_header( 302 headers.get("Authorization").and_then(|h| h.to_str().ok()), 303 ) { 304 Some(t) => t, 305 None => return ApiError::AuthenticationRequired.into_response(), 306 }; 307 let jti = match crate::auth::get_jti_from_token(&token) { 308 Ok(jti) => jti, 309 Err(_) => return ApiError::AuthenticationFailed.into_response(), 310 }; 311 let did = crate::auth::get_did_from_token(&token).ok(); 312 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 313 .execute(&state.db) 314 .await 315 { 316 Ok(res) if res.rows_affected() > 0 => { 317 if let Some(did) = did { 318 let session_cache_key = format!("auth:session:{}:{}", did, jti); 319 let _ = state.cache.delete(&session_cache_key).await; 320 } 321 Json(json!({})).into_response() 322 } 323 Ok(_) => ApiError::AuthenticationFailed.into_response(), 324 Err(e) => { 325 error!("Database error in delete_session: {:?}", e); 326 ApiError::AuthenticationFailed.into_response() 327 } 328 } 329} 330 331pub async fn refresh_session( 332 State(state): State<AppState>, 333 headers: axum::http::HeaderMap, 334) -> Response { 335 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 336 if !state 337 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 338 .await 339 { 340 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 341 return ( 342 axum::http::StatusCode::TOO_MANY_REQUESTS, 343 axum::Json(serde_json::json!({ 344 "error": "RateLimitExceeded", 345 "message": "Too many requests. Please try again later." 346 })), 347 ) 348 .into_response(); 349 } 350 let refresh_token = match crate::auth::extract_bearer_token_from_header( 351 headers.get("Authorization").and_then(|h| h.to_str().ok()), 352 ) { 353 Some(t) => t, 354 None => return ApiError::AuthenticationRequired.into_response(), 355 }; 356 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 357 Ok(jti) => jti, 358 Err(_) => { 359 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 360 .into_response(); 361 } 362 }; 363 let mut tx = match state.db.begin().await { 364 Ok(tx) => tx, 365 Err(e) => { 366 error!("Failed to begin transaction: {:?}", e); 367 return ApiError::InternalError.into_response(); 368 } 369 }; 370 if let Ok(Some(session_id)) = sqlx::query_scalar!( 371 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 372 refresh_jti 373 ) 374 .fetch_optional(&mut *tx) 375 .await 376 { 377 warn!( 378 "Refresh token reuse detected! Revoking token family for session_id: {}", 379 session_id 380 ); 381 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 382 .execute(&mut *tx) 383 .await; 384 let _ = tx.commit().await; 385 return ApiError::ExpiredTokenMsg( 386 "Refresh token has been revoked due to suspected compromise".into(), 387 ) 388 .into_response(); 389 } 390 let session_row = match sqlx::query!( 391 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 392 FROM session_tokens st 393 JOIN users u ON st.did = u.did 394 JOIN user_keys k ON u.id = k.user_id 395 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 396 FOR UPDATE OF st"#, 397 refresh_jti 398 ) 399 .fetch_optional(&mut *tx) 400 .await 401 { 402 Ok(Some(row)) => row, 403 Ok(None) => { 404 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 405 .into_response(); 406 } 407 Err(e) => { 408 error!("Database error fetching session: {:?}", e); 409 return ApiError::InternalError.into_response(); 410 } 411 }; 412 let key_bytes = 413 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 414 Ok(k) => k, 415 Err(e) => { 416 error!("Failed to decrypt user key: {:?}", e); 417 return ApiError::InternalError.into_response(); 418 } 419 }; 420 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 421 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 422 } 423 let new_access_meta = 424 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 425 Ok(m) => m, 426 Err(e) => { 427 error!("Failed to create access token: {:?}", e); 428 return ApiError::InternalError.into_response(); 429 } 430 }; 431 let new_refresh_meta = 432 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 433 Ok(m) => m, 434 Err(e) => { 435 error!("Failed to create refresh token: {:?}", e); 436 return ApiError::InternalError.into_response(); 437 } 438 }; 439 match sqlx::query!( 440 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 441 refresh_jti, 442 session_row.id 443 ) 444 .execute(&mut *tx) 445 .await 446 { 447 Ok(result) if result.rows_affected() == 0 => { 448 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 449 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 450 .execute(&mut *tx) 451 .await; 452 let _ = tx.commit().await; 453 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 454 } 455 Err(e) => { 456 error!("Failed to record used refresh token: {:?}", e); 457 return ApiError::InternalError.into_response(); 458 } 459 Ok(_) => {} 460 } 461 if let Err(e) = sqlx::query!( 462 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 463 new_access_meta.jti, 464 new_refresh_meta.jti, 465 new_access_meta.expires_at, 466 new_refresh_meta.expires_at, 467 session_row.id 468 ) 469 .execute(&mut *tx) 470 .await 471 { 472 error!("Database error updating session: {:?}", e); 473 return ApiError::InternalError.into_response(); 474 } 475 if let Err(e) = tx.commit().await { 476 error!("Failed to commit transaction: {:?}", e); 477 return ApiError::InternalError.into_response(); 478 } 479 match sqlx::query!( 480 r#"SELECT 481 handle, email, email_verified, is_admin, preferred_locale, 482 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 483 discord_verified, telegram_verified, signal_verified 484 FROM users WHERE did = $1"#, 485 session_row.did 486 ) 487 .fetch_optional(&state.db) 488 .await 489 { 490 Ok(Some(u)) => { 491 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 492 crate::comms::CommsChannel::Email => ("email", u.email_verified), 493 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 494 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 495 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 496 }; 497 let pds_hostname = 498 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 499 let handle = full_handle(&u.handle, &pds_hostname); 500 Json(json!({ 501 "accessJwt": new_access_meta.token, 502 "refreshJwt": new_refresh_meta.token, 503 "handle": handle, 504 "did": session_row.did, 505 "email": u.email, 506 "emailVerified": u.email_verified, 507 "preferredChannel": preferred_channel, 508 "preferredChannelVerified": preferred_channel_verified, 509 "preferredLocale": u.preferred_locale, 510 "isAdmin": u.is_admin, 511 "active": true 512 })) 513 .into_response() 514 } 515 Ok(None) => { 516 error!("User not found for existing session: {}", session_row.did); 517 ApiError::InternalError.into_response() 518 } 519 Err(e) => { 520 error!("Database error fetching user: {:?}", e); 521 ApiError::InternalError.into_response() 522 } 523 } 524} 525 526#[derive(Deserialize)] 527#[serde(rename_all = "camelCase")] 528pub struct ConfirmSignupInput { 529 pub did: String, 530 pub verification_code: String, 531} 532 533#[derive(Serialize)] 534#[serde(rename_all = "camelCase")] 535pub struct ConfirmSignupOutput { 536 pub access_jwt: String, 537 pub refresh_jwt: String, 538 pub handle: String, 539 pub did: String, 540 pub email: Option<String>, 541 pub email_verified: bool, 542 pub preferred_channel: String, 543 pub preferred_channel_verified: bool, 544} 545 546pub async fn confirm_signup( 547 State(state): State<AppState>, 548 Json(input): Json<ConfirmSignupInput>, 549) -> Response { 550 info!("confirm_signup called for DID: {}", input.did); 551 let row = match sqlx::query!( 552 r#"SELECT 553 u.id, u.did, u.handle, u.email, 554 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 555 u.discord_id, u.telegram_username, u.signal_number, 556 k.key_bytes, k.encryption_version 557 FROM users u 558 JOIN user_keys k ON u.id = k.user_id 559 WHERE u.did = $1"#, 560 input.did 561 ) 562 .fetch_optional(&state.db) 563 .await 564 { 565 Ok(Some(row)) => row, 566 Ok(None) => { 567 warn!("User not found for confirm_signup: {}", input.did); 568 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 569 .into_response(); 570 } 571 Err(e) => { 572 error!("Database error in confirm_signup: {:?}", e); 573 return ApiError::InternalError.into_response(); 574 } 575 }; 576 577 let (channel_str, identifier) = match row.channel { 578 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 579 crate::comms::CommsChannel::Discord => { 580 ("discord", row.discord_id.clone().unwrap_or_default()) 581 } 582 crate::comms::CommsChannel::Telegram => ( 583 "telegram", 584 row.telegram_username.clone().unwrap_or_default(), 585 ), 586 crate::comms::CommsChannel::Signal => { 587 ("signal", row.signal_number.clone().unwrap_or_default()) 588 } 589 }; 590 591 let normalized_token = 592 crate::auth::verification_token::normalize_token_input(&input.verification_code); 593 match crate::auth::verification_token::verify_signup_token( 594 &normalized_token, 595 channel_str, 596 &identifier, 597 ) { 598 Ok(token_data) => { 599 if token_data.did != input.did { 600 warn!( 601 "Token DID mismatch for confirm_signup: expected {}, got {}", 602 input.did, token_data.did 603 ); 604 return ApiError::InvalidRequest("Invalid verification code".into()) 605 .into_response(); 606 } 607 } 608 Err(crate::auth::verification_token::VerifyError::Expired) => { 609 warn!("Verification code expired for user: {}", input.did); 610 return ApiError::ExpiredTokenMsg("Verification code has expired".into()) 611 .into_response(); 612 } 613 Err(e) => { 614 warn!("Invalid verification code for user {}: {:?}", input.did, e); 615 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 616 } 617 } 618 619 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 620 Ok(k) => k, 621 Err(e) => { 622 error!("Failed to decrypt user key: {:?}", e); 623 return ApiError::InternalError.into_response(); 624 } 625 }; 626 let verified_column = match row.channel { 627 crate::comms::CommsChannel::Email => "email_verified", 628 crate::comms::CommsChannel::Discord => "discord_verified", 629 crate::comms::CommsChannel::Telegram => "telegram_verified", 630 crate::comms::CommsChannel::Signal => "signal_verified", 631 }; 632 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 633 if let Err(e) = sqlx::query(&update_query) 634 .bind(&input.did) 635 .execute(&state.db) 636 .await 637 { 638 error!("Failed to update verification status: {:?}", e); 639 return ApiError::InternalError.into_response(); 640 } 641 642 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 643 Ok(m) => m, 644 Err(e) => { 645 error!("Failed to create access token: {:?}", e); 646 return ApiError::InternalError.into_response(); 647 } 648 }; 649 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 650 Ok(m) => m, 651 Err(e) => { 652 error!("Failed to create refresh token: {:?}", e); 653 return ApiError::InternalError.into_response(); 654 } 655 }; 656 if let Err(e) = sqlx::query!( 657 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)", 658 row.did, 659 access_meta.jti, 660 refresh_meta.jti, 661 access_meta.expires_at, 662 refresh_meta.expires_at, 663 false, 664 false 665 ) 666 .execute(&state.db) 667 .await 668 { 669 error!("Failed to insert session: {:?}", e); 670 return ApiError::InternalError.into_response(); 671 } 672 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 673 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 674 warn!("Failed to enqueue welcome notification: {:?}", e); 675 } 676 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 677 let preferred_channel = match row.channel { 678 crate::comms::CommsChannel::Email => "email", 679 crate::comms::CommsChannel::Discord => "discord", 680 crate::comms::CommsChannel::Telegram => "telegram", 681 crate::comms::CommsChannel::Signal => "signal", 682 }; 683 Json(ConfirmSignupOutput { 684 access_jwt: access_meta.token, 685 refresh_jwt: refresh_meta.token, 686 handle: row.handle, 687 did: row.did, 688 email: row.email, 689 email_verified, 690 preferred_channel: preferred_channel.to_string(), 691 preferred_channel_verified: true, 692 }) 693 .into_response() 694} 695 696#[derive(Deserialize)] 697#[serde(rename_all = "camelCase")] 698pub struct ResendVerificationInput { 699 pub did: String, 700} 701 702pub async fn resend_verification( 703 State(state): State<AppState>, 704 Json(input): Json<ResendVerificationInput>, 705) -> Response { 706 info!("resend_verification called for DID: {}", input.did); 707 let row = match sqlx::query!( 708 r#"SELECT 709 id, handle, email, 710 preferred_comms_channel as "channel: crate::comms::CommsChannel", 711 discord_id, telegram_username, signal_number, 712 email_verified, discord_verified, telegram_verified, signal_verified 713 FROM users 714 WHERE did = $1"#, 715 input.did 716 ) 717 .fetch_optional(&state.db) 718 .await 719 { 720 Ok(Some(row)) => row, 721 Ok(None) => { 722 return ApiError::InvalidRequest("User not found".into()).into_response(); 723 } 724 Err(e) => { 725 error!("Database error in resend_verification: {:?}", e); 726 return ApiError::InternalError.into_response(); 727 } 728 }; 729 let is_verified = 730 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 731 if is_verified { 732 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 733 } 734 735 let (channel_str, recipient) = match row.channel { 736 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 737 crate::comms::CommsChannel::Discord => { 738 ("discord", row.discord_id.clone().unwrap_or_default()) 739 } 740 crate::comms::CommsChannel::Telegram => ( 741 "telegram", 742 row.telegram_username.clone().unwrap_or_default(), 743 ), 744 crate::comms::CommsChannel::Signal => { 745 ("signal", row.signal_number.clone().unwrap_or_default()) 746 } 747 }; 748 749 let verification_token = 750 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 751 let formatted_token = 752 crate::auth::verification_token::format_token_for_display(&verification_token); 753 754 if let Err(e) = crate::comms::enqueue_signup_verification( 755 &state.db, 756 row.id, 757 channel_str, 758 &recipient, 759 &formatted_token, 760 None, 761 ) 762 .await 763 { 764 warn!("Failed to enqueue verification notification: {:?}", e); 765 } 766 Json(json!({"success": true})).into_response() 767} 768 769#[derive(Serialize)] 770#[serde(rename_all = "camelCase")] 771pub struct SessionInfo { 772 pub id: String, 773 pub session_type: String, 774 pub client_name: Option<String>, 775 pub created_at: String, 776 pub expires_at: String, 777 pub is_current: bool, 778} 779 780#[derive(Serialize)] 781#[serde(rename_all = "camelCase")] 782pub struct ListSessionsOutput { 783 pub sessions: Vec<SessionInfo>, 784} 785 786pub async fn list_sessions( 787 State(state): State<AppState>, 788 headers: HeaderMap, 789 auth: BearerAuth, 790) -> Response { 791 let current_jti = headers 792 .get("authorization") 793 .and_then(|v| v.to_str().ok()) 794 .and_then(|v| v.strip_prefix("Bearer ")) 795 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 796 797 let mut sessions: Vec<SessionInfo> = Vec::new(); 798 799 let jwt_result = sqlx::query_as::< 800 _, 801 ( 802 i32, 803 String, 804 chrono::DateTime<chrono::Utc>, 805 chrono::DateTime<chrono::Utc>, 806 ), 807 >( 808 r#" 809 SELECT id, access_jti, created_at, refresh_expires_at 810 FROM session_tokens 811 WHERE did = $1 AND refresh_expires_at > NOW() 812 ORDER BY created_at DESC 813 "#, 814 ) 815 .bind(&auth.0.did) 816 .fetch_all(&state.db) 817 .await; 818 819 match jwt_result { 820 Ok(rows) => { 821 for (id, access_jti, created_at, expires_at) in rows { 822 sessions.push(SessionInfo { 823 id: format!("jwt:{}", id), 824 session_type: "legacy".to_string(), 825 client_name: None, 826 created_at: created_at.to_rfc3339(), 827 expires_at: expires_at.to_rfc3339(), 828 is_current: current_jti.as_ref() == Some(&access_jti), 829 }); 830 } 831 } 832 Err(e) => { 833 error!("DB error fetching JWT sessions: {:?}", e); 834 return ( 835 StatusCode::INTERNAL_SERVER_ERROR, 836 Json(json!({"error": "InternalError"})), 837 ) 838 .into_response(); 839 } 840 } 841 842 let oauth_result = sqlx::query_as::< 843 _, 844 ( 845 i32, 846 String, 847 chrono::DateTime<chrono::Utc>, 848 chrono::DateTime<chrono::Utc>, 849 String, 850 ), 851 >( 852 r#" 853 SELECT id, token_id, created_at, expires_at, client_id 854 FROM oauth_token 855 WHERE did = $1 AND expires_at > NOW() 856 ORDER BY created_at DESC 857 "#, 858 ) 859 .bind(&auth.0.did) 860 .fetch_all(&state.db) 861 .await; 862 863 match oauth_result { 864 Ok(rows) => { 865 for (id, token_id, created_at, expires_at, client_id) in rows { 866 let client_name = extract_client_name(&client_id); 867 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 868 sessions.push(SessionInfo { 869 id: format!("oauth:{}", id), 870 session_type: "oauth".to_string(), 871 client_name: Some(client_name), 872 created_at: created_at.to_rfc3339(), 873 expires_at: expires_at.to_rfc3339(), 874 is_current: is_current_oauth, 875 }); 876 } 877 } 878 Err(e) => { 879 error!("DB error fetching OAuth sessions: {:?}", e); 880 return ( 881 StatusCode::INTERNAL_SERVER_ERROR, 882 Json(json!({"error": "InternalError"})), 883 ) 884 .into_response(); 885 } 886 } 887 888 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 889 890 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 891} 892 893fn extract_client_name(client_id: &str) -> String { 894 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 895 "Localhost App".to_string() 896 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 897 parsed.host_str().unwrap_or("Unknown App").to_string() 898 } else { 899 client_id.to_string() 900 } 901} 902 903#[derive(Deserialize)] 904#[serde(rename_all = "camelCase")] 905pub struct RevokeSessionInput { 906 pub session_id: String, 907} 908 909pub async fn revoke_session( 910 State(state): State<AppState>, 911 auth: BearerAuth, 912 Json(input): Json<RevokeSessionInput>, 913) -> Response { 914 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 915 let session_id: i32 = match jwt_id.parse() { 916 Ok(id) => id, 917 Err(_) => { 918 return ( 919 StatusCode::BAD_REQUEST, 920 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 921 ) 922 .into_response(); 923 } 924 }; 925 let session = sqlx::query_as::<_, (String,)>( 926 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 927 ) 928 .bind(session_id) 929 .bind(&auth.0.did) 930 .fetch_optional(&state.db) 931 .await; 932 let access_jti = match session { 933 Ok(Some((jti,))) => jti, 934 Ok(None) => { 935 return ( 936 StatusCode::NOT_FOUND, 937 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 938 ) 939 .into_response(); 940 } 941 Err(e) => { 942 error!("DB error in revoke_session: {:?}", e); 943 return ( 944 StatusCode::INTERNAL_SERVER_ERROR, 945 Json(json!({"error": "InternalError"})), 946 ) 947 .into_response(); 948 } 949 }; 950 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 951 .bind(session_id) 952 .execute(&state.db) 953 .await 954 { 955 error!("DB error deleting session: {:?}", e); 956 return ( 957 StatusCode::INTERNAL_SERVER_ERROR, 958 Json(json!({"error": "InternalError"})), 959 ) 960 .into_response(); 961 } 962 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 963 if let Err(e) = state.cache.delete(&cache_key).await { 964 warn!("Failed to invalidate session cache: {:?}", e); 965 } 966 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked"); 967 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 968 let session_id: i32 = match oauth_id.parse() { 969 Ok(id) => id, 970 Err(_) => { 971 return ( 972 StatusCode::BAD_REQUEST, 973 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 974 ) 975 .into_response(); 976 } 977 }; 978 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 979 .bind(session_id) 980 .bind(&auth.0.did) 981 .execute(&state.db) 982 .await; 983 match result { 984 Ok(r) if r.rows_affected() == 0 => { 985 return ( 986 StatusCode::NOT_FOUND, 987 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 988 ) 989 .into_response(); 990 } 991 Err(e) => { 992 error!("DB error deleting OAuth session: {:?}", e); 993 return ( 994 StatusCode::INTERNAL_SERVER_ERROR, 995 Json(json!({"error": "InternalError"})), 996 ) 997 .into_response(); 998 } 999 _ => {} 1000 } 1001 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked"); 1002 } else { 1003 return ( 1004 StatusCode::BAD_REQUEST, 1005 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})), 1006 ) 1007 .into_response(); 1008 } 1009 (StatusCode::OK, Json(json!({}))).into_response() 1010} 1011 1012pub async fn revoke_all_sessions( 1013 State(state): State<AppState>, 1014 headers: HeaderMap, 1015 auth: BearerAuth, 1016) -> Response { 1017 let current_jti = headers 1018 .get("authorization") 1019 .and_then(|v| v.to_str().ok()) 1020 .and_then(|v| v.strip_prefix("Bearer ")) 1021 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1022 1023 if let Some(ref jti) = current_jti { 1024 if auth.0.is_oauth { 1025 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1026 .bind(&auth.0.did) 1027 .execute(&state.db) 1028 .await 1029 { 1030 error!("DB error revoking JWT sessions: {:?}", e); 1031 return ( 1032 StatusCode::INTERNAL_SERVER_ERROR, 1033 Json(json!({"error": "InternalError"})), 1034 ) 1035 .into_response(); 1036 } 1037 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1038 .bind(&auth.0.did) 1039 .bind(jti) 1040 .execute(&state.db) 1041 .await 1042 { 1043 error!("DB error revoking OAuth sessions: {:?}", e); 1044 return ( 1045 StatusCode::INTERNAL_SERVER_ERROR, 1046 Json(json!({"error": "InternalError"})), 1047 ) 1048 .into_response(); 1049 } 1050 } else { 1051 if let Err(e) = 1052 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1053 .bind(&auth.0.did) 1054 .bind(jti) 1055 .execute(&state.db) 1056 .await 1057 { 1058 error!("DB error revoking JWT sessions: {:?}", e); 1059 return ( 1060 StatusCode::INTERNAL_SERVER_ERROR, 1061 Json(json!({"error": "InternalError"})), 1062 ) 1063 .into_response(); 1064 } 1065 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1066 .bind(&auth.0.did) 1067 .execute(&state.db) 1068 .await 1069 { 1070 error!("DB error revoking OAuth sessions: {:?}", e); 1071 return ( 1072 StatusCode::INTERNAL_SERVER_ERROR, 1073 Json(json!({"error": "InternalError"})), 1074 ) 1075 .into_response(); 1076 } 1077 } 1078 } else { 1079 return ( 1080 StatusCode::BAD_REQUEST, 1081 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})), 1082 ) 1083 .into_response(); 1084 } 1085 1086 info!(did = %auth.0.did, "All other sessions revoked"); 1087 (StatusCode::OK, Json(json!({"success": true}))).into_response() 1088} 1089 1090#[derive(Serialize)] 1091#[serde(rename_all = "camelCase")] 1092pub struct LegacyLoginPreferenceOutput { 1093 pub allow_legacy_login: bool, 1094 pub has_mfa: bool, 1095} 1096 1097pub async fn get_legacy_login_preference( 1098 State(state): State<AppState>, 1099 auth: BearerAuth, 1100) -> Response { 1101 let result = sqlx::query!( 1102 r#"SELECT 1103 u.allow_legacy_login, 1104 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1105 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1106 FROM users u WHERE u.did = $1"#, 1107 auth.0.did 1108 ) 1109 .fetch_optional(&state.db) 1110 .await; 1111 1112 match result { 1113 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1114 allow_legacy_login: row.allow_legacy_login, 1115 has_mfa: row.has_mfa, 1116 }) 1117 .into_response(), 1118 Ok(None) => ( 1119 StatusCode::NOT_FOUND, 1120 Json(json!({"error": "AccountNotFound"})), 1121 ) 1122 .into_response(), 1123 Err(e) => { 1124 error!("DB error: {:?}", e); 1125 ( 1126 StatusCode::INTERNAL_SERVER_ERROR, 1127 Json(json!({"error": "InternalError"})), 1128 ) 1129 .into_response() 1130 } 1131 } 1132} 1133 1134#[derive(Deserialize)] 1135#[serde(rename_all = "camelCase")] 1136pub struct UpdateLegacyLoginInput { 1137 pub allow_legacy_login: bool, 1138} 1139 1140pub async fn update_legacy_login_preference( 1141 State(state): State<AppState>, 1142 auth: BearerAuth, 1143 Json(input): Json<UpdateLegacyLoginInput>, 1144) -> Response { 1145 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1146 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1147 .await; 1148 } 1149 1150 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1151 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1152 } 1153 1154 let result = sqlx::query!( 1155 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1156 input.allow_legacy_login, 1157 auth.0.did 1158 ) 1159 .fetch_optional(&state.db) 1160 .await; 1161 1162 match result { 1163 Ok(Some(_)) => { 1164 info!( 1165 did = %auth.0.did, 1166 allow_legacy_login = input.allow_legacy_login, 1167 "Legacy login preference updated" 1168 ); 1169 Json(json!({ 1170 "allowLegacyLogin": input.allow_legacy_login 1171 })) 1172 .into_response() 1173 } 1174 Ok(None) => ( 1175 StatusCode::NOT_FOUND, 1176 Json(json!({"error": "AccountNotFound"})), 1177 ) 1178 .into_response(), 1179 Err(e) => { 1180 error!("DB error: {:?}", e); 1181 ( 1182 StatusCode::INTERNAL_SERVER_ERROR, 1183 Json(json!({"error": "InternalError"})), 1184 ) 1185 .into_response() 1186 } 1187 } 1188} 1189 1190use crate::comms::locale::VALID_LOCALES; 1191 1192#[derive(Deserialize)] 1193#[serde(rename_all = "camelCase")] 1194pub struct UpdateLocaleInput { 1195 pub preferred_locale: String, 1196} 1197 1198pub async fn update_locale( 1199 State(state): State<AppState>, 1200 auth: BearerAuth, 1201 Json(input): Json<UpdateLocaleInput>, 1202) -> Response { 1203 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1204 return ( 1205 StatusCode::BAD_REQUEST, 1206 Json(json!({ 1207 "error": "InvalidRequest", 1208 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1209 })), 1210 ) 1211 .into_response(); 1212 } 1213 1214 let result = sqlx::query!( 1215 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1216 input.preferred_locale, 1217 auth.0.did 1218 ) 1219 .fetch_optional(&state.db) 1220 .await; 1221 1222 match result { 1223 Ok(Some(_)) => { 1224 info!( 1225 did = %auth.0.did, 1226 locale = %input.preferred_locale, 1227 "User locale preference updated" 1228 ); 1229 Json(json!({ 1230 "preferredLocale": input.preferred_locale 1231 })) 1232 .into_response() 1233 } 1234 Ok(None) => ( 1235 StatusCode::NOT_FOUND, 1236 Json(json!({"error": "AccountNotFound"})), 1237 ) 1238 .into_response(), 1239 Err(e) => { 1240 error!("DB error updating locale: {:?}", e); 1241 ( 1242 StatusCode::INTERNAL_SERVER_ERROR, 1243 Json(json!({"error": "InternalError"})), 1244 ) 1245 .into_response() 1246 } 1247 } 1248}