this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14 15fn extract_client_ip(headers: &HeaderMap) -> String { 16 if let Some(forwarded) = headers.get("x-forwarded-for") 17 && let Ok(value) = forwarded.to_str() 18 && let Some(first_ip) = value.split(',').next() 19 { 20 return first_ip.trim().to_string(); 21 } 22 if let Some(real_ip) = headers.get("x-real-ip") 23 && let Ok(value) = real_ip.to_str() 24 { 25 return value.trim().to_string(); 26 } 27 "unknown".to_string() 28} 29 30fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 31 let identifier = identifier.trim(); 32 if identifier.contains('@') || identifier.starts_with("did:") { 33 identifier.to_string() 34 } else if !identifier.contains('.') { 35 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 36 } else { 37 identifier.to_lowercase() 38 } 39} 40 41fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 42 stored_handle.to_string() 43} 44 45#[derive(Deserialize)] 46pub struct CreateSessionInput { 47 pub identifier: String, 48 pub password: String, 49} 50 51#[derive(Serialize)] 52#[serde(rename_all = "camelCase")] 53pub struct CreateSessionOutput { 54 pub access_jwt: String, 55 pub refresh_jwt: String, 56 pub handle: String, 57 pub did: String, 58} 59 60pub async fn create_session( 61 State(state): State<AppState>, 62 headers: HeaderMap, 63 Json(input): Json<CreateSessionInput>, 64) -> Response { 65 info!( 66 "create_session called with identifier: {}", 67 input.identifier 68 ); 69 let client_ip = extract_client_ip(&headers); 70 if !state 71 .check_rate_limit(RateLimitKind::Login, &client_ip) 72 .await 73 { 74 warn!(ip = %client_ip, "Login rate limit exceeded"); 
75 return ( 76 StatusCode::TOO_MANY_REQUESTS, 77 Json(json!({ 78 "error": "RateLimitExceeded", 79 "message": "Too many login attempts. Please try again later." 80 })), 81 ) 82 .into_response(); 83 } 84 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 85 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 86 info!( 87 "Normalized identifier: {} -> {}", 88 input.identifier, normalized_identifier 89 ); 90 let row = match sqlx::query!( 91 r#"SELECT 92 u.id, u.did, u.handle, u.password_hash, 93 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 94 u.allow_legacy_login, 95 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 96 k.key_bytes, k.encryption_version, 97 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 98 FROM users u 99 JOIN user_keys k ON u.id = k.user_id 100 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 101 normalized_identifier 102 ) 103 .fetch_optional(&state.db) 104 .await 105 { 106 Ok(Some(row)) => row, 107 Ok(None) => { 108 let _ = verify( 109 &input.password, 110 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 111 ); 112 warn!("User not found for login attempt"); 113 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 114 .into_response(); 115 } 116 Err(e) => { 117 error!("Database error fetching user: {:?}", e); 118 return ApiError::InternalError.into_response(); 119 } 120 }; 121 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 122 Ok(k) => k, 123 Err(e) => { 124 error!("Failed to decrypt user key: {:?}", e); 125 return ApiError::InternalError.into_response(); 126 } 127 }; 128 let (password_valid, app_password_scopes, app_password_controller) = if row 129 .password_hash 130 .as_ref() 131 .map(|h| verify(&input.password, h).unwrap_or(false)) 132 .unwrap_or(false) 133 { 134 (true, None, None) 135 } else { 136 let 
app_passwords = sqlx::query!( 137 "SELECT password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 138 row.id 139 ) 140 .fetch_all(&state.db) 141 .await 142 .unwrap_or_default(); 143 let matched = app_passwords 144 .iter() 145 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 146 match matched { 147 Some(app) => ( 148 true, 149 app.scopes.clone(), 150 app.created_by_controller_did.clone(), 151 ), 152 None => (false, None, None), 153 } 154 }; 155 if !password_valid { 156 warn!("Password verification failed for login attempt"); 157 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 158 .into_response(); 159 } 160 let is_verified = 161 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 162 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did) 163 .await 164 .unwrap_or(false); 165 if !is_verified && !is_delegated { 166 warn!("Login attempt for unverified account: {}", row.did); 167 return ( 168 StatusCode::FORBIDDEN, 169 Json(json!({ 170 "error": "AccountNotVerified", 171 "message": "Please verify your account before logging in", 172 "did": row.did 173 })), 174 ) 175 .into_response(); 176 } 177 let has_totp = row.totp_enabled.unwrap_or(false); 178 let is_legacy_login = has_totp; 179 if has_totp && !row.allow_legacy_login { 180 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 181 return ( 182 StatusCode::FORBIDDEN, 183 Json(json!({ 184 "error": "MfaRequired", 185 "message": "This account requires MFA. 
Please use an OAuth client that supports TOTP verification.", 186 "did": row.did 187 })), 188 ) 189 .into_response(); 190 } 191 let access_meta = match crate::auth::create_access_token_with_delegation( 192 &row.did, 193 &key_bytes, 194 app_password_scopes.as_deref(), 195 app_password_controller.as_deref(), 196 ) { 197 Ok(m) => m, 198 Err(e) => { 199 error!("Failed to create access token: {:?}", e); 200 return ApiError::InternalError.into_response(); 201 } 202 }; 203 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 204 Ok(m) => m, 205 Err(e) => { 206 error!("Failed to create refresh token: {:?}", e); 207 return ApiError::InternalError.into_response(); 208 } 209 }; 210 if let Err(e) = sqlx::query!( 211 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 212 row.did, 213 access_meta.jti, 214 refresh_meta.jti, 215 access_meta.expires_at, 216 refresh_meta.expires_at, 217 is_legacy_login, 218 false, 219 app_password_scopes, 220 app_password_controller 221 ) 222 .execute(&state.db) 223 .await 224 { 225 error!("Failed to insert session: {:?}", e); 226 return ApiError::InternalError.into_response(); 227 } 228 if is_legacy_login { 229 warn!( 230 did = %row.did, 231 ip = %client_ip, 232 "Legacy login on TOTP-enabled account - sending notification" 233 ); 234 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 235 if let Err(e) = crate::comms::queue_legacy_login_notification( 236 &state.db, 237 row.id, 238 &hostname, 239 &client_ip, 240 row.preferred_comms_channel, 241 ) 242 .await 243 { 244 error!("Failed to queue legacy login notification: {:?}", e); 245 } 246 } 247 let handle = full_handle(&row.handle, &pds_hostname); 248 Json(CreateSessionOutput { 249 access_jwt: access_meta.token, 250 refresh_jwt: refresh_meta.token, 251 handle, 252 did: row.did, 
253 }) 254 .into_response() 255} 256 257pub async fn get_session( 258 State(state): State<AppState>, 259 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 260) -> Response { 261 let permissions = auth_user.permissions(); 262 let can_read_email = permissions.allows_email_read(); 263 264 match sqlx::query!( 265 r#"SELECT 266 handle, email, email_verified, is_admin, deactivated_at, preferred_locale, 267 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 268 discord_verified, telegram_verified, signal_verified 269 FROM users WHERE did = $1"#, 270 auth_user.did 271 ) 272 .fetch_optional(&state.db) 273 .await 274 { 275 Ok(Some(row)) => { 276 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 277 crate::comms::CommsChannel::Email => ("email", row.email_verified), 278 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 279 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 280 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 281 }; 282 let pds_hostname = 283 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 284 let handle = full_handle(&row.handle, &pds_hostname); 285 let is_active = row.deactivated_at.is_none(); 286 let email_value = if can_read_email { 287 row.email.clone() 288 } else { 289 None 290 }; 291 let email_verified_value = can_read_email && row.email_verified; 292 Json(json!({ 293 "handle": handle, 294 "did": auth_user.did, 295 "email": email_value, 296 "emailVerified": email_verified_value, 297 "preferredChannel": preferred_channel, 298 "preferredChannelVerified": preferred_channel_verified, 299 "preferredLocale": row.preferred_locale, 300 "isAdmin": row.is_admin, 301 "active": is_active, 302 "status": if is_active { "active" } else { "deactivated" }, 303 "didDoc": {} 304 })) 305 .into_response() 306 } 307 Ok(None) => ApiError::AuthenticationFailed.into_response(), 308 Err(e) => { 309 
error!("Database error in get_session: {:?}", e); 310 ApiError::InternalError.into_response() 311 } 312 } 313} 314 315pub async fn delete_session( 316 State(state): State<AppState>, 317 headers: axum::http::HeaderMap, 318) -> Response { 319 let token = match crate::auth::extract_bearer_token_from_header( 320 headers.get("Authorization").and_then(|h| h.to_str().ok()), 321 ) { 322 Some(t) => t, 323 None => return ApiError::AuthenticationRequired.into_response(), 324 }; 325 let jti = match crate::auth::get_jti_from_token(&token) { 326 Ok(jti) => jti, 327 Err(_) => return ApiError::AuthenticationFailed.into_response(), 328 }; 329 let did = crate::auth::get_did_from_token(&token).ok(); 330 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 331 .execute(&state.db) 332 .await 333 { 334 Ok(res) if res.rows_affected() > 0 => { 335 if let Some(did) = did { 336 let session_cache_key = format!("auth:session:{}:{}", did, jti); 337 let _ = state.cache.delete(&session_cache_key).await; 338 } 339 Json(json!({})).into_response() 340 } 341 Ok(_) => ApiError::AuthenticationFailed.into_response(), 342 Err(e) => { 343 error!("Database error in delete_session: {:?}", e); 344 ApiError::AuthenticationFailed.into_response() 345 } 346 } 347} 348 349pub async fn refresh_session( 350 State(state): State<AppState>, 351 headers: axum::http::HeaderMap, 352) -> Response { 353 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 354 if !state 355 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 356 .await 357 { 358 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 359 return ( 360 axum::http::StatusCode::TOO_MANY_REQUESTS, 361 axum::Json(serde_json::json!({ 362 "error": "RateLimitExceeded", 363 "message": "Too many requests. Please try again later." 
364 })), 365 ) 366 .into_response(); 367 } 368 let refresh_token = match crate::auth::extract_bearer_token_from_header( 369 headers.get("Authorization").and_then(|h| h.to_str().ok()), 370 ) { 371 Some(t) => t, 372 None => return ApiError::AuthenticationRequired.into_response(), 373 }; 374 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 375 Ok(jti) => jti, 376 Err(_) => { 377 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 378 .into_response(); 379 } 380 }; 381 let mut tx = match state.db.begin().await { 382 Ok(tx) => tx, 383 Err(e) => { 384 error!("Failed to begin transaction: {:?}", e); 385 return ApiError::InternalError.into_response(); 386 } 387 }; 388 if let Ok(Some(session_id)) = sqlx::query_scalar!( 389 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 390 refresh_jti 391 ) 392 .fetch_optional(&mut *tx) 393 .await 394 { 395 warn!( 396 "Refresh token reuse detected! Revoking token family for session_id: {}", 397 session_id 398 ); 399 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 400 .execute(&mut *tx) 401 .await; 402 let _ = tx.commit().await; 403 return ApiError::ExpiredTokenMsg( 404 "Refresh token has been revoked due to suspected compromise".into(), 405 ) 406 .into_response(); 407 } 408 let session_row = match sqlx::query!( 409 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version 410 FROM session_tokens st 411 JOIN users u ON st.did = u.did 412 JOIN user_keys k ON u.id = k.user_id 413 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 414 FOR UPDATE OF st"#, 415 refresh_jti 416 ) 417 .fetch_optional(&mut *tx) 418 .await 419 { 420 Ok(Some(row)) => row, 421 Ok(None) => { 422 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 423 .into_response(); 424 } 425 Err(e) => { 426 error!("Database error fetching session: {:?}", e); 427 return ApiError::InternalError.into_response(); 428 } 429 
}; 430 let key_bytes = 431 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 432 Ok(k) => k, 433 Err(e) => { 434 error!("Failed to decrypt user key: {:?}", e); 435 return ApiError::InternalError.into_response(); 436 } 437 }; 438 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 439 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 440 } 441 let new_access_meta = match crate::auth::create_access_token_with_delegation( 442 &session_row.did, 443 &key_bytes, 444 session_row.scope.as_deref(), 445 session_row.controller_did.as_deref(), 446 ) { 447 Ok(m) => m, 448 Err(e) => { 449 error!("Failed to create access token: {:?}", e); 450 return ApiError::InternalError.into_response(); 451 } 452 }; 453 let new_refresh_meta = 454 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 455 Ok(m) => m, 456 Err(e) => { 457 error!("Failed to create refresh token: {:?}", e); 458 return ApiError::InternalError.into_response(); 459 } 460 }; 461 match sqlx::query!( 462 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 463 refresh_jti, 464 session_row.id 465 ) 466 .execute(&mut *tx) 467 .await 468 { 469 Ok(result) if result.rows_affected() == 0 => { 470 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 471 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 472 .execute(&mut *tx) 473 .await; 474 let _ = tx.commit().await; 475 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 476 } 477 Err(e) => { 478 error!("Failed to record used refresh token: {:?}", e); 479 return ApiError::InternalError.into_response(); 480 } 481 Ok(_) => {} 482 } 483 if let Err(e) = sqlx::query!( 484 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, 
refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 485 new_access_meta.jti, 486 new_refresh_meta.jti, 487 new_access_meta.expires_at, 488 new_refresh_meta.expires_at, 489 session_row.id 490 ) 491 .execute(&mut *tx) 492 .await 493 { 494 error!("Database error updating session: {:?}", e); 495 return ApiError::InternalError.into_response(); 496 } 497 if let Err(e) = tx.commit().await { 498 error!("Failed to commit transaction: {:?}", e); 499 return ApiError::InternalError.into_response(); 500 } 501 match sqlx::query!( 502 r#"SELECT 503 handle, email, email_verified, is_admin, preferred_locale, 504 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 505 discord_verified, telegram_verified, signal_verified 506 FROM users WHERE did = $1"#, 507 session_row.did 508 ) 509 .fetch_optional(&state.db) 510 .await 511 { 512 Ok(Some(u)) => { 513 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 514 crate::comms::CommsChannel::Email => ("email", u.email_verified), 515 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 516 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 517 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 518 }; 519 let pds_hostname = 520 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 521 let handle = full_handle(&u.handle, &pds_hostname); 522 Json(json!({ 523 "accessJwt": new_access_meta.token, 524 "refreshJwt": new_refresh_meta.token, 525 "handle": handle, 526 "did": session_row.did, 527 "email": u.email, 528 "emailVerified": u.email_verified, 529 "preferredChannel": preferred_channel, 530 "preferredChannelVerified": preferred_channel_verified, 531 "preferredLocale": u.preferred_locale, 532 "isAdmin": u.is_admin, 533 "active": true 534 })) 535 .into_response() 536 } 537 Ok(None) => { 538 error!("User not found for existing session: {}", session_row.did); 539 ApiError::InternalError.into_response() 540 } 
541 Err(e) => { 542 error!("Database error fetching user: {:?}", e); 543 ApiError::InternalError.into_response() 544 } 545 } 546} 547 548#[derive(Deserialize)] 549#[serde(rename_all = "camelCase")] 550pub struct ConfirmSignupInput { 551 pub did: String, 552 pub verification_code: String, 553} 554 555#[derive(Serialize)] 556#[serde(rename_all = "camelCase")] 557pub struct ConfirmSignupOutput { 558 pub access_jwt: String, 559 pub refresh_jwt: String, 560 pub handle: String, 561 pub did: String, 562 pub email: Option<String>, 563 pub email_verified: bool, 564 pub preferred_channel: String, 565 pub preferred_channel_verified: bool, 566} 567 568pub async fn confirm_signup( 569 State(state): State<AppState>, 570 Json(input): Json<ConfirmSignupInput>, 571) -> Response { 572 info!("confirm_signup called for DID: {}", input.did); 573 let row = match sqlx::query!( 574 r#"SELECT 575 u.id, u.did, u.handle, u.email, 576 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 577 u.discord_id, u.telegram_username, u.signal_number, 578 k.key_bytes, k.encryption_version 579 FROM users u 580 JOIN user_keys k ON u.id = k.user_id 581 WHERE u.did = $1"#, 582 input.did 583 ) 584 .fetch_optional(&state.db) 585 .await 586 { 587 Ok(Some(row)) => row, 588 Ok(None) => { 589 warn!("User not found for confirm_signup: {}", input.did); 590 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 591 .into_response(); 592 } 593 Err(e) => { 594 error!("Database error in confirm_signup: {:?}", e); 595 return ApiError::InternalError.into_response(); 596 } 597 }; 598 599 let (channel_str, identifier) = match row.channel { 600 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 601 crate::comms::CommsChannel::Discord => { 602 ("discord", row.discord_id.clone().unwrap_or_default()) 603 } 604 crate::comms::CommsChannel::Telegram => ( 605 "telegram", 606 row.telegram_username.clone().unwrap_or_default(), 607 ), 608 
crate::comms::CommsChannel::Signal => { 609 ("signal", row.signal_number.clone().unwrap_or_default()) 610 } 611 }; 612 613 let normalized_token = 614 crate::auth::verification_token::normalize_token_input(&input.verification_code); 615 match crate::auth::verification_token::verify_signup_token( 616 &normalized_token, 617 channel_str, 618 &identifier, 619 ) { 620 Ok(token_data) => { 621 if token_data.did != input.did { 622 warn!( 623 "Token DID mismatch for confirm_signup: expected {}, got {}", 624 input.did, token_data.did 625 ); 626 return ApiError::InvalidRequest("Invalid verification code".into()) 627 .into_response(); 628 } 629 } 630 Err(crate::auth::verification_token::VerifyError::Expired) => { 631 warn!("Verification code expired for user: {}", input.did); 632 return ApiError::ExpiredTokenMsg("Verification code has expired".into()) 633 .into_response(); 634 } 635 Err(e) => { 636 warn!("Invalid verification code for user {}: {:?}", input.did, e); 637 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 638 } 639 } 640 641 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 642 Ok(k) => k, 643 Err(e) => { 644 error!("Failed to decrypt user key: {:?}", e); 645 return ApiError::InternalError.into_response(); 646 } 647 }; 648 let verified_column = match row.channel { 649 crate::comms::CommsChannel::Email => "email_verified", 650 crate::comms::CommsChannel::Discord => "discord_verified", 651 crate::comms::CommsChannel::Telegram => "telegram_verified", 652 crate::comms::CommsChannel::Signal => "signal_verified", 653 }; 654 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 655 if let Err(e) = sqlx::query(&update_query) 656 .bind(&input.did) 657 .execute(&state.db) 658 .await 659 { 660 error!("Failed to update verification status: {:?}", e); 661 return ApiError::InternalError.into_response(); 662 } 663 664 let access_meta = match 
crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 665 Ok(m) => m, 666 Err(e) => { 667 error!("Failed to create access token: {:?}", e); 668 return ApiError::InternalError.into_response(); 669 } 670 }; 671 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 672 Ok(m) => m, 673 Err(e) => { 674 error!("Failed to create refresh token: {:?}", e); 675 return ApiError::InternalError.into_response(); 676 } 677 }; 678 let no_scope: Option<String> = None; 679 if let Err(e) = sqlx::query!( 680 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 681 row.did, 682 access_meta.jti, 683 refresh_meta.jti, 684 access_meta.expires_at, 685 refresh_meta.expires_at, 686 false, 687 false, 688 no_scope 689 ) 690 .execute(&state.db) 691 .await 692 { 693 error!("Failed to insert session: {:?}", e); 694 return ApiError::InternalError.into_response(); 695 } 696 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 697 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 698 warn!("Failed to enqueue welcome notification: {:?}", e); 699 } 700 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 701 let preferred_channel = match row.channel { 702 crate::comms::CommsChannel::Email => "email", 703 crate::comms::CommsChannel::Discord => "discord", 704 crate::comms::CommsChannel::Telegram => "telegram", 705 crate::comms::CommsChannel::Signal => "signal", 706 }; 707 Json(ConfirmSignupOutput { 708 access_jwt: access_meta.token, 709 refresh_jwt: refresh_meta.token, 710 handle: row.handle, 711 did: row.did, 712 email: row.email, 713 email_verified, 714 preferred_channel: preferred_channel.to_string(), 715 preferred_channel_verified: true, 716 }) 717 .into_response() 718} 719 720#[derive(Deserialize)] 721#[serde(rename_all = 
"camelCase")] 722pub struct ResendVerificationInput { 723 pub did: String, 724} 725 726pub async fn resend_verification( 727 State(state): State<AppState>, 728 Json(input): Json<ResendVerificationInput>, 729) -> Response { 730 info!("resend_verification called for DID: {}", input.did); 731 let row = match sqlx::query!( 732 r#"SELECT 733 id, handle, email, 734 preferred_comms_channel as "channel: crate::comms::CommsChannel", 735 discord_id, telegram_username, signal_number, 736 email_verified, discord_verified, telegram_verified, signal_verified 737 FROM users 738 WHERE did = $1"#, 739 input.did 740 ) 741 .fetch_optional(&state.db) 742 .await 743 { 744 Ok(Some(row)) => row, 745 Ok(None) => { 746 return ApiError::InvalidRequest("User not found".into()).into_response(); 747 } 748 Err(e) => { 749 error!("Database error in resend_verification: {:?}", e); 750 return ApiError::InternalError.into_response(); 751 } 752 }; 753 let is_verified = 754 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 755 if is_verified { 756 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 757 } 758 759 let (channel_str, recipient) = match row.channel { 760 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 761 crate::comms::CommsChannel::Discord => { 762 ("discord", row.discord_id.clone().unwrap_or_default()) 763 } 764 crate::comms::CommsChannel::Telegram => ( 765 "telegram", 766 row.telegram_username.clone().unwrap_or_default(), 767 ), 768 crate::comms::CommsChannel::Signal => { 769 ("signal", row.signal_number.clone().unwrap_or_default()) 770 } 771 }; 772 773 let verification_token = 774 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 775 let formatted_token = 776 crate::auth::verification_token::format_token_for_display(&verification_token); 777 778 if let Err(e) = crate::comms::enqueue_signup_verification( 779 &state.db, 780 
row.id, 781 channel_str, 782 &recipient, 783 &formatted_token, 784 None, 785 ) 786 .await 787 { 788 warn!("Failed to enqueue verification notification: {:?}", e); 789 } 790 Json(json!({"success": true})).into_response() 791} 792 793#[derive(Serialize)] 794#[serde(rename_all = "camelCase")] 795pub struct SessionInfo { 796 pub id: String, 797 pub session_type: String, 798 pub client_name: Option<String>, 799 pub created_at: String, 800 pub expires_at: String, 801 pub is_current: bool, 802} 803 804#[derive(Serialize)] 805#[serde(rename_all = "camelCase")] 806pub struct ListSessionsOutput { 807 pub sessions: Vec<SessionInfo>, 808} 809 810pub async fn list_sessions( 811 State(state): State<AppState>, 812 headers: HeaderMap, 813 auth: BearerAuth, 814) -> Response { 815 let current_jti = headers 816 .get("authorization") 817 .and_then(|v| v.to_str().ok()) 818 .and_then(|v| v.strip_prefix("Bearer ")) 819 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 820 821 let mut sessions: Vec<SessionInfo> = Vec::new(); 822 823 let jwt_result = sqlx::query_as::< 824 _, 825 ( 826 i32, 827 String, 828 chrono::DateTime<chrono::Utc>, 829 chrono::DateTime<chrono::Utc>, 830 ), 831 >( 832 r#" 833 SELECT id, access_jti, created_at, refresh_expires_at 834 FROM session_tokens 835 WHERE did = $1 AND refresh_expires_at > NOW() 836 ORDER BY created_at DESC 837 "#, 838 ) 839 .bind(&auth.0.did) 840 .fetch_all(&state.db) 841 .await; 842 843 match jwt_result { 844 Ok(rows) => { 845 for (id, access_jti, created_at, expires_at) in rows { 846 sessions.push(SessionInfo { 847 id: format!("jwt:{}", id), 848 session_type: "legacy".to_string(), 849 client_name: None, 850 created_at: created_at.to_rfc3339(), 851 expires_at: expires_at.to_rfc3339(), 852 is_current: current_jti.as_ref() == Some(&access_jti), 853 }); 854 } 855 } 856 Err(e) => { 857 error!("DB error fetching JWT sessions: {:?}", e); 858 return ( 859 StatusCode::INTERNAL_SERVER_ERROR, 860 Json(json!({"error": "InternalError"})), 861 
) 862 .into_response(); 863 } 864 } 865 866 let oauth_result = sqlx::query_as::< 867 _, 868 ( 869 i32, 870 String, 871 chrono::DateTime<chrono::Utc>, 872 chrono::DateTime<chrono::Utc>, 873 String, 874 ), 875 >( 876 r#" 877 SELECT id, token_id, created_at, expires_at, client_id 878 FROM oauth_token 879 WHERE did = $1 AND expires_at > NOW() 880 ORDER BY created_at DESC 881 "#, 882 ) 883 .bind(&auth.0.did) 884 .fetch_all(&state.db) 885 .await; 886 887 match oauth_result { 888 Ok(rows) => { 889 for (id, token_id, created_at, expires_at, client_id) in rows { 890 let client_name = extract_client_name(&client_id); 891 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 892 sessions.push(SessionInfo { 893 id: format!("oauth:{}", id), 894 session_type: "oauth".to_string(), 895 client_name: Some(client_name), 896 created_at: created_at.to_rfc3339(), 897 expires_at: expires_at.to_rfc3339(), 898 is_current: is_current_oauth, 899 }); 900 } 901 } 902 Err(e) => { 903 error!("DB error fetching OAuth sessions: {:?}", e); 904 return ( 905 StatusCode::INTERNAL_SERVER_ERROR, 906 Json(json!({"error": "InternalError"})), 907 ) 908 .into_response(); 909 } 910 } 911 912 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 913 914 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 915} 916 917fn extract_client_name(client_id: &str) -> String { 918 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 919 "Localhost App".to_string() 920 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 921 parsed.host_str().unwrap_or("Unknown App").to_string() 922 } else { 923 client_id.to_string() 924 } 925} 926 927#[derive(Deserialize)] 928#[serde(rename_all = "camelCase")] 929pub struct RevokeSessionInput { 930 pub session_id: String, 931} 932 933pub async fn revoke_session( 934 State(state): State<AppState>, 935 auth: BearerAuth, 936 Json(input): Json<RevokeSessionInput>, 937) -> Response { 938 if 
let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 939 let session_id: i32 = match jwt_id.parse() { 940 Ok(id) => id, 941 Err(_) => { 942 return ( 943 StatusCode::BAD_REQUEST, 944 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 945 ) 946 .into_response(); 947 } 948 }; 949 let session = sqlx::query_as::<_, (String,)>( 950 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 951 ) 952 .bind(session_id) 953 .bind(&auth.0.did) 954 .fetch_optional(&state.db) 955 .await; 956 let access_jti = match session { 957 Ok(Some((jti,))) => jti, 958 Ok(None) => { 959 return ( 960 StatusCode::NOT_FOUND, 961 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 962 ) 963 .into_response(); 964 } 965 Err(e) => { 966 error!("DB error in revoke_session: {:?}", e); 967 return ( 968 StatusCode::INTERNAL_SERVER_ERROR, 969 Json(json!({"error": "InternalError"})), 970 ) 971 .into_response(); 972 } 973 }; 974 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 975 .bind(session_id) 976 .execute(&state.db) 977 .await 978 { 979 error!("DB error deleting session: {:?}", e); 980 return ( 981 StatusCode::INTERNAL_SERVER_ERROR, 982 Json(json!({"error": "InternalError"})), 983 ) 984 .into_response(); 985 } 986 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 987 if let Err(e) = state.cache.delete(&cache_key).await { 988 warn!("Failed to invalidate session cache: {:?}", e); 989 } 990 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked"); 991 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 992 let session_id: i32 = match oauth_id.parse() { 993 Ok(id) => id, 994 Err(_) => { 995 return ( 996 StatusCode::BAD_REQUEST, 997 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 998 ) 999 .into_response(); 1000 } 1001 }; 1002 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 1003 .bind(session_id) 1004 
.bind(&auth.0.did)
            .execute(&state.db)
            .await;
        match result {
            Ok(r) if r.rows_affected() == 0 => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
                )
                    .into_response();
            }
            Err(e) => {
                error!("DB error deleting OAuth session: {:?}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(json!({"error": "InternalError"})),
                )
                    .into_response();
            }
            _ => {}
        }
        info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
    } else {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
        )
            .into_response();
    }
    (StatusCode::OK, Json(json!({}))).into_response()
}

