this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") 18 && let Ok(value) = forwarded.to_str() 19 && let Some(first_ip) = value.split(',').next() 20 { 21 return first_ip.trim().to_string(); 22 } 23 if let Some(real_ip) = headers.get("x-real-ip") 24 && let Ok(value) = real_ip.to_str() 25 { 26 return value.trim().to_string(); 27 } 28 "unknown".to_string() 29} 30 31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 32 let identifier = identifier.trim(); 33 if identifier.contains('@') || identifier.starts_with("did:") { 34 identifier.to_string() 35 } else if !identifier.contains('.') { 36 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 37 } else { 38 identifier.to_lowercase() 39 } 40} 41 42fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 43 stored_handle.to_string() 44} 45 46#[derive(Deserialize)] 47pub struct CreateSessionInput { 48 pub identifier: String, 49 pub password: String, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateSessionOutput { 55 pub access_jwt: String, 56 pub refresh_jwt: String, 57 pub handle: String, 58 pub did: String, 59} 60 61pub async fn create_session( 62 State(state): State<AppState>, 63 headers: HeaderMap, 64 Json(input): Json<CreateSessionInput>, 65) -> Response { 66 info!( 67 "create_session called with identifier: {}", 68 input.identifier 69 ); 70 let client_ip = extract_client_ip(&headers); 71 if !state 72 .check_rate_limit(RateLimitKind::Login, &client_ip) 73 .await 74 { 75 warn!(ip = %client_ip, "Login rate limit exceeded"); 76 return ( 77 StatusCode::TOO_MANY_REQUESTS, 78 Json(json!({ 79 "error": "RateLimitExceeded", 80 "message": "Too many login attempts. Please try again later." 81 })), 82 ) 83 .into_response(); 84 } 85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 87 info!( 88 "Normalized identifier: {} -> {}", 89 input.identifier, normalized_identifier 90 ); 91 let row = match sqlx::query!( 92 r#"SELECT 93 u.id, u.did, u.handle, u.password_hash, 94 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 95 u.allow_legacy_login, 96 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 97 k.key_bytes, k.encryption_version, 98 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 99 FROM users u 100 JOIN user_keys k ON u.id = k.user_id 101 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 102 normalized_identifier 103 ) 104 .fetch_optional(&state.db) 105 .await 106 { 107 Ok(Some(row)) => row, 108 Ok(None) => { 109 let _ = verify( 110 &input.password, 111 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 112 ); 113 warn!("User not found for login attempt"); 114 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 115 .into_response(); 116 } 117 Err(e) => { 118 error!("Database error fetching user: {:?}", e); 119 return ApiError::InternalError.into_response(); 120 } 121 }; 122 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 123 Ok(k) => k, 124 Err(e) => { 125 error!("Failed to decrypt user key: {:?}", e); 126 return ApiError::InternalError.into_response(); 127 } 128 }; 129 let password_valid = if row 130 .password_hash 131 .as_ref() 132 .map(|h| verify(&input.password, h).unwrap_or(false)) 133 .unwrap_or(false) 134 { 135 true 136 } else { 137 let app_passwords = sqlx::query!( 138 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 139 row.id 140 ) 141 .fetch_all(&state.db) 142 .await 143 .unwrap_or_default(); 144 app_passwords 145 .iter() 146 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 147 }; 148 if !password_valid { 149 warn!("Password verification failed for login attempt"); 150 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 151 .into_response(); 152 } 153 let is_verified = 154 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 155 if !is_verified { 156 warn!("Login attempt for unverified account: {}", row.did); 157 return ( 158 StatusCode::FORBIDDEN, 159 Json(json!({ 160 "error": "AccountNotVerified", 161 "message": "Please verify your account before logging in", 162 "did": row.did 163 })), 164 ) 165 .into_response(); 166 } 167 let has_totp = row.totp_enabled.unwrap_or(false); 168 let is_legacy_login = has_totp; 169 if has_totp && !row.allow_legacy_login { 170 warn!( 171 "Legacy login blocked for TOTP-enabled account: {}", 172 row.did 173 ); 174 return ( 175 StatusCode::FORBIDDEN, 176 Json(json!({ 177 "error": "MfaRequired", 178 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 179 "did": row.did 180 })), 181 ) 182 .into_response(); 183 } 184 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 185 Ok(m) => m, 186 Err(e) => { 187 error!("Failed to create access token: {:?}", e); 188 return ApiError::InternalError.into_response(); 189 } 190 }; 191 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 192 Ok(m) => m, 193 Err(e) => { 194 error!("Failed to create refresh token: {:?}", e); 195 return ApiError::InternalError.into_response(); 196 } 197 }; 198 if let Err(e) = sqlx::query!( 199 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)", 200 row.did, 201 access_meta.jti, 202 refresh_meta.jti, 203 access_meta.expires_at, 204 refresh_meta.expires_at, 205 is_legacy_login, 206 false 207 ) 208 .execute(&state.db) 209 .await 210 { 211 error!("Failed to insert session: {:?}", e); 212 return ApiError::InternalError.into_response(); 213 } 214 if is_legacy_login { 215 warn!( 216 did = %row.did, 217 ip = %client_ip, 218 "Legacy login on TOTP-enabled account - sending notification" 219 ); 220 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 221 if let Err(e) = crate::comms::queue_legacy_login_notification( 222 &state.db, 223 row.id, 224 &hostname, 225 &client_ip, 226 row.preferred_comms_channel, 227 ) 228 .await 229 { 230 error!("Failed to queue legacy login notification: {:?}", e); 231 } 232 } 233 let handle = full_handle(&row.handle, &pds_hostname); 234 Json(CreateSessionOutput { 235 access_jwt: access_meta.token, 236 refresh_jwt: refresh_meta.token, 237 handle, 238 did: row.did, 239 }) 240 .into_response() 241} 242 243pub async fn get_session( 244 State(state): State<AppState>, 245 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 246) -> Response { 247 let permissions = auth_user.permissions(); 248 let can_read_email = permissions.allows_email_read(); 249 250 match sqlx::query!( 251 r#"SELECT 252 handle, email, email_verified, is_admin, deactivated_at, 253 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 254 discord_verified, telegram_verified, signal_verified 255 FROM users WHERE did = $1"#, 256 auth_user.did 257 ) 258 .fetch_optional(&state.db) 259 .await 260 { 261 Ok(Some(row)) => { 262 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 263 crate::comms::CommsChannel::Email => ("email", row.email_verified), 264 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 265 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 266 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 267 }; 268 let pds_hostname = 269 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 270 let handle = full_handle(&row.handle, &pds_hostname); 271 let is_active = row.deactivated_at.is_none(); 272 let email_value = if can_read_email { 273 row.email.clone() 274 } else { 275 None 276 }; 277 let email_verified_value = can_read_email && row.email_verified; 278 Json(json!({ 279 "handle": handle, 280 "did": auth_user.did, 281 "email": email_value, 282 "emailVerified": email_verified_value, 283 "preferredChannel": preferred_channel, 284 "preferredChannelVerified": preferred_channel_verified, 285 "isAdmin": row.is_admin, 286 "active": is_active, 287 "status": if is_active { "active" } else { "deactivated" }, 288 "didDoc": {} 289 })) 290 .into_response() 291 } 292 Ok(None) => ApiError::AuthenticationFailed.into_response(), 293 Err(e) => { 294 error!("Database error in get_session: {:?}", e); 295 ApiError::InternalError.into_response() 296 } 297 } 298} 299 300pub async fn delete_session( 301 State(state): State<AppState>, 302 headers: axum::http::HeaderMap, 303) -> Response { 304 let token = match crate::auth::extract_bearer_token_from_header( 305 headers.get("Authorization").and_then(|h| h.to_str().ok()), 306 ) { 307 Some(t) => t, 308 None => return ApiError::AuthenticationRequired.into_response(), 309 }; 310 let jti = match crate::auth::get_jti_from_token(&token) { 311 Ok(jti) => jti, 312 Err(_) => return ApiError::AuthenticationFailed.into_response(), 313 }; 314 let did = crate::auth::get_did_from_token(&token).ok(); 315 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 316 .execute(&state.db) 317 .await 318 { 319 Ok(res) if res.rows_affected() > 0 => { 320 if let Some(did) = did { 321 let session_cache_key = format!("auth:session:{}:{}", did, jti); 322 let _ = state.cache.delete(&session_cache_key).await; 323 } 324 Json(json!({})).into_response() 325 } 326 Ok(_) => ApiError::AuthenticationFailed.into_response(), 327 Err(e) => { 328 error!("Database error in delete_session: {:?}", e); 329 ApiError::AuthenticationFailed.into_response() 330 } 331 } 332} 333 334pub async fn refresh_session( 335 State(state): State<AppState>, 336 headers: axum::http::HeaderMap, 337) -> Response { 338 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 339 if !state 340 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 341 .await 342 { 343 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 344 return ( 345 axum::http::StatusCode::TOO_MANY_REQUESTS, 346 axum::Json(serde_json::json!({ 347 "error": "RateLimitExceeded", 348 "message": "Too many requests. Please try again later." 349 })), 350 ) 351 .into_response(); 352 } 353 let refresh_token = match crate::auth::extract_bearer_token_from_header( 354 headers.get("Authorization").and_then(|h| h.to_str().ok()), 355 ) { 356 Some(t) => t, 357 None => return ApiError::AuthenticationRequired.into_response(), 358 }; 359 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 360 Ok(jti) => jti, 361 Err(_) => { 362 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 363 .into_response(); 364 } 365 }; 366 let mut tx = match state.db.begin().await { 367 Ok(tx) => tx, 368 Err(e) => { 369 error!("Failed to begin transaction: {:?}", e); 370 return ApiError::InternalError.into_response(); 371 } 372 }; 373 if let Ok(Some(session_id)) = sqlx::query_scalar!( 374 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 375 refresh_jti 376 ) 377 .fetch_optional(&mut *tx) 378 .await 379 { 380 warn!( 381 "Refresh token reuse detected! Revoking token family for session_id: {}", 382 session_id 383 ); 384 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 385 .execute(&mut *tx) 386 .await; 387 let _ = tx.commit().await; 388 return ApiError::ExpiredTokenMsg( 389 "Refresh token has been revoked due to suspected compromise".into(), 390 ) 391 .into_response(); 392 } 393 let session_row = match sqlx::query!( 394 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 395 FROM session_tokens st 396 JOIN users u ON st.did = u.did 397 JOIN user_keys k ON u.id = k.user_id 398 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 399 FOR UPDATE OF st"#, 400 refresh_jti 401 ) 402 .fetch_optional(&mut *tx) 403 .await 404 { 405 Ok(Some(row)) => row, 406 Ok(None) => { 407 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 408 .into_response(); 409 } 410 Err(e) => { 411 error!("Database error fetching session: {:?}", e); 412 return ApiError::InternalError.into_response(); 413 } 414 }; 415 let key_bytes = 416 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 417 Ok(k) => k, 418 Err(e) => { 419 error!("Failed to decrypt user key: {:?}", e); 420 return ApiError::InternalError.into_response(); 421 } 422 }; 423 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 424 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 425 } 426 let new_access_meta = 427 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 428 Ok(m) => m, 429 Err(e) => { 430 error!("Failed to create access token: {:?}", e); 431 return ApiError::InternalError.into_response(); 432 } 433 }; 434 let new_refresh_meta = 435 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 436 Ok(m) => m, 437 Err(e) => { 438 error!("Failed to create refresh token: {:?}", e); 439 return ApiError::InternalError.into_response(); 440 } 441 }; 442 match sqlx::query!( 443 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 444 refresh_jti, 445 session_row.id 446 ) 447 .execute(&mut *tx) 448 .await 449 { 450 Ok(result) if result.rows_affected() == 0 => { 451 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 452 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 453 .execute(&mut *tx) 454 .await; 455 let _ = tx.commit().await; 456 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 457 } 458 Err(e) => { 459 error!("Failed to record used refresh token: {:?}", e); 460 return ApiError::InternalError.into_response(); 461 } 462 Ok(_) => {} 463 } 464 if let Err(e) = sqlx::query!( 465 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 466 new_access_meta.jti, 467 new_refresh_meta.jti, 468 new_access_meta.expires_at, 469 new_refresh_meta.expires_at, 470 session_row.id 471 ) 472 .execute(&mut *tx) 473 .await 474 { 475 error!("Database error updating session: {:?}", e); 476 return ApiError::InternalError.into_response(); 477 } 478 if let Err(e) = tx.commit().await { 479 error!("Failed to commit transaction: {:?}", e); 480 return ApiError::InternalError.into_response(); 481 } 482 match sqlx::query!( 483 r#"SELECT 484 handle, email, email_verified, is_admin, 485 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 486 discord_verified, telegram_verified, signal_verified 487 FROM users WHERE did = $1"#, 488 session_row.did 489 ) 490 .fetch_optional(&state.db) 491 .await 492 { 493 Ok(Some(u)) => { 494 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 495 crate::comms::CommsChannel::Email => ("email", u.email_verified), 496 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 497 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 498 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 499 }; 500 let pds_hostname = 501 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 502 let handle = full_handle(&u.handle, &pds_hostname); 503 Json(json!({ 504 "accessJwt": new_access_meta.token, 505 "refreshJwt": new_refresh_meta.token, 506 "handle": handle, 507 "did": session_row.did, 508 "email": u.email, 509 "emailVerified": u.email_verified, 510 "preferredChannel": preferred_channel, 511 "preferredChannelVerified": preferred_channel_verified, 512 "isAdmin": u.is_admin, 513 "active": true 514 })) 515 .into_response() 516 } 517 Ok(None) => { 518 error!("User not found for existing session: {}", session_row.did); 519 ApiError::InternalError.into_response() 520 } 521 Err(e) => { 522 error!("Database error fetching user: {:?}", e); 523 ApiError::InternalError.into_response() 524 } 525 } 526} 527 528#[derive(Deserialize)] 529#[serde(rename_all = "camelCase")] 530pub struct ConfirmSignupInput { 531 pub did: String, 532 pub verification_code: String, 533} 534 535#[derive(Serialize)] 536#[serde(rename_all = "camelCase")] 537pub struct ConfirmSignupOutput { 538 pub access_jwt: String, 539 pub refresh_jwt: String, 540 pub handle: String, 541 pub did: String, 542 pub email: Option<String>, 543 pub email_verified: bool, 544 pub preferred_channel: String, 545 pub preferred_channel_verified: bool, 546} 547 548pub async fn confirm_signup( 549 State(state): State<AppState>, 550 Json(input): Json<ConfirmSignupInput>, 551) -> Response { 552 info!("confirm_signup called for DID: {}", input.did); 553 let row = match sqlx::query!( 554 r#"SELECT 555 u.id, u.did, u.handle, u.email, 556 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 557 k.key_bytes, k.encryption_version 558 FROM users u 559 JOIN user_keys k ON u.id = k.user_id 560 WHERE u.did = $1"#, 561 input.did 562 ) 563 .fetch_optional(&state.db) 564 .await 565 { 566 Ok(Some(row)) => row, 567 Ok(None) => { 568 warn!("User not found for confirm_signup: {}", input.did); 569 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 570 .into_response(); 571 } 572 Err(e) => { 573 error!("Database error in confirm_signup: {:?}", e); 574 return ApiError::InternalError.into_response(); 575 } 576 }; 577 578 let channel_str = match row.channel { 579 crate::comms::CommsChannel::Email => "email", 580 crate::comms::CommsChannel::Discord => "discord", 581 crate::comms::CommsChannel::Telegram => "telegram", 582 crate::comms::CommsChannel::Signal => "signal", 583 }; 584 let verification = match sqlx::query!( 585 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel", 586 row.id, 587 channel_str as _ 588 ) 589 .fetch_optional(&state.db) 590 .await 591 { 592 Ok(Some(v)) => v, 593 Ok(None) => { 594 warn!("No verification code found for user: {}", input.did); 595 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 596 } 597 Err(e) => { 598 error!("Database error fetching verification: {:?}", e); 599 return ApiError::InternalError.into_response(); 600 } 601 }; 602 603 if verification.code != input.verification_code { 604 warn!("Invalid verification code for user: {}", input.did); 605 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 606 } 607 if verification.expires_at < Utc::now() { 608 warn!("Verification code expired for user: {}", input.did); 609 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 610 } 611 612 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 613 Ok(k) => k, 614 Err(e) => { 615 error!("Failed to decrypt user key: {:?}", e); 616 return ApiError::InternalError.into_response(); 617 } 618 }; 619 let verified_column = match row.channel { 620 crate::comms::CommsChannel::Email => "email_verified", 621 crate::comms::CommsChannel::Discord => "discord_verified", 622 crate::comms::CommsChannel::Telegram => "telegram_verified", 623 crate::comms::CommsChannel::Signal => "signal_verified", 624 }; 625 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 626 if let Err(e) = sqlx::query(&update_query) 627 .bind(&input.did) 628 .execute(&state.db) 629 .await 630 { 631 error!("Failed to update verification status: {:?}", e); 632 return ApiError::InternalError.into_response(); 633 } 634 635 if let Err(e) = sqlx::query!( 636 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel", 637 row.id, 638 channel_str as _ 639 ) 640 .execute(&state.db) 641 .await 642 { 643 error!("Failed to delete verification record: {:?}", e); 644 } 645 646 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 647 Ok(m) => m, 648 Err(e) => { 649 error!("Failed to create access token: {:?}", e); 650 return ApiError::InternalError.into_response(); 651 } 652 }; 653 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 654 Ok(m) => m, 655 Err(e) => { 656 error!("Failed to create refresh token: {:?}", e); 657 return ApiError::InternalError.into_response(); 658 } 659 }; 660 if let Err(e) = sqlx::query!( 661 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)", 662 row.did, 663 access_meta.jti, 664 refresh_meta.jti, 665 access_meta.expires_at, 666 refresh_meta.expires_at, 667 false, 668 false 669 ) 670 .execute(&state.db) 671 .await 672 { 673 error!("Failed to insert session: {:?}", e); 674 return ApiError::InternalError.into_response(); 675 } 676 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 677 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 678 warn!("Failed to enqueue welcome notification: {:?}", e); 679 } 680 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 681 let preferred_channel = match row.channel { 682 crate::comms::CommsChannel::Email => "email", 683 crate::comms::CommsChannel::Discord => "discord", 684 crate::comms::CommsChannel::Telegram => "telegram", 685 crate::comms::CommsChannel::Signal => "signal", 686 }; 687 Json(ConfirmSignupOutput { 688 access_jwt: access_meta.token, 689 refresh_jwt: refresh_meta.token, 690 handle: row.handle, 691 did: row.did, 692 email: row.email, 693 email_verified, 694 preferred_channel: preferred_channel.to_string(), 695 preferred_channel_verified: true, 696 }) 697 .into_response() 698} 699 700#[derive(Deserialize)] 701#[serde(rename_all = "camelCase")] 702pub struct ResendVerificationInput { 703 pub did: String, 704} 705 706pub async fn resend_verification( 707 State(state): State<AppState>, 708 Json(input): Json<ResendVerificationInput>, 709) -> Response { 710 info!("resend_verification called for DID: {}", input.did); 711 let row = match sqlx::query!( 712 r#"SELECT 713 id, handle, email, 714 preferred_comms_channel as "channel: crate::comms::CommsChannel", 715 discord_id, telegram_username, signal_number, 716 email_verified, discord_verified, telegram_verified, signal_verified 717 FROM users 718 WHERE did = $1"#, 719 input.did 720 ) 721 .fetch_optional(&state.db) 722 .await 723 { 724 Ok(Some(row)) => row, 725 Ok(None) => { 726 return ApiError::InvalidRequest("User not found".into()).into_response(); 727 } 728 Err(e) => { 729 error!("Database error in resend_verification: {:?}", e); 730 return ApiError::InternalError.into_response(); 731 } 732 }; 733 let is_verified = 734 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 735 if is_verified { 736 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 737 } 738 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 739 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 740 741 let (channel_str, recipient) = match row.channel { 742 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 743 crate::comms::CommsChannel::Discord => { 744 ("discord", row.discord_id.clone().unwrap_or_default()) 745 } 746 crate::comms::CommsChannel::Telegram => ( 747 "telegram", 748 row.telegram_username.clone().unwrap_or_default(), 749 ), 750 crate::comms::CommsChannel::Signal => { 751 ("signal", row.signal_number.clone().unwrap_or_default()) 752 } 753 }; 754 755 if let Err(e) = sqlx::query!( 756 r#" 757 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 758 VALUES ($1, $2::comms_channel, $3, $4, $5) 759 ON CONFLICT (user_id, channel) DO UPDATE 760 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW() 761 "#, 762 row.id, 763 channel_str as _, 764 verification_code, 765 recipient, 766 code_expires_at 767 ) 768 .execute(&state.db) 769 .await 770 { 771 error!("Failed to update verification code: {:?}", e); 772 return ApiError::InternalError.into_response(); 773 } 774 if let Err(e) = crate::comms::enqueue_signup_verification( 775 &state.db, 776 row.id, 777 channel_str, 778 &recipient, 779 &verification_code, 780 ) 781 .await 782 { 783 warn!("Failed to enqueue verification notification: {:?}", e); 784 } 785 Json(json!({"success": true})).into_response() 786} 787 788#[derive(Serialize)] 789#[serde(rename_all = "camelCase")] 790pub struct SessionInfo { 791 pub id: String, 792 pub session_type: String, 793 pub client_name: Option<String>, 794 pub created_at: String, 795 pub expires_at: String, 796 pub is_current: bool, 797} 798 799#[derive(Serialize)] 800#[serde(rename_all = "camelCase")] 801pub struct ListSessionsOutput { 802 pub sessions: Vec<SessionInfo>, 803} 804 805pub async fn list_sessions( 806 State(state): State<AppState>, 807 headers: HeaderMap, 808 auth: BearerAuth, 809) -> Response { 810 let current_jti = headers 811 .get("authorization") 812 .and_then(|v| v.to_str().ok()) 813 .and_then(|v| v.strip_prefix("Bearer ")) 814 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 815 816 let mut sessions: Vec<SessionInfo> = Vec::new(); 817 818 let jwt_result = sqlx::query_as::< 819 _, 820 ( 821 i32, 822 String, 823 chrono::DateTime<chrono::Utc>, 824 chrono::DateTime<chrono::Utc>, 825 ), 826 >( 827 r#" 828 SELECT id, access_jti, created_at, refresh_expires_at 829 FROM session_tokens 830 WHERE did = $1 AND refresh_expires_at > NOW() 831 ORDER BY created_at DESC 832 "#, 833 ) 834 .bind(&auth.0.did) 835 .fetch_all(&state.db) 836 .await; 837 838 match jwt_result { 839 Ok(rows) => { 840 for (id, access_jti, created_at, expires_at) in rows { 841 sessions.push(SessionInfo { 842 id: format!("jwt:{}", id), 843 session_type: "legacy".to_string(), 844 client_name: None, 845 created_at: created_at.to_rfc3339(), 846 expires_at: expires_at.to_rfc3339(), 847 is_current: current_jti.as_ref() == Some(&access_jti), 848 }); 849 } 850 } 851 Err(e) => { 852 error!("DB error fetching JWT sessions: {:?}", e); 853 return ( 854 StatusCode::INTERNAL_SERVER_ERROR, 855 Json(json!({"error": "InternalError"})), 856 ) 857 .into_response(); 858 } 859 } 860 861 let oauth_result = sqlx::query_as::< 862 _, 863 ( 864 i32, 865 String, 866 chrono::DateTime<chrono::Utc>, 867 chrono::DateTime<chrono::Utc>, 868 String, 869 ), 870 >( 871 r#" 872 SELECT id, token_id, created_at, expires_at, client_id 873 FROM oauth_token 874 WHERE did = $1 AND expires_at > NOW() 875 ORDER BY created_at DESC 876 "#, 877 ) 878 .bind(&auth.0.did) 879 .fetch_all(&state.db) 880 .await; 881 882 match oauth_result { 883 Ok(rows) => { 884 for (id, token_id, created_at, expires_at, client_id) in rows { 885 let client_name = extract_client_name(&client_id); 886 let is_current_oauth = auth.0.is_oauth 887 && current_jti.as_ref() == Some(&token_id); 888 sessions.push(SessionInfo { 889 id: format!("oauth:{}", id), 890 session_type: "oauth".to_string(), 891 client_name: Some(client_name), 892 created_at: created_at.to_rfc3339(), 893 expires_at: expires_at.to_rfc3339(), 894 is_current: is_current_oauth, 895 }); 896 } 897 } 898 Err(e) => { 899 error!("DB error fetching OAuth sessions: {:?}", e); 900 return ( 901 StatusCode::INTERNAL_SERVER_ERROR, 902 Json(json!({"error": "InternalError"})), 903 ) 904 .into_response(); 905 } 906 } 907 908 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 909 910 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 911} 912 913fn extract_client_name(client_id: &str) -> String { 914 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 915 "Localhost App".to_string() 916 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 917 parsed.host_str().unwrap_or("Unknown App").to_string() 918 } else { 919 client_id.to_string() 920 } 921} 922 923#[derive(Deserialize)] 924#[serde(rename_all = "camelCase")] 925pub struct RevokeSessionInput { 926 pub session_id: String, 927} 928 929pub async fn revoke_session( 930 State(state): State<AppState>, 931 auth: BearerAuth, 932 Json(input): Json<RevokeSessionInput>, 933) -> Response { 934 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 935 let session_id: i32 = match jwt_id.parse() { 936 Ok(id) => id, 937 Err(_) => { 938 return ( 939 StatusCode::BAD_REQUEST, 940 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 941 ) 942 .into_response(); 943 } 944 }; 945 let session = sqlx::query_as::<_, (String,)>( 946 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 947 ) 948 .bind(session_id) 949 .bind(&auth.0.did) 950 .fetch_optional(&state.db) 951 .await; 952 let access_jti = match session { 953 Ok(Some((jti,))) => jti, 954 Ok(None) => { 955 return ( 956 StatusCode::NOT_FOUND, 957 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 958 ) 959 .into_response(); 960 } 961 Err(e) => { 962 error!("DB error in revoke_session: {:?}", e); 963 return ( 964 StatusCode::INTERNAL_SERVER_ERROR, 965 Json(json!({"error": "InternalError"})), 966 ) 967 .into_response(); 968 } 969 }; 970 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 971 .bind(session_id) 972 .execute(&state.db) 973 .await 974 { 975 error!("DB error deleting session: {:?}", e); 976 return ( 977 StatusCode::INTERNAL_SERVER_ERROR, 978 Json(json!({"error": "InternalError"})), 979 ) 980 .into_response(); 981 } 982 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 983 if let Err(e) = state.cache.delete(&cache_key).await { 984 warn!("Failed to invalidate session cache: {:?}", e); 985 } 986 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked"); 987 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 988 let session_id: i32 = match oauth_id.parse() { 989 Ok(id) => id, 990 Err(_) => { 991 return ( 992 StatusCode::BAD_REQUEST, 993 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 994 ) 995 .into_response(); 996 } 997 }; 998 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 999 .bind(session_id) 1000 .bind(&auth.0.did) 1001 .execute(&state.db) 1002 .await; 1003 match result { 1004 Ok(r) if r.rows_affected() == 0 => { 1005 return ( 1006 StatusCode::NOT_FOUND, 1007 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 1008 ) 1009 .into_response(); 1010 } 1011 Err(e) => { 1012 error!("DB error deleting OAuth session: {:?}", e); 1013 return ( 1014 StatusCode::INTERNAL_SERVER_ERROR, 1015 Json(json!({"error": "InternalError"})), 1016 ) 1017 .into_response(); 1018 } 1019 _ => {} 1020 } 1021 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked"); 1022 } else { 1023 return ( 1024 StatusCode::BAD_REQUEST, 1025 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})), 1026 ) 1027 .into_response(); 1028 } 1029 (StatusCode::OK, Json(json!({}))).into_response() 1030} 1031 1032pub async fn revoke_all_sessions( 1033 State(state): State<AppState>, 1034 headers: HeaderMap, 1035 auth: BearerAuth, 1036) -> Response { 1037 let current_jti = headers 1038 .get("authorization") 1039 .and_then(|v| v.to_str().ok()) 1040 .and_then(|v| v.strip_prefix("Bearer ")) 1041 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1042 1043 if let Some(ref jti) = current_jti { 1044 if auth.0.is_oauth { 1045 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1046 .bind(&auth.0.did) 1047 .execute(&state.db) 1048 .await 1049 { 1050 error!("DB error revoking JWT sessions: {:?}", e); 1051 return ( 1052 StatusCode::INTERNAL_SERVER_ERROR, 1053 Json(json!({"error": "InternalError"})), 1054 ) 1055 .into_response(); 1056 } 1057 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1058 .bind(&auth.0.did) 1059 .bind(jti) 1060 .execute(&state.db) 1061 .await 1062 { 1063 error!("DB error revoking OAuth sessions: {:?}", e); 1064 return ( 1065 StatusCode::INTERNAL_SERVER_ERROR, 1066 Json(json!({"error": "InternalError"})), 1067 ) 1068 .into_response(); 1069 } 1070 } else { 1071 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1072 .bind(&auth.0.did) 1073 .bind(jti) 1074 .execute(&state.db) 1075 .await 1076 { 1077 error!("DB error revoking JWT sessions: {:?}", e); 1078 return ( 1079 StatusCode::INTERNAL_SERVER_ERROR, 1080 Json(json!({"error": "InternalError"})), 1081 ) 1082 .into_response(); 1083 } 1084 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1085 .bind(&auth.0.did) 1086 .execute(&state.db) 1087 .await 1088 { 1089 error!("DB error revoking OAuth sessions: {:?}", e); 1090 return ( 1091 StatusCode::INTERNAL_SERVER_ERROR, 1092 Json(json!({"error": "InternalError"})), 1093 ) 1094 .into_response(); 1095 } 1096 } 1097 } else { 1098 return ( 1099 StatusCode::BAD_REQUEST, 1100 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})), 1101 ) 1102 .into_response(); 1103 } 1104 1105 info!(did = %auth.0.did, "All other sessions revoked"); 1106 (StatusCode::OK, Json(json!({"success": true}))).into_response() 1107} 1108 1109#[derive(Serialize)] 1110#[serde(rename_all = "camelCase")] 1111pub struct LegacyLoginPreferenceOutput { 1112 pub allow_legacy_login: bool, 1113 pub has_mfa: bool, 1114} 1115 1116pub async fn get_legacy_login_preference( 1117 State(state): State<AppState>, 1118 auth: BearerAuth, 1119) -> Response { 1120 let result = sqlx::query!( 1121 r#"SELECT 1122 u.allow_legacy_login, 1123 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1124 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1125 FROM users u WHERE u.did = $1"#, 1126 auth.0.did 1127 ) 1128 .fetch_optional(&state.db) 1129 .await; 1130 1131 match result { 1132 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1133 allow_legacy_login: row.allow_legacy_login, 1134 has_mfa: row.has_mfa, 1135 }) 1136 .into_response(), 1137 Ok(None) => ( 1138 StatusCode::NOT_FOUND, 1139 Json(json!({"error": "AccountNotFound"})), 1140 ) 1141 .into_response(), 1142 Err(e) => { 1143 error!("DB error: {:?}", e); 1144 ( 1145 StatusCode::INTERNAL_SERVER_ERROR, 1146 Json(json!({"error": "InternalError"})), 1147 ) 1148 .into_response() 1149 } 1150 } 1151} 1152 1153#[derive(Deserialize)] 1154#[serde(rename_all = "camelCase")] 1155pub struct UpdateLegacyLoginInput { 1156 pub allow_legacy_login: bool, 1157} 1158 1159pub async fn update_legacy_login_preference( 1160 State(state): State<AppState>, 1161 auth: BearerAuth, 1162 Json(input): Json<UpdateLegacyLoginInput>, 1163) -> Response { 1164 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1165 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1166 .await; 1167 } 1168 1169 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1170 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1171 } 1172 1173 let result = sqlx::query!( 1174 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1175 input.allow_legacy_login, 1176 auth.0.did 1177 ) 1178 .fetch_optional(&state.db) 1179 .await; 1180 1181 match result { 1182 Ok(Some(_)) => { 1183 info!( 1184 did = %auth.0.did, 1185 allow_legacy_login = input.allow_legacy_login, 1186 "Legacy login preference updated" 1187 ); 1188 Json(json!({ 1189 "allowLegacyLogin": input.allow_legacy_login 1190 })) 1191 .into_response() 1192 } 1193 Ok(None) => ( 1194 StatusCode::NOT_FOUND, 1195 Json(json!({"error": "AccountNotFound"})), 1196 ) 1197 .into_response(), 1198 Err(e) => { 1199 error!("DB error: {:?}", e); 1200 ( 1201 StatusCode::INTERNAL_SERVER_ERROR, 1202 Json(json!({"error": "InternalError"})), 1203 ) 1204 .into_response() 1205 } 1206 } 1207}