this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") 18 && let Ok(value) = forwarded.to_str() 19 && let Some(first_ip) = value.split(',').next() 20 { 21 return first_ip.trim().to_string(); 22 } 23 if let Some(real_ip) = headers.get("x-real-ip") 24 && let Ok(value) = real_ip.to_str() 25 { 26 return value.trim().to_string(); 27 } 28 "unknown".to_string() 29} 30 31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 32 let identifier = identifier.trim(); 33 if identifier.contains('@') || identifier.starts_with("did:") { 34 identifier.to_string() 35 } else if !identifier.contains('.') { 36 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 37 } else { 38 identifier.to_lowercase() 39 } 40} 41 42fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 43 stored_handle.to_string() 44} 45 46#[derive(Deserialize)] 47pub struct CreateSessionInput { 48 pub identifier: String, 49 pub password: String, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateSessionOutput { 55 pub access_jwt: String, 56 pub refresh_jwt: String, 57 pub handle: String, 58 pub did: String, 59} 60 61pub async fn create_session( 62 State(state): State<AppState>, 63 headers: HeaderMap, 64 Json(input): Json<CreateSessionInput>, 65) -> Response { 66 info!( 67 "create_session called with identifier: {}", 68 input.identifier 69 ); 70 let client_ip = extract_client_ip(&headers); 71 if !state 72 .check_rate_limit(RateLimitKind::Login, &client_ip) 73 .await 74 { 75 warn!(ip = %client_ip, "Login rate limit exceeded"); 76 return ( 77 StatusCode::TOO_MANY_REQUESTS, 78 Json(json!({ 79 "error": "RateLimitExceeded", 80 "message": "Too many login attempts. Please try again later." 81 })), 82 ) 83 .into_response(); 84 } 85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 87 info!( 88 "Normalized identifier: {} -> {}", 89 input.identifier, normalized_identifier 90 ); 91 let row = match sqlx::query!( 92 r#"SELECT 93 u.id, u.did, u.handle, u.password_hash, 94 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 95 u.allow_legacy_login, 96 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 97 k.key_bytes, k.encryption_version, 98 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 99 FROM users u 100 JOIN user_keys k ON u.id = k.user_id 101 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 102 normalized_identifier 103 ) 104 .fetch_optional(&state.db) 105 .await 106 { 107 Ok(Some(row)) => row, 108 Ok(None) => { 109 let _ = verify( 110 &input.password, 111 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 112 ); 113 warn!("User not found for login attempt"); 114 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 115 .into_response(); 116 } 117 Err(e) => { 118 error!("Database error fetching user: {:?}", e); 119 return ApiError::InternalError.into_response(); 120 } 121 }; 122 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 123 Ok(k) => k, 124 Err(e) => { 125 error!("Failed to decrypt user key: {:?}", e); 126 return ApiError::InternalError.into_response(); 127 } 128 }; 129 let password_valid = if row 130 .password_hash 131 .as_ref() 132 .map(|h| verify(&input.password, h).unwrap_or(false)) 133 .unwrap_or(false) 134 { 135 true 136 } else { 137 let app_passwords = sqlx::query!( 138 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 139 row.id 140 ) 141 .fetch_all(&state.db) 142 .await 143 .unwrap_or_default(); 144 app_passwords 145 .iter() 146 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 147 }; 148 if !password_valid { 149 warn!("Password verification failed for login attempt"); 150 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 151 .into_response(); 152 } 153 let is_verified = 154 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 155 if !is_verified { 156 warn!("Login attempt for unverified account: {}", row.did); 157 return ( 158 StatusCode::FORBIDDEN, 159 Json(json!({ 160 "error": "AccountNotVerified", 161 "message": "Please verify your account before logging in", 162 "did": row.did 163 })), 164 ) 165 .into_response(); 166 } 167 let has_totp = row.totp_enabled.unwrap_or(false); 168 let is_legacy_login = has_totp; 169 if has_totp && !row.allow_legacy_login { 170 warn!( 171 "Legacy login blocked for TOTP-enabled account: {}", 172 row.did 173 ); 174 return ( 175 StatusCode::FORBIDDEN, 176 Json(json!({ 177 "error": "MfaRequired", 178 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 179 "did": row.did 180 })), 181 ) 182 .into_response(); 183 } 184 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 185 Ok(m) => m, 186 Err(e) => { 187 error!("Failed to create access token: {:?}", e); 188 return ApiError::InternalError.into_response(); 189 } 190 }; 191 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 192 Ok(m) => m, 193 Err(e) => { 194 error!("Failed to create refresh token: {:?}", e); 195 return ApiError::InternalError.into_response(); 196 } 197 }; 198 if let Err(e) = sqlx::query!( 199 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)", 200 row.did, 201 access_meta.jti, 202 refresh_meta.jti, 203 access_meta.expires_at, 204 refresh_meta.expires_at, 205 is_legacy_login, 206 false 207 ) 208 .execute(&state.db) 209 .await 210 { 211 error!("Failed to insert session: {:?}", e); 212 return ApiError::InternalError.into_response(); 213 } 214 if is_legacy_login { 215 warn!( 216 did = %row.did, 217 ip = %client_ip, 218 "Legacy login on TOTP-enabled account - sending notification" 219 ); 220 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 221 if let Err(e) = crate::comms::queue_legacy_login_notification( 222 &state.db, 223 row.id, 224 &hostname, 225 &client_ip, 226 row.preferred_comms_channel, 227 ) 228 .await 229 { 230 error!("Failed to queue legacy login notification: {:?}", e); 231 } 232 } 233 let handle = full_handle(&row.handle, &pds_hostname); 234 Json(CreateSessionOutput { 235 access_jwt: access_meta.token, 236 refresh_jwt: refresh_meta.token, 237 handle, 238 did: row.did, 239 }) 240 .into_response() 241} 242 243pub async fn get_session( 244 State(state): State<AppState>, 245 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 246) -> Response { 247 let permissions = auth_user.permissions(); 248 let can_read_email = permissions.allows_email_read(); 249 250 match sqlx::query!( 251 r#"SELECT 252 handle, email, email_verified, is_admin, deactivated_at, preferred_locale, 253 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 254 discord_verified, telegram_verified, signal_verified 255 FROM users WHERE did = $1"#, 256 auth_user.did 257 ) 258 .fetch_optional(&state.db) 259 .await 260 { 261 Ok(Some(row)) => { 262 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 263 crate::comms::CommsChannel::Email => ("email", row.email_verified), 264 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 265 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 266 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 267 }; 268 let pds_hostname = 269 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 270 let handle = full_handle(&row.handle, &pds_hostname); 271 let is_active = row.deactivated_at.is_none(); 272 let email_value = if can_read_email { 273 row.email.clone() 274 } else { 275 None 276 }; 277 let email_verified_value = can_read_email && row.email_verified; 278 Json(json!({ 279 "handle": handle, 280 "did": auth_user.did, 281 "email": email_value, 282 "emailVerified": email_verified_value, 283 "preferredChannel": preferred_channel, 284 "preferredChannelVerified": preferred_channel_verified, 285 "preferredLocale": row.preferred_locale, 286 "isAdmin": row.is_admin, 287 "active": is_active, 288 "status": if is_active { "active" } else { "deactivated" }, 289 "didDoc": {} 290 })) 291 .into_response() 292 } 293 Ok(None) => ApiError::AuthenticationFailed.into_response(), 294 Err(e) => { 295 error!("Database error in get_session: {:?}", e); 296 ApiError::InternalError.into_response() 297 } 298 } 299} 300 301pub async fn delete_session( 302 State(state): State<AppState>, 303 headers: axum::http::HeaderMap, 304) -> Response { 305 let token = match crate::auth::extract_bearer_token_from_header( 306 headers.get("Authorization").and_then(|h| h.to_str().ok()), 307 ) { 308 Some(t) => t, 309 None => return ApiError::AuthenticationRequired.into_response(), 310 }; 311 let jti = match crate::auth::get_jti_from_token(&token) { 312 Ok(jti) => jti, 313 Err(_) => return ApiError::AuthenticationFailed.into_response(), 314 }; 315 let did = crate::auth::get_did_from_token(&token).ok(); 316 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 317 .execute(&state.db) 318 .await 319 { 320 Ok(res) if res.rows_affected() > 0 => { 321 if let Some(did) = did { 322 let session_cache_key = format!("auth:session:{}:{}", did, jti); 323 let _ = state.cache.delete(&session_cache_key).await; 324 } 325 Json(json!({})).into_response() 326 } 327 Ok(_) => ApiError::AuthenticationFailed.into_response(), 328 Err(e) => { 329 error!("Database error in delete_session: {:?}", e); 330 ApiError::AuthenticationFailed.into_response() 331 } 332 } 333} 334 335pub async fn refresh_session( 336 State(state): State<AppState>, 337 headers: axum::http::HeaderMap, 338) -> Response { 339 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 340 if !state 341 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 342 .await 343 { 344 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 345 return ( 346 axum::http::StatusCode::TOO_MANY_REQUESTS, 347 axum::Json(serde_json::json!({ 348 "error": "RateLimitExceeded", 349 "message": "Too many requests. Please try again later." 350 })), 351 ) 352 .into_response(); 353 } 354 let refresh_token = match crate::auth::extract_bearer_token_from_header( 355 headers.get("Authorization").and_then(|h| h.to_str().ok()), 356 ) { 357 Some(t) => t, 358 None => return ApiError::AuthenticationRequired.into_response(), 359 }; 360 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 361 Ok(jti) => jti, 362 Err(_) => { 363 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 364 .into_response(); 365 } 366 }; 367 let mut tx = match state.db.begin().await { 368 Ok(tx) => tx, 369 Err(e) => { 370 error!("Failed to begin transaction: {:?}", e); 371 return ApiError::InternalError.into_response(); 372 } 373 }; 374 if let Ok(Some(session_id)) = sqlx::query_scalar!( 375 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 376 refresh_jti 377 ) 378 .fetch_optional(&mut *tx) 379 .await 380 { 381 warn!( 382 "Refresh token reuse detected! Revoking token family for session_id: {}", 383 session_id 384 ); 385 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 386 .execute(&mut *tx) 387 .await; 388 let _ = tx.commit().await; 389 return ApiError::ExpiredTokenMsg( 390 "Refresh token has been revoked due to suspected compromise".into(), 391 ) 392 .into_response(); 393 } 394 let session_row = match sqlx::query!( 395 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 396 FROM session_tokens st 397 JOIN users u ON st.did = u.did 398 JOIN user_keys k ON u.id = k.user_id 399 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 400 FOR UPDATE OF st"#, 401 refresh_jti 402 ) 403 .fetch_optional(&mut *tx) 404 .await 405 { 406 Ok(Some(row)) => row, 407 Ok(None) => { 408 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 409 .into_response(); 410 } 411 Err(e) => { 412 error!("Database error fetching session: {:?}", e); 413 return ApiError::InternalError.into_response(); 414 } 415 }; 416 let key_bytes = 417 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 418 Ok(k) => k, 419 Err(e) => { 420 error!("Failed to decrypt user key: {:?}", e); 421 return ApiError::InternalError.into_response(); 422 } 423 }; 424 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 425 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 426 } 427 let new_access_meta = 428 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 429 Ok(m) => m, 430 Err(e) => { 431 error!("Failed to create access token: {:?}", e); 432 return ApiError::InternalError.into_response(); 433 } 434 }; 435 let new_refresh_meta = 436 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 437 Ok(m) => m, 438 Err(e) => { 439 error!("Failed to create refresh token: {:?}", e); 440 return ApiError::InternalError.into_response(); 441 } 442 }; 443 match sqlx::query!( 444 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 445 refresh_jti, 446 session_row.id 447 ) 448 .execute(&mut *tx) 449 .await 450 { 451 Ok(result) if result.rows_affected() == 0 => { 452 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 453 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 454 .execute(&mut *tx) 455 .await; 456 let _ = tx.commit().await; 457 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 458 } 459 Err(e) => { 460 error!("Failed to record used refresh token: {:?}", e); 461 return ApiError::InternalError.into_response(); 462 } 463 Ok(_) => {} 464 } 465 if let Err(e) = sqlx::query!( 466 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 467 new_access_meta.jti, 468 new_refresh_meta.jti, 469 new_access_meta.expires_at, 470 new_refresh_meta.expires_at, 471 session_row.id 472 ) 473 .execute(&mut *tx) 474 .await 475 { 476 error!("Database error updating session: {:?}", e); 477 return ApiError::InternalError.into_response(); 478 } 479 if let Err(e) = tx.commit().await { 480 error!("Failed to commit transaction: {:?}", e); 481 return ApiError::InternalError.into_response(); 482 } 483 match sqlx::query!( 484 r#"SELECT 485 handle, email, email_verified, is_admin, preferred_locale, 486 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 487 discord_verified, telegram_verified, signal_verified 488 FROM users WHERE did = $1"#, 489 session_row.did 490 ) 491 .fetch_optional(&state.db) 492 .await 493 { 494 Ok(Some(u)) => { 495 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 496 crate::comms::CommsChannel::Email => ("email", u.email_verified), 497 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 498 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 499 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 500 }; 501 let pds_hostname = 502 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 503 let handle = full_handle(&u.handle, &pds_hostname); 504 Json(json!({ 505 "accessJwt": new_access_meta.token, 506 "refreshJwt": new_refresh_meta.token, 507 "handle": handle, 508 "did": session_row.did, 509 "email": u.email, 510 "emailVerified": u.email_verified, 511 "preferredChannel": preferred_channel, 512 "preferredChannelVerified": preferred_channel_verified, 513 "preferredLocale": u.preferred_locale, 514 "isAdmin": u.is_admin, 515 "active": true 516 })) 517 .into_response() 518 } 519 Ok(None) => { 520 error!("User not found for existing session: {}", session_row.did); 521 ApiError::InternalError.into_response() 522 } 523 Err(e) => { 524 error!("Database error fetching user: {:?}", e); 525 ApiError::InternalError.into_response() 526 } 527 } 528} 529 530#[derive(Deserialize)] 531#[serde(rename_all = "camelCase")] 532pub struct ConfirmSignupInput { 533 pub did: String, 534 pub verification_code: String, 535} 536 537#[derive(Serialize)] 538#[serde(rename_all = "camelCase")] 539pub struct ConfirmSignupOutput { 540 pub access_jwt: String, 541 pub refresh_jwt: String, 542 pub handle: String, 543 pub did: String, 544 pub email: Option<String>, 545 pub email_verified: bool, 546 pub preferred_channel: String, 547 pub preferred_channel_verified: bool, 548} 549 550pub async fn confirm_signup( 551 State(state): State<AppState>, 552 Json(input): Json<ConfirmSignupInput>, 553) -> Response { 554 info!("confirm_signup called for DID: {}", input.did); 555 let row = match sqlx::query!( 556 r#"SELECT 557 u.id, u.did, u.handle, u.email, 558 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 559 k.key_bytes, k.encryption_version 560 FROM users u 561 JOIN user_keys k ON u.id = k.user_id 562 WHERE u.did = $1"#, 563 input.did 564 ) 565 .fetch_optional(&state.db) 566 .await 567 { 568 Ok(Some(row)) => row, 569 Ok(None) => { 570 warn!("User not found for confirm_signup: {}", input.did); 571 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 572 .into_response(); 573 } 574 Err(e) => { 575 error!("Database error in confirm_signup: {:?}", e); 576 return ApiError::InternalError.into_response(); 577 } 578 }; 579 580 let channel_str = match row.channel { 581 crate::comms::CommsChannel::Email => "email", 582 crate::comms::CommsChannel::Discord => "discord", 583 crate::comms::CommsChannel::Telegram => "telegram", 584 crate::comms::CommsChannel::Signal => "signal", 585 }; 586 let verification = match sqlx::query!( 587 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel", 588 row.id, 589 channel_str as _ 590 ) 591 .fetch_optional(&state.db) 592 .await 593 { 594 Ok(Some(v)) => v, 595 Ok(None) => { 596 warn!("No verification code found for user: {}", input.did); 597 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 598 } 599 Err(e) => { 600 error!("Database error fetching verification: {:?}", e); 601 return ApiError::InternalError.into_response(); 602 } 603 }; 604 605 if verification.code != input.verification_code { 606 warn!("Invalid verification code for user: {}", input.did); 607 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 608 } 609 if verification.expires_at < Utc::now() { 610 warn!("Verification code expired for user: {}", input.did); 611 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 612 } 613 614 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 615 Ok(k) => k, 616 Err(e) => { 617 error!("Failed to decrypt user key: {:?}", e); 618 return ApiError::InternalError.into_response(); 619 } 620 }; 621 let verified_column = match row.channel { 622 crate::comms::CommsChannel::Email => "email_verified", 623 crate::comms::CommsChannel::Discord => "discord_verified", 624 crate::comms::CommsChannel::Telegram => "telegram_verified", 625 crate::comms::CommsChannel::Signal => "signal_verified", 626 }; 627 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 628 if let Err(e) = sqlx::query(&update_query) 629 .bind(&input.did) 630 .execute(&state.db) 631 .await 632 { 633 error!("Failed to update verification status: {:?}", e); 634 return ApiError::InternalError.into_response(); 635 } 636 637 if let Err(e) = sqlx::query!( 638 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel", 639 row.id, 640 channel_str as _ 641 ) 642 .execute(&state.db) 643 .await 644 { 645 error!("Failed to delete verification record: {:?}", e); 646 } 647 648 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 649 Ok(m) => m, 650 Err(e) => { 651 error!("Failed to create access token: {:?}", e); 652 return ApiError::InternalError.into_response(); 653 } 654 }; 655 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 656 Ok(m) => m, 657 Err(e) => { 658 error!("Failed to create refresh token: {:?}", e); 659 return ApiError::InternalError.into_response(); 660 } 661 }; 662 if let Err(e) = sqlx::query!( 663 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)", 664 row.did, 665 access_meta.jti, 666 refresh_meta.jti, 667 access_meta.expires_at, 668 refresh_meta.expires_at, 669 false, 670 false 671 ) 672 .execute(&state.db) 673 .await 674 { 675 error!("Failed to insert session: {:?}", e); 676 return ApiError::InternalError.into_response(); 677 } 678 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 679 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 680 warn!("Failed to enqueue welcome notification: {:?}", e); 681 } 682 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 683 let preferred_channel = match row.channel { 684 crate::comms::CommsChannel::Email => "email", 685 crate::comms::CommsChannel::Discord => "discord", 686 crate::comms::CommsChannel::Telegram => "telegram", 687 crate::comms::CommsChannel::Signal => "signal", 688 }; 689 Json(ConfirmSignupOutput { 690 access_jwt: access_meta.token, 691 refresh_jwt: refresh_meta.token, 692 handle: row.handle, 693 did: row.did, 694 email: row.email, 695 email_verified, 696 preferred_channel: preferred_channel.to_string(), 697 preferred_channel_verified: true, 698 }) 699 .into_response() 700} 701 702#[derive(Deserialize)] 703#[serde(rename_all = "camelCase")] 704pub struct ResendVerificationInput { 705 pub did: String, 706} 707 708pub async fn resend_verification( 709 State(state): State<AppState>, 710 Json(input): Json<ResendVerificationInput>, 711) -> Response { 712 info!("resend_verification called for DID: {}", input.did); 713 let row = match sqlx::query!( 714 r#"SELECT 715 id, handle, email, 716 preferred_comms_channel as "channel: crate::comms::CommsChannel", 717 discord_id, telegram_username, signal_number, 718 email_verified, discord_verified, telegram_verified, signal_verified 719 FROM users 720 WHERE did = $1"#, 721 input.did 722 ) 723 .fetch_optional(&state.db) 724 .await 725 { 726 Ok(Some(row)) => row, 727 Ok(None) => { 728 return ApiError::InvalidRequest("User not found".into()).into_response(); 729 } 730 Err(e) => { 731 error!("Database error in resend_verification: {:?}", e); 732 return ApiError::InternalError.into_response(); 733 } 734 }; 735 let is_verified = 736 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 737 if is_verified { 738 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 739 } 740 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 741 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 742 743 let (channel_str, recipient) = match row.channel { 744 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 745 crate::comms::CommsChannel::Discord => { 746 ("discord", row.discord_id.clone().unwrap_or_default()) 747 } 748 crate::comms::CommsChannel::Telegram => ( 749 "telegram", 750 row.telegram_username.clone().unwrap_or_default(), 751 ), 752 crate::comms::CommsChannel::Signal => { 753 ("signal", row.signal_number.clone().unwrap_or_default()) 754 } 755 }; 756 757 if let Err(e) = sqlx::query!( 758 r#" 759 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 760 VALUES ($1, $2::comms_channel, $3, $4, $5) 761 ON CONFLICT (user_id, channel) DO UPDATE 762 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW() 763 "#, 764 row.id, 765 channel_str as _, 766 verification_code, 767 recipient, 768 code_expires_at 769 ) 770 .execute(&state.db) 771 .await 772 { 773 error!("Failed to update verification code: {:?}", e); 774 return ApiError::InternalError.into_response(); 775 } 776 if let Err(e) = crate::comms::enqueue_signup_verification( 777 &state.db, 778 row.id, 779 channel_str, 780 &recipient, 781 &verification_code, 782 None, 783 ) 784 .await 785 { 786 warn!("Failed to enqueue verification notification: {:?}", e); 787 } 788 Json(json!({"success": true})).into_response() 789} 790 791#[derive(Serialize)] 792#[serde(rename_all = "camelCase")] 793pub struct SessionInfo { 794 pub id: String, 795 pub session_type: String, 796 pub client_name: Option<String>, 797 pub created_at: String, 798 pub expires_at: String, 799 pub is_current: bool, 800} 801 802#[derive(Serialize)] 803#[serde(rename_all = "camelCase")] 804pub struct ListSessionsOutput { 805 pub sessions: Vec<SessionInfo>, 806} 807 808pub async fn list_sessions( 809 State(state): State<AppState>, 810 headers: HeaderMap, 811 auth: BearerAuth, 812) -> Response { 813 let current_jti = headers 814 .get("authorization") 815 .and_then(|v| v.to_str().ok()) 816 .and_then(|v| v.strip_prefix("Bearer ")) 817 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 818 819 let mut sessions: Vec<SessionInfo> = Vec::new(); 820 821 let jwt_result = sqlx::query_as::< 822 _, 823 ( 824 i32, 825 String, 826 chrono::DateTime<chrono::Utc>, 827 chrono::DateTime<chrono::Utc>, 828 ), 829 >( 830 r#" 831 SELECT id, access_jti, created_at, refresh_expires_at 832 FROM session_tokens 833 WHERE did = $1 AND refresh_expires_at > NOW() 834 ORDER BY created_at DESC 835 "#, 836 ) 837 .bind(&auth.0.did) 838 .fetch_all(&state.db) 839 .await; 840 841 match jwt_result { 842 Ok(rows) => { 843 for (id, access_jti, created_at, expires_at) in rows { 844 sessions.push(SessionInfo { 845 id: format!("jwt:{}", id), 846 session_type: "legacy".to_string(), 847 client_name: None, 848 created_at: created_at.to_rfc3339(), 849 expires_at: expires_at.to_rfc3339(), 850 is_current: current_jti.as_ref() == Some(&access_jti), 851 }); 852 } 853 } 854 Err(e) => { 855 error!("DB error fetching JWT sessions: {:?}", e); 856 return ( 857 StatusCode::INTERNAL_SERVER_ERROR, 858 Json(json!({"error": "InternalError"})), 859 ) 860 .into_response(); 861 } 862 } 863 864 let oauth_result = sqlx::query_as::< 865 _, 866 ( 867 i32, 868 String, 869 chrono::DateTime<chrono::Utc>, 870 chrono::DateTime<chrono::Utc>, 871 String, 872 ), 873 >( 874 r#" 875 SELECT id, token_id, created_at, expires_at, client_id 876 FROM oauth_token 877 WHERE did = $1 AND expires_at > NOW() 878 ORDER BY created_at DESC 879 "#, 880 ) 881 .bind(&auth.0.did) 882 .fetch_all(&state.db) 883 .await; 884 885 match oauth_result { 886 Ok(rows) => { 887 for (id, token_id, created_at, expires_at, client_id) in rows { 888 let client_name = extract_client_name(&client_id); 889 let is_current_oauth = auth.0.is_oauth 890 && current_jti.as_ref() == Some(&token_id); 891 sessions.push(SessionInfo { 892 id: format!("oauth:{}", id), 893 session_type: "oauth".to_string(), 894 client_name: Some(client_name), 895 created_at: created_at.to_rfc3339(), 896 expires_at: expires_at.to_rfc3339(), 897 is_current: is_current_oauth, 898 }); 899 } 900 } 901 Err(e) => { 902 error!("DB error fetching OAuth sessions: {:?}", e); 903 return ( 904 StatusCode::INTERNAL_SERVER_ERROR, 905 Json(json!({"error": "InternalError"})), 906 ) 907 .into_response(); 908 } 909 } 910 911 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 912 913 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 914} 915 916fn extract_client_name(client_id: &str) -> String { 917 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 918 "Localhost App".to_string() 919 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 920 parsed.host_str().unwrap_or("Unknown App").to_string() 921 } else { 922 client_id.to_string() 923 } 924} 925 926#[derive(Deserialize)] 927#[serde(rename_all = "camelCase")] 928pub struct RevokeSessionInput { 929 pub session_id: String, 930} 931 932pub async fn revoke_session( 933 State(state): State<AppState>, 934 auth: BearerAuth, 935 Json(input): Json<RevokeSessionInput>, 936) -> Response { 937 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 938 let session_id: i32 = match jwt_id.parse() { 939 Ok(id) => id, 940 Err(_) => { 941 return ( 942 StatusCode::BAD_REQUEST, 943 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 944 ) 945 .into_response(); 946 } 947 }; 948 let session = sqlx::query_as::<_, (String,)>( 949 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 950 ) 951 .bind(session_id) 952 .bind(&auth.0.did) 953 .fetch_optional(&state.db) 954 .await; 955 let access_jti = match session { 956 Ok(Some((jti,))) => jti, 957 Ok(None) => { 958 return ( 959 StatusCode::NOT_FOUND, 960 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 961 ) 962 .into_response(); 963 } 964 Err(e) => { 965 error!("DB error in revoke_session: {:?}", e); 966 return ( 967 StatusCode::INTERNAL_SERVER_ERROR, 968 Json(json!({"error": "InternalError"})), 969 ) 970 .into_response(); 971 } 972 }; 973 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 974 .bind(session_id) 975 .execute(&state.db) 976 .await 977 { 978 error!("DB error deleting session: {:?}", e); 979 return ( 980 StatusCode::INTERNAL_SERVER_ERROR, 981 Json(json!({"error": "InternalError"})), 982 ) 983 .into_response(); 984 } 985 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 986 if let Err(e) = state.cache.delete(&cache_key).await { 987 warn!("Failed to invalidate session cache: {:?}", e); 988 } 989 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked"); 990 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 991 let session_id: i32 = match oauth_id.parse() { 992 Ok(id) => id, 993 Err(_) => { 994 return ( 995 StatusCode::BAD_REQUEST, 996 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 997 ) 998 .into_response(); 999 } 1000 }; 1001 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 1002 .bind(session_id) 1003 .bind(&auth.0.did) 1004 .execute(&state.db) 1005 .await; 1006 match result { 1007 Ok(r) if r.rows_affected() == 0 => { 1008 return ( 1009 StatusCode::NOT_FOUND, 1010 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 1011 ) 1012 .into_response(); 1013 } 1014 Err(e) => { 1015 error!("DB error deleting OAuth session: {:?}", e); 1016 return ( 1017 StatusCode::INTERNAL_SERVER_ERROR, 1018 Json(json!({"error": "InternalError"})), 1019 ) 1020 .into_response(); 1021 } 1022 _ => {} 1023 } 1024 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked"); 1025 } else { 1026 return ( 1027 StatusCode::BAD_REQUEST, 1028 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})), 1029 ) 1030 .into_response(); 1031 } 1032 (StatusCode::OK, Json(json!({}))).into_response() 1033} 1034 1035pub async fn revoke_all_sessions( 1036 State(state): State<AppState>, 1037 headers: HeaderMap, 1038 auth: BearerAuth, 1039) -> Response { 1040 let current_jti = headers 1041 .get("authorization") 1042 .and_then(|v| v.to_str().ok()) 1043 .and_then(|v| v.strip_prefix("Bearer ")) 1044 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1045 1046 if let Some(ref jti) = current_jti { 1047 if auth.0.is_oauth { 1048 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1049 .bind(&auth.0.did) 1050 .execute(&state.db) 1051 .await 1052 { 1053 error!("DB error revoking JWT sessions: {:?}", e); 1054 return ( 1055 StatusCode::INTERNAL_SERVER_ERROR, 1056 Json(json!({"error": "InternalError"})), 1057 ) 1058 .into_response(); 1059 } 1060 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1061 .bind(&auth.0.did) 1062 .bind(jti) 1063 .execute(&state.db) 1064 .await 1065 { 1066 error!("DB error revoking OAuth sessions: {:?}", e); 1067 return ( 1068 StatusCode::INTERNAL_SERVER_ERROR, 1069 Json(json!({"error": "InternalError"})), 1070 ) 1071 .into_response(); 1072 } 1073 } else { 1074 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1075 .bind(&auth.0.did) 1076 .bind(jti) 1077 .execute(&state.db) 1078 .await 1079 { 1080 error!("DB error revoking JWT sessions: {:?}", e); 1081 return ( 1082 StatusCode::INTERNAL_SERVER_ERROR, 1083 Json(json!({"error": "InternalError"})), 1084 ) 1085 .into_response(); 1086 } 1087 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1088 .bind(&auth.0.did) 1089 .execute(&state.db) 1090 .await 1091 { 1092 error!("DB error revoking OAuth sessions: {:?}", e); 1093 return ( 1094 StatusCode::INTERNAL_SERVER_ERROR, 1095 Json(json!({"error": "InternalError"})), 1096 ) 1097 .into_response(); 1098 } 1099 } 1100 } else { 1101 return ( 1102 StatusCode::BAD_REQUEST, 1103 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})), 1104 ) 1105 .into_response(); 1106 } 1107 1108 info!(did = %auth.0.did, "All other sessions revoked"); 1109 (StatusCode::OK, Json(json!({"success": true}))).into_response() 1110} 1111 1112#[derive(Serialize)] 1113#[serde(rename_all = "camelCase")] 1114pub struct LegacyLoginPreferenceOutput { 1115 pub allow_legacy_login: bool, 1116 pub has_mfa: bool, 1117} 1118 1119pub async fn get_legacy_login_preference( 1120 State(state): State<AppState>, 1121 auth: BearerAuth, 1122) -> Response { 1123 let result = sqlx::query!( 1124 r#"SELECT 1125 u.allow_legacy_login, 1126 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1127 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1128 FROM users u WHERE u.did = $1"#, 1129 auth.0.did 1130 ) 1131 .fetch_optional(&state.db) 1132 .await; 1133 1134 match result { 1135 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1136 allow_legacy_login: row.allow_legacy_login, 1137 has_mfa: row.has_mfa, 1138 }) 1139 .into_response(), 1140 Ok(None) => ( 1141 StatusCode::NOT_FOUND, 1142 Json(json!({"error": "AccountNotFound"})), 1143 ) 1144 .into_response(), 1145 Err(e) => { 1146 error!("DB error: {:?}", e); 1147 ( 1148 StatusCode::INTERNAL_SERVER_ERROR, 1149 Json(json!({"error": "InternalError"})), 1150 ) 1151 .into_response() 1152 } 1153 } 1154} 1155 1156#[derive(Deserialize)] 1157#[serde(rename_all = "camelCase")] 1158pub struct UpdateLegacyLoginInput { 1159 pub allow_legacy_login: bool, 1160} 1161 1162pub async fn update_legacy_login_preference( 1163 State(state): State<AppState>, 1164 auth: BearerAuth, 1165 Json(input): Json<UpdateLegacyLoginInput>, 1166) -> Response { 1167 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1168 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1169 .await; 1170 } 1171 1172 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1173 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1174 } 1175 1176 let result = sqlx::query!( 1177 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1178 input.allow_legacy_login, 1179 auth.0.did 1180 ) 1181 .fetch_optional(&state.db) 1182 .await; 1183 1184 match result { 1185 Ok(Some(_)) => { 1186 info!( 1187 did = %auth.0.did, 1188 allow_legacy_login = input.allow_legacy_login, 1189 "Legacy login preference updated" 1190 ); 1191 Json(json!({ 1192 "allowLegacyLogin": input.allow_legacy_login 1193 })) 1194 .into_response() 1195 } 1196 Ok(None) => ( 1197 StatusCode::NOT_FOUND, 1198 Json(json!({"error": "AccountNotFound"})), 1199 ) 1200 .into_response(), 1201 Err(e) => { 1202 error!("DB error: {:?}", e); 1203 ( 1204 StatusCode::INTERNAL_SERVER_ERROR, 1205 Json(json!({"error": "InternalError"})), 1206 ) 1207 .into_response() 1208 } 1209 } 1210} 1211 1212use crate::comms::locale::VALID_LOCALES; 1213 1214#[derive(Deserialize)] 1215#[serde(rename_all = "camelCase")] 1216pub struct UpdateLocaleInput { 1217 pub preferred_locale: String, 1218} 1219 1220pub async fn update_locale( 1221 State(state): State<AppState>, 1222 auth: BearerAuth, 1223 Json(input): Json<UpdateLocaleInput>, 1224) -> Response { 1225 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1226 return ( 1227 StatusCode::BAD_REQUEST, 1228 Json(json!({ 1229 "error": "InvalidRequest", 1230 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1231 })), 1232 ) 1233 .into_response(); 1234 } 1235 1236 let result = sqlx::query!( 1237 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1238 input.preferred_locale, 1239 auth.0.did 1240 ) 1241 .fetch_optional(&state.db) 1242 .await; 1243 1244 match result { 1245 Ok(Some(_)) => { 1246 info!( 1247 did = %auth.0.did, 1248 locale = %input.preferred_locale, 1249 "User locale preference updated" 1250 ); 1251 Json(json!({ 1252 "preferredLocale": input.preferred_locale 1253 })) 1254 .into_response() 1255 } 1256 Ok(None) => ( 1257 StatusCode::NOT_FOUND, 1258 Json(json!({"error": "AccountNotFound"})), 1259 ) 1260 .into_response(), 1261 Err(e) => { 1262 error!("DB error updating locale: {:?}", e); 1263 ( 1264 StatusCode::INTERNAL_SERVER_ERROR, 1265 Json(json!({"error": "InternalError"})), 1266 ) 1267 .into_response() 1268 } 1269 } 1270}