/// Revokes every session of the authenticated account except the one making
/// this request.
///
/// The current session is identified by the `jti` extracted from the
/// `Authorization: Bearer …` header:
///
/// * OAuth caller (`auth.0.is_oauth`): all `session_tokens` rows for the DID are
///   deleted, plus every `oauth_token` row whose `token_id` differs from the
///   current `jti`.
/// * Otherwise: all `session_tokens` rows whose `access_jti` differs from the
///   current `jti` are deleted, plus every `oauth_token` row for the DID.
///
/// For each deleted `session_tokens` row the matching
/// `auth:session:{did}:{access_jti}` cache entry is removed as well, mirroring
/// what `revoke_session` does for a single session. Cache failures are logged
/// and otherwise ignored (best effort).
///
/// # Responses
///
/// * `200 {"success": true}` on success.
/// * `400 InvalidToken` when no `jti` can be read from the request.
/// * `500 InternalError` when either delete statement fails.
pub async fn revoke_all_sessions(
    State(state): State<AppState>,
    headers: HeaderMap,
    auth: BearerAuth,
) -> Response {
    // NOTE(review): only the `Bearer ` scheme is recognised here; confirm that
    // OAuth callers never present a different scheme, otherwise they would
    // always end up on the 400 path below.
    let current_jti = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .and_then(|token| crate::auth::get_jti_from_token(token).ok());

    let Some(jti) = current_jti else {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
        )
            .into_response();
    };

    let internal_error = || {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({"error": "InternalError"})),
        )
            .into_response()
    };

    // Step 1: drop JWT sessions. `RETURNING access_jti` tells us exactly which
    // sessions disappeared so their cache entries can be invalidated.
    let jwt_result = if auth.0.is_oauth {
        sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE did = $1 RETURNING access_jti",
        )
        .bind(&auth.0.did)
        .fetch_all(&state.db)
        .await
    } else {
        sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2 RETURNING access_jti",
        )
        .bind(&auth.0.did)
        .bind(&jti)
        .fetch_all(&state.db)
        .await
    };
    let revoked_jwt_sessions = match jwt_result {
        Ok(rows) => rows,
        Err(e) => {
            error!("DB error revoking JWT sessions: {:?}", e);
            return internal_error();
        }
    };

    // Invalidate before touching OAuth tokens: the rows above are already gone,
    // so the cache must be cleaned even if the next statement fails.
    for (access_jti,) in &revoked_jwt_sessions {
        let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
        if let Err(e) = state.cache.delete(&cache_key).await {
            warn!("Failed to invalidate session cache: {:?}", e);
        }
    }

    // Step 2: drop OAuth tokens, sparing the caller's own token if it is one.
    let oauth_result = if auth.0.is_oauth {
        sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
            .bind(&auth.0.did)
            .bind(&jti)
            .execute(&state.db)
            .await
    } else {
        sqlx::query("DELETE FROM oauth_token WHERE did = $1")
            .bind(&auth.0.did)
            .execute(&state.db)
            .await
    };
    if let Err(e) = oauth_result {
        error!("DB error revoking OAuth sessions: {:?}", e);
        return internal_error();
    }

    info!(did = %auth.0.did, "All other sessions revoked");
    (StatusCode::OK, Json(json!({"success": true}))).into_response()
}

