this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14 15fn extract_client_ip(headers: &HeaderMap) -> String { 16 if let Some(forwarded) = headers.get("x-forwarded-for") 17 && let Ok(value) = forwarded.to_str() 18 && let Some(first_ip) = value.split(',').next() 19 { 20 return first_ip.trim().to_string(); 21 } 22 if let Some(real_ip) = headers.get("x-real-ip") 23 && let Ok(value) = real_ip.to_str() 24 { 25 return value.trim().to_string(); 26 } 27 "unknown".to_string() 28} 29 30fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 31 let identifier = identifier.trim(); 32 if identifier.contains('@') || identifier.starts_with("did:") { 33 identifier.to_string() 34 } else if !identifier.contains('.') { 35 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 36 } else { 37 identifier.to_lowercase() 38 } 39} 40 41fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 42 stored_handle.to_string() 43} 44 45#[derive(Deserialize)] 46pub struct CreateSessionInput { 47 pub identifier: String, 48 pub password: String, 49} 50 51#[derive(Serialize)] 52#[serde(rename_all = "camelCase")] 53pub struct CreateSessionOutput { 54 pub access_jwt: String, 55 pub refresh_jwt: String, 56 pub handle: String, 57 pub did: String, 58} 59 60pub async fn create_session( 61 State(state): State<AppState>, 62 headers: HeaderMap, 63 Json(input): Json<CreateSessionInput>, 64) -> Response { 65 info!( 66 "create_session called with identifier: {}", 67 input.identifier 68 ); 69 let client_ip = extract_client_ip(&headers); 70 if !state 71 .check_rate_limit(RateLimitKind::Login, &client_ip) 72 .await 73 { 74 warn!(ip = %client_ip, "Login rate limit exceeded"); 75 return ( 76 StatusCode::TOO_MANY_REQUESTS, 77 Json(json!({ 78 "error": "RateLimitExceeded", 79 "message": "Too many login attempts. Please try again later." 80 })), 81 ) 82 .into_response(); 83 } 84 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 85 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 86 info!( 87 "Normalized identifier: {} -> {}", 88 input.identifier, normalized_identifier 89 ); 90 let row = match sqlx::query!( 91 r#"SELECT 92 u.id, u.did, u.handle, u.password_hash, 93 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 94 u.allow_legacy_login, 95 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 96 k.key_bytes, k.encryption_version, 97 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 98 FROM users u 99 JOIN user_keys k ON u.id = k.user_id 100 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 101 normalized_identifier 102 ) 103 .fetch_optional(&state.db) 104 .await 105 { 106 Ok(Some(row)) => row, 107 Ok(None) => { 108 let _ = verify( 109 &input.password, 110 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 111 ); 112 warn!("User not found for login attempt"); 113 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 114 .into_response(); 115 } 116 Err(e) => { 117 error!("Database error fetching user: {:?}", e); 118 return ApiError::InternalError.into_response(); 119 } 120 }; 121 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 122 Ok(k) => k, 123 Err(e) => { 124 error!("Failed to decrypt user key: {:?}", e); 125 return ApiError::InternalError.into_response(); 126 } 127 }; 128 let (password_valid, app_password_scopes) = if row 129 .password_hash 130 .as_ref() 131 .map(|h| verify(&input.password, h).unwrap_or(false)) 132 .unwrap_or(false) 133 { 134 (true, None) 135 } else { 136 let app_passwords = sqlx::query!( 137 "SELECT password_hash, scopes FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 138 row.id 139 ) 140 .fetch_all(&state.db) 141 .await 142 .unwrap_or_default(); 143 let matched = app_passwords 144 .iter() 145 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 146 match matched { 147 Some(app) => (true, app.scopes.clone()), 148 None => (false, None), 149 } 150 }; 151 if !password_valid { 152 warn!("Password verification failed for login attempt"); 153 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 154 .into_response(); 155 } 156 let is_verified = 157 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 158 if !is_verified { 159 warn!("Login attempt for unverified account: {}", row.did); 160 return ( 161 StatusCode::FORBIDDEN, 162 Json(json!({ 163 "error": "AccountNotVerified", 164 "message": "Please verify your account before logging in", 165 "did": row.did 166 })), 167 ) 168 .into_response(); 169 } 170 let has_totp = row.totp_enabled.unwrap_or(false); 171 let is_legacy_login = has_totp; 172 if has_totp && !row.allow_legacy_login { 173 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 174 return ( 175 StatusCode::FORBIDDEN, 176 Json(json!({ 177 "error": "MfaRequired", 178 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 179 "did": row.did 180 })), 181 ) 182 .into_response(); 183 } 184 let access_meta = match crate::auth::create_access_token_with_scope_metadata( 185 &row.did, 186 &key_bytes, 187 app_password_scopes.as_deref(), 188 ) { 189 Ok(m) => m, 190 Err(e) => { 191 error!("Failed to create access token: {:?}", e); 192 return ApiError::InternalError.into_response(); 193 } 194 }; 195 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 196 Ok(m) => m, 197 Err(e) => { 198 error!("Failed to create refresh token: {:?}", e); 199 return ApiError::InternalError.into_response(); 200 } 201 }; 202 if let Err(e) = sqlx::query!( 203 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 204 row.did, 205 access_meta.jti, 206 refresh_meta.jti, 207 access_meta.expires_at, 208 refresh_meta.expires_at, 209 is_legacy_login, 210 false, 211 app_password_scopes 212 ) 213 .execute(&state.db) 214 .await 215 { 216 error!("Failed to insert session: {:?}", e); 217 return ApiError::InternalError.into_response(); 218 } 219 if is_legacy_login { 220 warn!( 221 did = %row.did, 222 ip = %client_ip, 223 "Legacy login on TOTP-enabled account - sending notification" 224 ); 225 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 226 if let Err(e) = crate::comms::queue_legacy_login_notification( 227 &state.db, 228 row.id, 229 &hostname, 230 &client_ip, 231 row.preferred_comms_channel, 232 ) 233 .await 234 { 235 error!("Failed to queue legacy login notification: {:?}", e); 236 } 237 } 238 let handle = full_handle(&row.handle, &pds_hostname); 239 Json(CreateSessionOutput { 240 access_jwt: access_meta.token, 241 refresh_jwt: refresh_meta.token, 242 handle, 243 did: row.did, 244 }) 245 .into_response() 246} 247 248pub async fn get_session( 249 State(state): State<AppState>, 250 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 251) -> Response { 252 let permissions = auth_user.permissions(); 253 let can_read_email = permissions.allows_email_read(); 254 255 match sqlx::query!( 256 r#"SELECT 257 handle, email, email_verified, is_admin, deactivated_at, preferred_locale, 258 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 259 discord_verified, telegram_verified, signal_verified 260 FROM users WHERE did = $1"#, 261 auth_user.did 262 ) 263 .fetch_optional(&state.db) 264 .await 265 { 266 Ok(Some(row)) => { 267 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 268 crate::comms::CommsChannel::Email => ("email", row.email_verified), 269 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 270 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 271 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 272 }; 273 let pds_hostname = 274 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 275 let handle = full_handle(&row.handle, &pds_hostname); 276 let is_active = row.deactivated_at.is_none(); 277 let email_value = if can_read_email { 278 row.email.clone() 279 } else { 280 None 281 }; 282 let email_verified_value = can_read_email && row.email_verified; 283 Json(json!({ 284 "handle": handle, 285 "did": auth_user.did, 286 "email": email_value, 287 "emailVerified": email_verified_value, 288 "preferredChannel": preferred_channel, 289 "preferredChannelVerified": preferred_channel_verified, 290 "preferredLocale": row.preferred_locale, 291 "isAdmin": row.is_admin, 292 "active": is_active, 293 "status": if is_active { "active" } else { "deactivated" }, 294 "didDoc": {} 295 })) 296 .into_response() 297 } 298 Ok(None) => ApiError::AuthenticationFailed.into_response(), 299 Err(e) => { 300 error!("Database error in get_session: {:?}", e); 301 ApiError::InternalError.into_response() 302 } 303 } 304} 305 306pub async fn delete_session( 307 State(state): State<AppState>, 308 headers: axum::http::HeaderMap, 309) -> Response { 310 let token = match crate::auth::extract_bearer_token_from_header( 311 headers.get("Authorization").and_then(|h| h.to_str().ok()), 312 ) { 313 Some(t) => t, 314 None => return ApiError::AuthenticationRequired.into_response(), 315 }; 316 let jti = match crate::auth::get_jti_from_token(&token) { 317 Ok(jti) => jti, 318 Err(_) => return ApiError::AuthenticationFailed.into_response(), 319 }; 320 let did = crate::auth::get_did_from_token(&token).ok(); 321 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 322 .execute(&state.db) 323 .await 324 { 325 Ok(res) if res.rows_affected() > 0 => { 326 if let Some(did) = did { 327 let session_cache_key = format!("auth:session:{}:{}", did, jti); 328 let _ = state.cache.delete(&session_cache_key).await; 329 } 330 Json(json!({})).into_response() 331 } 332 Ok(_) => ApiError::AuthenticationFailed.into_response(), 333 Err(e) => { 334 error!("Database error in delete_session: {:?}", e); 335 ApiError::AuthenticationFailed.into_response() 336 } 337 } 338} 339 340pub async fn refresh_session( 341 State(state): State<AppState>, 342 headers: axum::http::HeaderMap, 343) -> Response { 344 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 345 if !state 346 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 347 .await 348 { 349 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 350 return ( 351 axum::http::StatusCode::TOO_MANY_REQUESTS, 352 axum::Json(serde_json::json!({ 353 "error": "RateLimitExceeded", 354 "message": "Too many requests. Please try again later." 355 })), 356 ) 357 .into_response(); 358 } 359 let refresh_token = match crate::auth::extract_bearer_token_from_header( 360 headers.get("Authorization").and_then(|h| h.to_str().ok()), 361 ) { 362 Some(t) => t, 363 None => return ApiError::AuthenticationRequired.into_response(), 364 }; 365 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 366 Ok(jti) => jti, 367 Err(_) => { 368 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 369 .into_response(); 370 } 371 }; 372 let mut tx = match state.db.begin().await { 373 Ok(tx) => tx, 374 Err(e) => { 375 error!("Failed to begin transaction: {:?}", e); 376 return ApiError::InternalError.into_response(); 377 } 378 }; 379 if let Ok(Some(session_id)) = sqlx::query_scalar!( 380 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 381 refresh_jti 382 ) 383 .fetch_optional(&mut *tx) 384 .await 385 { 386 warn!( 387 "Refresh token reuse detected! Revoking token family for session_id: {}", 388 session_id 389 ); 390 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 391 .execute(&mut *tx) 392 .await; 393 let _ = tx.commit().await; 394 return ApiError::ExpiredTokenMsg( 395 "Refresh token has been revoked due to suspected compromise".into(), 396 ) 397 .into_response(); 398 } 399 let session_row = match sqlx::query!( 400 r#"SELECT st.id, st.did, st.scope, k.key_bytes, k.encryption_version 401 FROM session_tokens st 402 JOIN users u ON st.did = u.did 403 JOIN user_keys k ON u.id = k.user_id 404 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 405 FOR UPDATE OF st"#, 406 refresh_jti 407 ) 408 .fetch_optional(&mut *tx) 409 .await 410 { 411 Ok(Some(row)) => row, 412 Ok(None) => { 413 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 414 .into_response(); 415 } 416 Err(e) => { 417 error!("Database error fetching session: {:?}", e); 418 return ApiError::InternalError.into_response(); 419 } 420 }; 421 let key_bytes = 422 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 423 Ok(k) => k, 424 Err(e) => { 425 error!("Failed to decrypt user key: {:?}", e); 426 return ApiError::InternalError.into_response(); 427 } 428 }; 429 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 430 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 431 } 432 let new_access_meta = match crate::auth::create_access_token_with_scope_metadata( 433 &session_row.did, 434 &key_bytes, 435 session_row.scope.as_deref(), 436 ) { 437 Ok(m) => m, 438 Err(e) => { 439 error!("Failed to create access token: {:?}", e); 440 return ApiError::InternalError.into_response(); 441 } 442 }; 443 let new_refresh_meta = 444 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 445 Ok(m) => m, 446 Err(e) => { 447 error!("Failed to create refresh token: {:?}", e); 448 return ApiError::InternalError.into_response(); 449 } 450 }; 451 match sqlx::query!( 452 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 453 refresh_jti, 454 session_row.id 455 ) 456 .execute(&mut *tx) 457 .await 458 { 459 Ok(result) if result.rows_affected() == 0 => { 460 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 461 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 462 .execute(&mut *tx) 463 .await; 464 let _ = tx.commit().await; 465 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 466 } 467 Err(e) => { 468 error!("Failed to record used refresh token: {:?}", e); 469 return ApiError::InternalError.into_response(); 470 } 471 Ok(_) => {} 472 } 473 if let Err(e) = sqlx::query!( 474 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 475 new_access_meta.jti, 476 new_refresh_meta.jti, 477 new_access_meta.expires_at, 478 new_refresh_meta.expires_at, 479 session_row.id 480 ) 481 .execute(&mut *tx) 482 .await 483 { 484 error!("Database error updating session: {:?}", e); 485 return ApiError::InternalError.into_response(); 486 } 487 if let Err(e) = tx.commit().await { 488 error!("Failed to commit transaction: {:?}", e); 489 return ApiError::InternalError.into_response(); 490 } 491 match sqlx::query!( 492 r#"SELECT 493 handle, email, email_verified, is_admin, preferred_locale, 494 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 495 discord_verified, telegram_verified, signal_verified 496 FROM users WHERE did = $1"#, 497 session_row.did 498 ) 499 .fetch_optional(&state.db) 500 .await 501 { 502 Ok(Some(u)) => { 503 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 504 crate::comms::CommsChannel::Email => ("email", u.email_verified), 505 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 506 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 507 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 508 }; 509 let pds_hostname = 510 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 511 let handle = full_handle(&u.handle, &pds_hostname); 512 Json(json!({ 513 "accessJwt": new_access_meta.token, 514 "refreshJwt": new_refresh_meta.token, 515 "handle": handle, 516 "did": session_row.did, 517 "email": u.email, 518 "emailVerified": u.email_verified, 519 "preferredChannel": preferred_channel, 520 "preferredChannelVerified": preferred_channel_verified, 521 "preferredLocale": u.preferred_locale, 522 "isAdmin": u.is_admin, 523 "active": true 524 })) 525 .into_response() 526 } 527 Ok(None) => { 528 error!("User not found for existing session: {}", session_row.did); 529 ApiError::InternalError.into_response() 530 } 531 Err(e) => { 532 error!("Database error fetching user: {:?}", e); 533 ApiError::InternalError.into_response() 534 } 535 } 536} 537 538#[derive(Deserialize)] 539#[serde(rename_all = "camelCase")] 540pub struct ConfirmSignupInput { 541 pub did: String, 542 pub verification_code: String, 543} 544 545#[derive(Serialize)] 546#[serde(rename_all = "camelCase")] 547pub struct ConfirmSignupOutput { 548 pub access_jwt: String, 549 pub refresh_jwt: String, 550 pub handle: String, 551 pub did: String, 552 pub email: Option<String>, 553 pub email_verified: bool, 554 pub preferred_channel: String, 555 pub preferred_channel_verified: bool, 556} 557 558pub async fn confirm_signup( 559 State(state): State<AppState>, 560 Json(input): Json<ConfirmSignupInput>, 561) -> Response { 562 info!("confirm_signup called for DID: {}", input.did); 563 let row = match sqlx::query!( 564 r#"SELECT 565 u.id, u.did, u.handle, u.email, 566 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 567 u.discord_id, u.telegram_username, u.signal_number, 568 k.key_bytes, k.encryption_version 569 FROM users u 570 JOIN user_keys k ON u.id = k.user_id 571 WHERE u.did = $1"#, 572 input.did 573 ) 574 .fetch_optional(&state.db) 575 .await 576 { 577 Ok(Some(row)) => row, 578 Ok(None) => { 579 warn!("User not found for confirm_signup: {}", input.did); 580 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 581 .into_response(); 582 } 583 Err(e) => { 584 error!("Database error in confirm_signup: {:?}", e); 585 return ApiError::InternalError.into_response(); 586 } 587 }; 588 589 let (channel_str, identifier) = match row.channel { 590 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 591 crate::comms::CommsChannel::Discord => { 592 ("discord", row.discord_id.clone().unwrap_or_default()) 593 } 594 crate::comms::CommsChannel::Telegram => ( 595 "telegram", 596 row.telegram_username.clone().unwrap_or_default(), 597 ), 598 crate::comms::CommsChannel::Signal => { 599 ("signal", row.signal_number.clone().unwrap_or_default()) 600 } 601 }; 602 603 let normalized_token = 604 crate::auth::verification_token::normalize_token_input(&input.verification_code); 605 match crate::auth::verification_token::verify_signup_token( 606 &normalized_token, 607 channel_str, 608 &identifier, 609 ) { 610 Ok(token_data) => { 611 if token_data.did != input.did { 612 warn!( 613 "Token DID mismatch for confirm_signup: expected {}, got {}", 614 input.did, token_data.did 615 ); 616 return ApiError::InvalidRequest("Invalid verification code".into()) 617 .into_response(); 618 } 619 } 620 Err(crate::auth::verification_token::VerifyError::Expired) => { 621 warn!("Verification code expired for user: {}", input.did); 622 return ApiError::ExpiredTokenMsg("Verification code has expired".into()) 623 .into_response(); 624 } 625 Err(e) => { 626 warn!("Invalid verification code for user {}: {:?}", input.did, e); 627 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 628 } 629 } 630 631 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 632 Ok(k) => k, 633 Err(e) => { 634 error!("Failed to decrypt user key: {:?}", e); 635 return ApiError::InternalError.into_response(); 636 } 637 }; 638 let verified_column = match row.channel { 639 crate::comms::CommsChannel::Email => "email_verified", 640 crate::comms::CommsChannel::Discord => "discord_verified", 641 crate::comms::CommsChannel::Telegram => "telegram_verified", 642 crate::comms::CommsChannel::Signal => "signal_verified", 643 }; 644 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 645 if let Err(e) = sqlx::query(&update_query) 646 .bind(&input.did) 647 .execute(&state.db) 648 .await 649 { 650 error!("Failed to update verification status: {:?}", e); 651 return ApiError::InternalError.into_response(); 652 } 653 654 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 655 Ok(m) => m, 656 Err(e) => { 657 error!("Failed to create access token: {:?}", e); 658 return ApiError::InternalError.into_response(); 659 } 660 }; 661 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 662 Ok(m) => m, 663 Err(e) => { 664 error!("Failed to create refresh token: {:?}", e); 665 return ApiError::InternalError.into_response(); 666 } 667 }; 668 let no_scope: Option<String> = None; 669 if let Err(e) = sqlx::query!( 670 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 671 row.did, 672 access_meta.jti, 673 refresh_meta.jti, 674 access_meta.expires_at, 675 refresh_meta.expires_at, 676 false, 677 false, 678 no_scope 679 ) 680 .execute(&state.db) 681 .await 682 { 683 error!("Failed to insert session: {:?}", e); 684 return ApiError::InternalError.into_response(); 685 } 686 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 687 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 688 warn!("Failed to enqueue welcome notification: {:?}", e); 689 } 690 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 691 let preferred_channel = match row.channel { 692 crate::comms::CommsChannel::Email => "email", 693 crate::comms::CommsChannel::Discord => "discord", 694 crate::comms::CommsChannel::Telegram => "telegram", 695 crate::comms::CommsChannel::Signal => "signal", 696 }; 697 Json(ConfirmSignupOutput { 698 access_jwt: access_meta.token, 699 refresh_jwt: refresh_meta.token, 700 handle: row.handle, 701 did: row.did, 702 email: row.email, 703 email_verified, 704 preferred_channel: preferred_channel.to_string(), 705 preferred_channel_verified: true, 706 }) 707 .into_response() 708} 709 710#[derive(Deserialize)] 711#[serde(rename_all = "camelCase")] 712pub struct ResendVerificationInput { 713 pub did: String, 714} 715 716pub async fn resend_verification( 717 State(state): State<AppState>, 718 Json(input): Json<ResendVerificationInput>, 719) -> Response { 720 info!("resend_verification called for DID: {}", input.did); 721 let row = match sqlx::query!( 722 r#"SELECT 723 id, handle, email, 724 preferred_comms_channel as "channel: crate::comms::CommsChannel", 725 discord_id, telegram_username, signal_number, 726 email_verified, discord_verified, telegram_verified, signal_verified 727 FROM users 728 WHERE did = $1"#, 729 input.did 730 ) 731 .fetch_optional(&state.db) 732 .await 733 { 734 Ok(Some(row)) => row, 735 Ok(None) => { 736 return ApiError::InvalidRequest("User not found".into()).into_response(); 737 } 738 Err(e) => { 739 error!("Database error in resend_verification: {:?}", e); 740 return ApiError::InternalError.into_response(); 741 } 742 }; 743 let is_verified = 744 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 745 if is_verified { 746 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 747 } 748 749 let (channel_str, recipient) = match row.channel { 750 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 751 crate::comms::CommsChannel::Discord => { 752 ("discord", row.discord_id.clone().unwrap_or_default()) 753 } 754 crate::comms::CommsChannel::Telegram => ( 755 "telegram", 756 row.telegram_username.clone().unwrap_or_default(), 757 ), 758 crate::comms::CommsChannel::Signal => { 759 ("signal", row.signal_number.clone().unwrap_or_default()) 760 } 761 }; 762 763 let verification_token = 764 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 765 let formatted_token = 766 crate::auth::verification_token::format_token_for_display(&verification_token); 767 768 if let Err(e) = crate::comms::enqueue_signup_verification( 769 &state.db, 770 row.id, 771 channel_str, 772 &recipient, 773 &formatted_token, 774 None, 775 ) 776 .await 777 { 778 warn!("Failed to enqueue verification notification: {:?}", e); 779 } 780 Json(json!({"success": true})).into_response() 781} 782 783#[derive(Serialize)] 784#[serde(rename_all = "camelCase")] 785pub struct SessionInfo { 786 pub id: String, 787 pub session_type: String, 788 pub client_name: Option<String>, 789 pub created_at: String, 790 pub expires_at: String, 791 pub is_current: bool, 792} 793 794#[derive(Serialize)] 795#[serde(rename_all = "camelCase")] 796pub struct ListSessionsOutput { 797 pub sessions: Vec<SessionInfo>, 798} 799 800pub async fn list_sessions( 801 State(state): State<AppState>, 802 headers: HeaderMap, 803 auth: BearerAuth, 804) -> Response { 805 let current_jti = headers 806 .get("authorization") 807 .and_then(|v| v.to_str().ok()) 808 .and_then(|v| v.strip_prefix("Bearer ")) 809 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 810 811 let mut sessions: Vec<SessionInfo> = Vec::new(); 812 813 let jwt_result = sqlx::query_as::< 814 _, 815 ( 816 i32, 817 String, 818 chrono::DateTime<chrono::Utc>, 819 chrono::DateTime<chrono::Utc>, 820 ), 821 >( 822 r#" 823 SELECT id, access_jti, created_at, refresh_expires_at 824 FROM session_tokens 825 WHERE did = $1 AND refresh_expires_at > NOW() 826 ORDER BY created_at DESC 827 "#, 828 ) 829 .bind(&auth.0.did) 830 .fetch_all(&state.db) 831 .await; 832 833 match jwt_result { 834 Ok(rows) => { 835 for (id, access_jti, created_at, expires_at) in rows { 836 sessions.push(SessionInfo { 837 id: format!("jwt:{}", id), 838 session_type: "legacy".to_string(), 839 client_name: None, 840 created_at: created_at.to_rfc3339(), 841 expires_at: expires_at.to_rfc3339(), 842 is_current: current_jti.as_ref() == Some(&access_jti), 843 }); 844 } 845 } 846 Err(e) => { 847 error!("DB error fetching JWT sessions: {:?}", e); 848 return ( 849 StatusCode::INTERNAL_SERVER_ERROR, 850 Json(json!({"error": "InternalError"})), 851 ) 852 .into_response(); 853 } 854 } 855 856 let oauth_result = sqlx::query_as::< 857 _, 858 ( 859 i32, 860 String, 861 chrono::DateTime<chrono::Utc>, 862 chrono::DateTime<chrono::Utc>, 863 String, 864 ), 865 >( 866 r#" 867 SELECT id, token_id, created_at, expires_at, client_id 868 FROM oauth_token 869 WHERE did = $1 AND expires_at > NOW() 870 ORDER BY created_at DESC 871 "#, 872 ) 873 .bind(&auth.0.did) 874 .fetch_all(&state.db) 875 .await; 876 877 match oauth_result { 878 Ok(rows) => { 879 for (id, token_id, created_at, expires_at, client_id) in rows { 880 let client_name = extract_client_name(&client_id); 881 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 882 sessions.push(SessionInfo { 883 id: format!("oauth:{}", id), 884 session_type: "oauth".to_string(), 885 client_name: Some(client_name), 886 created_at: created_at.to_rfc3339(), 887 expires_at: expires_at.to_rfc3339(), 888 is_current: is_current_oauth, 889 }); 890 } 891 } 892 Err(e) => { 893 error!("DB error fetching OAuth sessions: {:?}", e); 894 return ( 895 StatusCode::INTERNAL_SERVER_ERROR, 896 Json(json!({"error": "InternalError"})), 897 ) 898 .into_response(); 899 } 900 } 901 902 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 903 904 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 905} 906 907fn extract_client_name(client_id: &str) -> String { 908 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 909 "Localhost App".to_string() 910 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 911 parsed.host_str().unwrap_or("Unknown App").to_string() 912 } else { 913 client_id.to_string() 914 } 915} 916 917#[derive(Deserialize)] 918#[serde(rename_all = "camelCase")] 919pub struct RevokeSessionInput { 920 pub session_id: String, 921} 922 923pub async fn revoke_session( 924 State(state): State<AppState>, 925 auth: BearerAuth, 926 Json(input): Json<RevokeSessionInput>, 927) -> Response { 928 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 929 let session_id: i32 = match jwt_id.parse() { 930 Ok(id) => id, 931 Err(_) => { 932 return ( 933 StatusCode::BAD_REQUEST, 934 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 935 ) 936 .into_response(); 937 } 938 }; 939 let session = sqlx::query_as::<_, (String,)>( 940 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 941 ) 942 .bind(session_id) 943 .bind(&auth.0.did) 944 .fetch_optional(&state.db) 945 .await; 946 let access_jti = match session { 947 Ok(Some((jti,))) => jti, 948 Ok(None) => { 949 return ( 950 StatusCode::NOT_FOUND, 951 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 952 ) 953 .into_response(); 954 } 955 Err(e) => { 956 error!("DB error in revoke_session: {:?}", e); 957 return ( 958 StatusCode::INTERNAL_SERVER_ERROR, 959 Json(json!({"error": "InternalError"})), 960 ) 961 .into_response(); 962 } 963 }; 964 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 965 .bind(session_id) 966 .execute(&state.db) 967 .await 968 { 969 error!("DB error deleting session: {:?}", e); 970 return ( 971 StatusCode::INTERNAL_SERVER_ERROR, 972 Json(json!({"error": "InternalError"})), 973 ) 974 .into_response(); 975 } 976 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 977 if let Err(e) = state.cache.delete(&cache_key).await { 978 warn!("Failed to invalidate session cache: {:?}", e); 979 } 980 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked"); 981 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 982 let session_id: i32 = match oauth_id.parse() { 983 Ok(id) => id, 984 Err(_) => { 985 return ( 986 StatusCode::BAD_REQUEST, 987 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 988 ) 989 .into_response(); 990 } 991 }; 992 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 993 .bind(session_id) 994 .bind(&auth.0.did) 995 .execute(&state.db) 996 .await; 997 match result { 998 Ok(r) if r.rows_affected() == 0 => { 999 return ( 1000 StatusCode::NOT_FOUND, 1001 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 1002 ) 1003 .into_response(); 1004 } 1005 Err(e) => { 1006 error!("DB error deleting OAuth session: {:?}", e); 1007 return ( 1008 StatusCode::INTERNAL_SERVER_ERROR, 1009 Json(json!({"error": "InternalError"})), 1010 ) 1011 .into_response(); 1012 } 1013 _ => {} 1014 } 1015 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked"); 1016 } else { 1017 return ( 1018 StatusCode::BAD_REQUEST, 1019 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})), 1020 ) 1021 .into_response(); 1022 } 1023 (StatusCode::OK, Json(json!({}))).into_response() 1024} 1025 1026pub async fn revoke_all_sessions( 1027 State(state): State<AppState>, 1028 headers: HeaderMap, 1029 auth: BearerAuth, 1030) -> Response { 1031 let current_jti = headers 1032 .get("authorization") 1033 .and_then(|v| v.to_str().ok()) 1034 .and_then(|v| v.strip_prefix("Bearer ")) 1035 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1036 1037 if let Some(ref jti) = current_jti { 1038 if auth.0.is_oauth { 1039 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1040 .bind(&auth.0.did) 1041 .execute(&state.db) 1042 .await 1043 { 1044 error!("DB error revoking JWT sessions: {:?}", e); 1045 return ( 1046 StatusCode::INTERNAL_SERVER_ERROR, 1047 Json(json!({"error": "InternalError"})), 1048 ) 1049 .into_response(); 1050 } 1051 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1052 .bind(&auth.0.did) 1053 .bind(jti) 1054 .execute(&state.db) 1055 .await 1056 { 1057 error!("DB error revoking OAuth sessions: {:?}", e); 1058 return ( 1059 StatusCode::INTERNAL_SERVER_ERROR, 1060 Json(json!({"error": "InternalError"})), 1061 ) 1062 .into_response(); 1063 } 1064 } else { 1065 if let Err(e) = 1066 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1067 .bind(&auth.0.did) 1068 .bind(jti) 1069 .execute(&state.db) 1070 .await 1071 { 1072 error!("DB error revoking JWT sessions: {:?}", e); 1073 return ( 1074 StatusCode::INTERNAL_SERVER_ERROR, 1075 Json(json!({"error": "InternalError"})), 1076 ) 1077 .into_response(); 1078 } 1079 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1080 .bind(&auth.0.did) 1081 .execute(&state.db) 1082 .await 1083 { 1084 error!("DB error revoking OAuth sessions: {:?}", e); 1085 return ( 1086 StatusCode::INTERNAL_SERVER_ERROR, 1087 Json(json!({"error": "InternalError"})), 1088 ) 1089 .into_response(); 1090 } 1091 } 1092 } else { 1093 return ( 1094 StatusCode::BAD_REQUEST, 1095 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})), 1096 ) 1097 .into_response(); 1098 } 1099 1100 info!(did = %auth.0.did, "All other sessions revoked"); 1101 (StatusCode::OK, Json(json!({"success": true}))).into_response() 1102} 1103 1104#[derive(Serialize)] 1105#[serde(rename_all = "camelCase")] 1106pub struct LegacyLoginPreferenceOutput { 1107 pub allow_legacy_login: bool, 1108 pub has_mfa: bool, 1109} 1110 1111pub async fn get_legacy_login_preference( 1112 State(state): State<AppState>, 1113 auth: BearerAuth, 1114) -> Response { 1115 let result = sqlx::query!( 1116 r#"SELECT 1117 u.allow_legacy_login, 1118 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1119 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1120 FROM users u WHERE u.did = $1"#, 1121 auth.0.did 1122 ) 1123 .fetch_optional(&state.db) 1124 .await; 1125 1126 match result { 1127 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1128 allow_legacy_login: row.allow_legacy_login, 1129 has_mfa: row.has_mfa, 1130 }) 1131 .into_response(), 1132 Ok(None) => ( 1133 StatusCode::NOT_FOUND, 1134 Json(json!({"error": "AccountNotFound"})), 1135 ) 1136 .into_response(), 1137 Err(e) => { 1138 error!("DB error: {:?}", e); 1139 ( 1140 StatusCode::INTERNAL_SERVER_ERROR, 1141 Json(json!({"error": "InternalError"})), 1142 ) 1143 .into_response() 1144 } 1145 } 1146} 1147 1148#[derive(Deserialize)] 1149#[serde(rename_all = "camelCase")] 1150pub struct UpdateLegacyLoginInput { 1151 pub allow_legacy_login: bool, 1152} 1153 1154pub async fn update_legacy_login_preference( 1155 State(state): State<AppState>, 1156 auth: BearerAuth, 1157 Json(input): Json<UpdateLegacyLoginInput>, 1158) -> Response { 1159 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1160 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1161 .await; 1162 } 1163 1164 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1165 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1166 } 1167 1168 let result = sqlx::query!( 1169 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1170 input.allow_legacy_login, 1171 auth.0.did 1172 ) 1173 .fetch_optional(&state.db) 1174 .await; 1175 1176 match result { 1177 Ok(Some(_)) => { 1178 info!( 1179 did = %auth.0.did, 1180 allow_legacy_login = input.allow_legacy_login, 1181 "Legacy login preference updated" 1182 ); 1183 Json(json!({ 1184 "allowLegacyLogin": input.allow_legacy_login 1185 })) 1186 .into_response() 1187 } 1188 Ok(None) => ( 1189 StatusCode::NOT_FOUND, 1190 Json(json!({"error": "AccountNotFound"})), 1191 ) 1192 .into_response(), 1193 Err(e) => { 1194 error!("DB error: {:?}", e); 1195 ( 1196 StatusCode::INTERNAL_SERVER_ERROR, 1197 Json(json!({"error": "InternalError"})), 1198 ) 1199 .into_response() 1200 } 1201 } 1202} 1203 1204use crate::comms::locale::VALID_LOCALES; 1205 1206#[derive(Deserialize)] 1207#[serde(rename_all = "camelCase")] 1208pub struct UpdateLocaleInput { 1209 pub preferred_locale: String, 1210} 1211 1212pub async fn update_locale( 1213 State(state): State<AppState>, 1214 auth: BearerAuth, 1215 Json(input): Json<UpdateLocaleInput>, 1216) -> Response { 1217 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1218 return ( 1219 StatusCode::BAD_REQUEST, 1220 Json(json!({ 1221 "error": "InvalidRequest", 1222 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1223 })), 1224 ) 1225 .into_response(); 1226 } 1227 1228 let result = sqlx::query!( 1229 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1230 input.preferred_locale, 1231 auth.0.did 1232 ) 1233 .fetch_optional(&state.db) 1234 .await; 1235 1236 match result { 1237 Ok(Some(_)) => { 1238 info!( 1239 did = %auth.0.did, 1240 locale = %input.preferred_locale, 1241 "User locale preference updated" 1242 ); 1243 Json(json!({ 1244 "preferredLocale": input.preferred_locale 1245 })) 1246 .into_response() 1247 } 1248 Ok(None) => ( 1249 StatusCode::NOT_FOUND, 1250 Json(json!({"error": "AccountNotFound"})), 1251 ) 1252 .into_response(), 1253 Err(e) => { 1254 error!("DB error updating locale: {:?}", e); 1255 ( 1256 StatusCode::INTERNAL_SERVER_ERROR, 1257 Json(json!({"error": "InternalError"})), 1258 ) 1259 .into_response() 1260 } 1261 } 1262}