/// Response body of [`get_legacy_login_preference`]; serialized with camelCase
/// keys (`allowLegacyLogin`, `hasMfa`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LegacyLoginPreferenceOutput {
    /// Value of `users.allow_legacy_login` for the account.
    pub allow_legacy_login: bool,
    /// `true` when the account has a verified TOTP entry or at least one passkey.
    pub has_mfa: bool,
}

/// Returns the authenticated account's legacy-login preference together with a
/// flag telling whether any MFA method (verified TOTP or passkey) exists.
///
/// Responds with `404 AccountNotFound` when no `users` row matches the DID and
/// `500 InternalError` when the query fails.
pub async fn get_legacy_login_preference(
    State(state): State<AppState>,
    auth: BearerAuth,
) -> Response {
    let result = sqlx::query!(
        r#"SELECT
            u.allow_legacy_login,
            (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
             EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1130 FROM users u WHERE u.did = $1"#, 1131 auth.0.did 1132 ) 1133 .fetch_optional(&state.db) 1134 .await; 1135 1136 match result { 1137 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1138 allow_legacy_login: row.allow_legacy_login, 1139 has_mfa: row.has_mfa, 1140 }) 1141 .into_response(), 1142 Ok(None) => ( 1143 StatusCode::NOT_FOUND, 1144 Json(json!({"error": "AccountNotFound"})), 1145 ) 1146 .into_response(), 1147 Err(e) => { 1148 error!("DB error: {:?}", e); 1149 ( 1150 StatusCode::INTERNAL_SERVER_ERROR, 1151 Json(json!({"error": "InternalError"})), 1152 ) 1153 .into_response() 1154 } 1155 } 1156} 1157 1158#[derive(Deserialize)] 1159#[serde(rename_all = "camelCase")] 1160pub struct UpdateLegacyLoginInput { 1161 pub allow_legacy_login: bool, 1162} 1163 1164pub async fn update_legacy_login_preference( 1165 State(state): State<AppState>, 1166 auth: BearerAuth, 1167 Json(input): Json<UpdateLegacyLoginInput>, 1168) -> Response { 1169 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1170 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1171 .await; 1172 } 1173 1174 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1175 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1176 } 1177 1178 let result = sqlx::query!( 1179 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1180 input.allow_legacy_login, 1181 auth.0.did 1182 ) 1183 .fetch_optional(&state.db) 1184 .await; 1185 1186 match result { 1187 Ok(Some(_)) => { 1188 info!( 1189 did = %auth.0.did, 1190 allow_legacy_login = input.allow_legacy_login, 1191 "Legacy login preference updated" 1192 ); 1193 Json(json!({ 1194 "allowLegacyLogin": input.allow_legacy_login 1195 })) 1196 .into_response() 1197 } 1198 Ok(None) => ( 1199 StatusCode::NOT_FOUND, 1200 Json(json!({"error": "AccountNotFound"})), 1201 ) 1202 .into_response(), 1203 Err(e) => { 1204 
error!("DB error: {:?}", e); 1205 ( 1206 StatusCode::INTERNAL_SERVER_ERROR, 1207 Json(json!({"error": "InternalError"})), 1208 ) 1209 .into_response() 1210 } 1211 } 1212} 1213 1214use crate::comms::locale::VALID_LOCALES; 1215 1216#[derive(Deserialize)] 1217#[serde(rename_all = "camelCase")] 1218pub struct UpdateLocaleInput { 1219 pub preferred_locale: String, 1220} 1221 1222pub async fn update_locale( 1223 State(state): State<AppState>, 1224 auth: BearerAuth, 1225 Json(input): Json<UpdateLocaleInput>, 1226) -> Response { 1227 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1228 return ( 1229 StatusCode::BAD_REQUEST, 1230 Json(json!({ 1231 "error": "InvalidRequest", 1232 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1233 })), 1234 ) 1235 .into_response(); 1236 } 1237 1238 let result = sqlx::query!( 1239 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1240 input.preferred_locale, 1241 auth.0.did 1242 ) 1243 .fetch_optional(&state.db) 1244 .await; 1245 1246 match result { 1247 Ok(Some(_)) => { 1248 info!( 1249 did = %auth.0.did, 1250 locale = %input.preferred_locale, 1251 "User locale preference updated" 1252 ); 1253 Json(json!({ 1254 "preferredLocale": input.preferred_locale 1255 })) 1256 .into_response() 1257 } 1258 Ok(None) => ( 1259 StatusCode::NOT_FOUND, 1260 Json(json!({"error": "AccountNotFound"})), 1261 ) 1262 .into_response(), 1263 Err(e) => { 1264 error!("DB error updating locale: {:?}", e); 1265 ( 1266 StatusCode::INTERNAL_SERVER_ERROR, 1267 Json(json!({"error": "InternalError"})), 1268 ) 1269 .into_response() 1270 } 1271 } 1272}