this repo has no description
use crate::api::ApiError;
use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
use crate::state::{AppState, RateLimitKind};
use axum::{
    Json,
    extract::State,
    http::{HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};
use bcrypt::verify;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, info, warn};

/// Returns the value of the `PDS_HOSTNAME` environment variable, or
/// `"localhost"` when it is unset or not valid Unicode.
fn configured_pds_hostname() -> String {
    std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
}

/// Builds a JSON error response of the form `{"error": ..., "message": ...}`
/// with the given HTTP status.
fn json_error_response(status: StatusCode, error: &str, message: &str) -> Response {
    (status, Json(json!({ "error": error, "message": message }))).into_response()
}

/// Builds the bare `500 {"error": "InternalError"}` response used by the
/// session-management handlers.
fn internal_error_json() -> Response {
    (
        StatusCode::INTERNAL_SERVER_ERROR,
        Json(json!({"error": "InternalError"})),
    )
        .into_response()
}

/// Maps a communication channel to the lowercase name used in API payloads
/// and verification tokens.
fn comms_channel_name(channel: &crate::comms::CommsChannel) -> &'static str {
    match channel {
        crate::comms::CommsChannel::Email => "email",
        crate::comms::CommsChannel::Discord => "discord",
        crate::comms::CommsChannel::Telegram => "telegram",
        crate::comms::CommsChannel::Signal => "signal",
    }
}

/// Extracts the JTI of the token presented in the `Authorization` header.
///
/// Uses the same bearer-token extraction helper as `delete_session` and
/// `refresh_session`. Returns `None` when the header is missing, is not
/// valid text, carries no bearer token, or the token has no readable JTI.
fn current_session_jti(headers: &HeaderMap) -> Option<String> {
    let token = crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    )?;
    crate::auth::get_jti_from_token(&token).ok()
}

/// Removes the cached entry for one JWT session. Failures are logged and
/// otherwise ignored.
async fn invalidate_session_cache(state: &AppState, did: &str, access_jti: &str) {
    let cache_key = format!("auth:session:{}:{}", did, access_jti);
    if let Err(e) = state.cache.delete(&cache_key).await {
        warn!("Failed to invalidate session cache: {:?}", e);
    }
}

/// Determines the client IP used as the login rate-limit key.
///
/// Takes the first entry of `x-forwarded-for`, then `x-real-ip`, and finally
/// falls back to `"unknown"`. Header values that are empty (or whitespace
/// only) are skipped so that an empty header never becomes the key.
///
/// NOTE(review): both headers are client-controlled unless a trusted proxy
/// overwrites them — confirm the deployment guarantees that.
fn extract_client_ip(headers: &HeaderMap) -> String {
    if let Some(forwarded) = headers.get("x-forwarded-for")
        && let Ok(value) = forwarded.to_str()
        && let Some(first_ip) = value.split(',').next().map(str::trim)
        && !first_ip.is_empty()
    {
        return first_ip.to_string();
    }
    if let Some(real_ip) = headers.get("x-real-ip")
        && let Ok(value) = real_ip.to_str()
        && !value.trim().is_empty()
    {
        return value.trim().to_string();
    }
    "unknown".to_string()
}

/// Normalizes a login identifier before it is looked up.
///
/// * Identifiers containing `@` or starting with `did:` are returned trimmed
///   but otherwise unchanged.
/// * Identifiers without a `.` are lowercased and suffixed with
///   `.{pds_hostname}`.
/// * Everything else is lowercased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let identifier = identifier.trim();
    if identifier.contains('@') || identifier.starts_with("did:") {
        identifier.to_string()
    } else if !identifier.contains('.') {
        format!("{}.{}", identifier.to_lowercase(), pds_hostname)
    } else {
        identifier.to_lowercase()
    }
}

/// Returns the handle to present to clients. The stored handle is returned
/// as-is; the hostname argument is currently unused.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    stored_handle.to_string()
}

/// Request body for [`create_session`].
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionInput {
    pub identifier: String,
    pub password: String,
    /// When `true`, a taken-down account may still obtain a session.
    #[serde(default)]
    pub allow_takendown: bool,
}

/// Response body for [`create_session`]. `None` fields are omitted.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    pub access_jwt: String,
    pub refresh_jwt: String,
    pub handle: String,
    pub did: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub did_doc: Option<serde_json::Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email_confirmed: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}

/// Password login: verifies the credentials and issues a new access/refresh
/// token pair.
///
/// Steps, in order: per-IP login rate limit, user lookup by handle, email or
/// DID, verification of the account password (falling back to the 20 most
/// recent app passwords), takedown check, verification/delegation check,
/// TOTP legacy-login check, token creation and session insert. When the
/// account has TOTP enabled and legacy login is allowed, a notification is
/// queued.
///
/// Error responses: 429 `RateLimitExceeded`, authentication failure for an
/// unknown user or wrong password, 401 `AccountTakedown`, 403
/// `AccountNotVerified`, 403 `MfaRequired`, and `InternalError` for database
/// or token-creation failures.
pub async fn create_session(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(input): Json<CreateSessionInput>,
) -> Response {
    info!(
        "create_session called with identifier: {}",
        input.identifier
    );
    let client_ip = extract_client_ip(&headers);
    if !state
        .check_rate_limit(RateLimitKind::Login, &client_ip)
        .await
    {
        warn!(ip = %client_ip, "Login rate limit exceeded");
        return json_error_response(
            StatusCode::TOO_MANY_REQUESTS,
            "RateLimitExceeded",
            "Too many login attempts. Please try again later.",
        );
    }
    let pds_hostname = configured_pds_hostname();
    let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
    info!(
        "Normalized identifier: {} -> {}",
        input.identifier, normalized_identifier
    );
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
            u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
            u.allow_legacy_login,
            u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
            k.key_bytes, k.encryption_version,
            (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
        normalized_identifier
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            // Verify against a fixed hash so that an unknown identifier still
            // pays for one bcrypt verification, like a known one does.
            let _ = verify(
                &input.password,
                "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
            );
            warn!("User not found for login attempt");
            return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let main_password_matches = row
        .password_hash
        .as_ref()
        .is_some_and(|h| verify(&input.password, h).unwrap_or(false));
    let (password_valid, app_password_scopes, app_password_controller) = if main_password_matches {
        (true, None, None)
    } else {
        // A database failure here must not be reported as a wrong password.
        let app_passwords = match sqlx::query!(
            "SELECT password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
            row.id
        )
        .fetch_all(&state.db)
        .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Database error fetching app passwords: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
        let matched = app_passwords
            .iter()
            .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
        match matched {
            Some(app) => (
                true,
                app.scopes.clone(),
                app.created_by_controller_did.clone(),
            ),
            None => (false, None, None),
        }
    };
    if !password_valid {
        warn!("Password verification failed for login attempt");
        return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
            .into_response();
    }
    let is_takendown = row.takedown_ref.is_some();
    if is_takendown && !input.allow_takendown {
        warn!("Login attempt for takendown account: {}", row.did);
        return json_error_response(
            StatusCode::UNAUTHORIZED,
            "AccountTakedown",
            "Account has been taken down",
        );
    }
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    // A failed delegation lookup is treated as "not delegated".
    let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
        .await
        .unwrap_or(false);
    if !is_verified && !is_delegated {
        warn!("Login attempt for unverified account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "AccountNotVerified",
                "message": "Please verify your account before logging in",
                "did": row.did
            })),
        )
            .into_response();
    }
    let has_totp = row.totp_enabled.unwrap_or(false);
    // A password login on a TOTP-enabled account is recorded as a legacy login.
    let is_legacy_login = has_totp;
    if has_totp && !row.allow_legacy_login {
        warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "MfaRequired",
                "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
                "did": row.did
            })),
        )
            .into_response();
    }
    let access_meta = match crate::auth::create_access_token_with_delegation(
        &row.did,
        &key_bytes,
        app_password_scopes.as_deref(),
        app_password_controller.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let did_for_doc = row.did.clone();
    let did_resolver = state.did_resolver.clone();
    // Persist the session and resolve the DID document concurrently.
    let (insert_result, did_doc) = tokio::join!(
        sqlx::query!(
            "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            row.did,
            access_meta.jti,
            refresh_meta.jti,
            access_meta.expires_at,
            refresh_meta.expires_at,
            is_legacy_login,
            false,
            app_password_scopes,
            app_password_controller
        )
        .execute(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    if let Err(e) = insert_result {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if is_legacy_login {
        warn!(
            did = %row.did,
            ip = %client_ip,
            "Legacy login on TOTP-enabled account - sending notification"
        );
        // Best effort: a failed notification does not fail the login.
        if let Err(e) = crate::comms::queue_legacy_login_notification(
            &state.db,
            row.id,
            &pds_hostname,
            &client_ip,
            row.preferred_comms_channel,
        )
        .await
        {
            error!("Failed to queue legacy login notification: {:?}", e);
        }
    }
    let handle = full_handle(&row.handle, &pds_hostname);
    let is_active = row.deactivated_at.is_none() && !is_takendown;
    let status = if is_takendown {
        Some("takendown".to_string())
    } else if row.deactivated_at.is_some() {
        Some("deactivated".to_string())
    } else {
        None
    };
    Json(CreateSessionOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle,
        did: row.did,
        did_doc,
        email: row.email,
        email_confirmed: Some(row.email_verified),
        active: Some(is_active),
        status,
    })
    .into_response()
}

/// Returns information about the authenticated account.
///
/// The user row and the DID document are fetched concurrently. `email` and
/// `emailConfirmed` are only included when the caller's permissions allow
/// reading the email; `status` is only present for taken-down or
/// deactivated accounts; `didDoc` only when resolution returned a document.
pub async fn get_session(
    State(state): State<AppState>,
    BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
) -> Response {
    let permissions = auth_user.permissions();
    let can_read_email = permissions.allows_email_read();

    let did_for_doc = auth_user.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified
            FROM users WHERE did = $1"#,
            auth_user.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(row)) => {
            let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", row.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
            };
            let pds_hostname = configured_pds_hostname();
            let handle = full_handle(&row.handle, &pds_hostname);
            let is_takendown = row.takedown_ref.is_some();
            let is_active = row.deactivated_at.is_none() && !is_takendown;
            let mut response = json!({
                "handle": handle,
                "did": auth_user.did,
                "active": is_active,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": row.preferred_locale,
                "isAdmin": row.is_admin
            });
            if can_read_email {
                response["email"] = json!(row.email);
                response["emailConfirmed"] = json!(row.email_verified);
            }
            if is_takendown {
                response["status"] = json!("takendown");
            } else if row.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            Json(response).into_response()
        }
        Ok(None) => ApiError::AuthenticationFailed.into_response(),
        Err(e) => {
            error!("Database error in get_session: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Deletes the session identified by the presented access token and drops
/// its cache entry.
///
/// Returns `AuthenticationRequired` without a bearer token,
/// `AuthenticationFailed` when the token has no readable JTI or no session
/// row matched, and `InternalError` when the database delete fails.
pub async fn delete_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let jti = match crate::auth::get_jti_from_token(&token) {
        Ok(jti) => jti,
        Err(_) => return ApiError::AuthenticationFailed.into_response(),
    };
    let did = crate::auth::get_did_from_token(&token).ok();
    match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
        .execute(&state.db)
        .await
    {
        Ok(res) if res.rows_affected() > 0 => {
            if let Some(did) = did {
                let session_cache_key = format!("auth:session:{}:{}", did, jti);
                let _ = state.cache.delete(&session_cache_key).await;
            }
            Json(json!({})).into_response()
        }
        Ok(_) => ApiError::AuthenticationFailed.into_response(),
        Err(e) => {
            error!("Database error in delete_session: {:?}", e);
            // A database failure is a server error, not an authentication failure.
            ApiError::InternalError.into_response()
        }
    }
}

/// Rotates a refresh token: issues a new access/refresh pair and records the
/// old refresh JTI as used.
///
/// All database work up to the commit runs in one transaction. If the
/// presented refresh JTI is already recorded in `used_refresh_tokens`
/// (either found up front or detected through the `ON CONFLICT` insert), the
/// owning session is deleted and an expired-token error is returned.
/// After the commit, the user row and DID document are fetched to build the
/// response.
pub async fn refresh_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
    if !state
        .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
        .await
    {
        warn!(ip = %client_ip, "Refresh session rate limit exceeded");
        return json_error_response(
            StatusCode::TOO_MANY_REQUESTS,
            "RateLimitExceeded",
            "Too many requests. Please try again later.",
        );
    }
    let refresh_token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
        Ok(jti) => jti,
        Err(_) => {
            return ApiError::AuthenticationFailedMsg("Invalid token format".into())
                .into_response();
        }
    };
    let mut tx = match state.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            error!("Failed to begin transaction: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    match sqlx::query_scalar!(
        "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(session_id)) => {
            warn!(
                "Refresh token reuse detected! Revoking token family for session_id: {}",
                session_id
            );
            if let Err(e) = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
                .execute(&mut *tx)
                .await
            {
                error!("Failed to revoke session after refresh token reuse: {:?}", e);
            }
            if let Err(e) = tx.commit().await {
                error!("Failed to commit session revocation: {:?}", e);
            }
            return ApiError::ExpiredTokenMsg(
                "Refresh token has been revoked due to suspected compromise".into(),
            )
            .into_response();
        }
        Ok(None) => {}
        Err(e) => {
            // Without this check a reused token could be accepted, so do not continue.
            error!("Database error checking refresh token reuse: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    }
    let session_row = match sqlx::query!(
        r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
        FROM session_tokens st
        JOIN users u ON st.did = u.did
        JOIN user_keys k ON u.id = k.user_id
        WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
        FOR UPDATE OF st"#,
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching session: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes =
        match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
            Ok(k) => k,
            Err(e) => {
                error!("Failed to decrypt user key: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
        return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
    }
    let new_access_meta = match crate::auth::create_access_token_with_delegation(
        &session_row.did,
        &key_bytes,
        session_row.scope.as_deref(),
        session_row.controller_did.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let new_refresh_meta =
        match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
            Ok(m) => m,
            Err(e) => {
                error!("Failed to create refresh token: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    match sqlx::query!(
        "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
        refresh_jti,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        // No row inserted: the JTI was already recorded as used.
        Ok(result) if result.rows_affected() == 0 => {
            warn!(
                "Concurrent refresh token reuse detected for session_id: {}",
                session_row.id
            );
            if let Err(e) =
                sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
                    .execute(&mut *tx)
                    .await
            {
                error!("Failed to revoke session after refresh token reuse: {:?}", e);
            }
            if let Err(e) = tx.commit().await {
                error!("Failed to commit session revocation: {:?}", e);
            }
            return ApiError::ExpiredTokenMsg(
                "Refresh token has been revoked due to suspected compromise".into(),
            )
            .into_response();
        }
        Err(e) => {
            error!("Failed to record used refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
        Ok(_) => {}
    }
    if let Err(e) = sqlx::query!(
        "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
        new_access_meta.jti,
        new_refresh_meta.jti,
        new_access_meta.expires_at,
        new_refresh_meta.expires_at,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        error!("Database error updating session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if let Err(e) = tx.commit().await {
        error!("Failed to commit transaction: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let did_for_doc = session_row.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified
            FROM users WHERE did = $1"#,
            session_row.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(u)) => {
            let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", u.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
            };
            let pds_hostname = configured_pds_hostname();
            let handle = full_handle(&u.handle, &pds_hostname);
            let is_takendown = u.takedown_ref.is_some();
            let is_active = u.deactivated_at.is_none() && !is_takendown;
            // NOTE(review): unlike get_session, the email is returned here without a
            // permission check — confirm that is intended for scoped sessions.
            let mut response = json!({
                "accessJwt": new_access_meta.token,
                "refreshJwt": new_refresh_meta.token,
                "handle": handle,
                "did": session_row.did,
                "email": u.email,
                "emailConfirmed": u.email_verified,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": u.preferred_locale,
                "isAdmin": u.is_admin,
                "active": is_active
            });
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            if is_takendown {
                response["status"] = json!("takendown");
            } else if u.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            Json(response).into_response()
        }
        Ok(None) => {
            error!("User not found for existing session: {}", session_row.did);
            ApiError::InternalError.into_response()
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Request body for [`confirm_signup`].
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    pub did: String,
    pub verification_code: String,
}

/// Response body for [`confirm_signup`].
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    pub access_jwt: String,
    pub refresh_jwt: String,
    pub handle: String,
    pub did: String,
    pub email: Option<String>,
    pub email_verified: bool,
    pub preferred_channel: String,
    pub preferred_channel_verified: bool,
}

/// Confirms a signup with a verification code and issues the first session.
///
/// The code is verified against the account's preferred channel and the
/// identifier stored for that channel, and must have been issued for the
/// same DID. On success the channel's `*_verified` column is set, a session
/// is inserted, and a welcome notification is enqueued (best effort).
pub async fn confirm_signup(
    State(state): State<AppState>,
    Json(input): Json<ConfirmSignupInput>,
) -> Response {
    info!("confirm_signup called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.email,
            u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
            u.discord_id, u.telegram_username, u.signal_number,
            k.key_bytes, k.encryption_version
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            warn!("User not found for confirm_signup: {}", input.did);
            return ApiError::InvalidRequest("Invalid DID or verification code".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error in confirm_signup: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };

    let channel_str = comms_channel_name(&row.channel);
    // A missing identifier for the channel becomes an empty string.
    let identifier = match row.channel {
        crate::comms::CommsChannel::Email => row.email.clone().unwrap_or_default(),
        crate::comms::CommsChannel::Discord => row.discord_id.clone().unwrap_or_default(),
        crate::comms::CommsChannel::Telegram => row.telegram_username.clone().unwrap_or_default(),
        crate::comms::CommsChannel::Signal => row.signal_number.clone().unwrap_or_default(),
    };

    let normalized_token =
        crate::auth::verification_token::normalize_token_input(&input.verification_code);
    match crate::auth::verification_token::verify_signup_token(
        &normalized_token,
        channel_str,
        &identifier,
    ) {
        Ok(token_data) => {
            if token_data.did != input.did {
                warn!(
                    "Token DID mismatch for confirm_signup: expected {}, got {}",
                    input.did, token_data.did
                );
                return ApiError::InvalidRequest("Invalid verification code".into())
                    .into_response();
            }
        }
        Err(crate::auth::verification_token::VerifyError::Expired) => {
            warn!("Verification code expired for user: {}", input.did);
            return ApiError::ExpiredTokenMsg("Verification code has expired".into())
                .into_response();
        }
        Err(e) => {
            warn!("Invalid verification code for user {}: {:?}", input.did, e);
            return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
        }
    }

    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // The column name comes from this fixed match, never from user input, so
    // interpolating it into the SQL text is safe.
    let verified_column = match row.channel {
        crate::comms::CommsChannel::Email => "email_verified",
        crate::comms::CommsChannel::Discord => "discord_verified",
        crate::comms::CommsChannel::Telegram => "telegram_verified",
        crate::comms::CommsChannel::Signal => "signal_verified",
    };
    let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
    if let Err(e) = sqlx::query(&update_query)
        .bind(&input.did)
        .execute(&state.db)
        .await
    {
        error!("Failed to update verification status: {:?}", e);
        return ApiError::InternalError.into_response();
    }

    let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let no_scope: Option<String> = None;
    if let Err(e) = sqlx::query!(
        "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        row.did,
        access_meta.jti,
        refresh_meta.jti,
        access_meta.expires_at,
        refresh_meta.expires_at,
        false,
        false,
        no_scope
    )
    .execute(&state.db)
    .await
    {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let hostname = configured_pds_hostname();
    // Best effort: a failed welcome notification does not fail the signup.
    if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
        warn!("Failed to enqueue welcome notification: {:?}", e);
    }
    // Only the channel verified by this request is known here, so the email
    // counts as verified exactly when it is the preferred channel.
    let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
    Json(ConfirmSignupOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle: row.handle,
        did: row.did,
        email: row.email,
        email_verified,
        preferred_channel: channel_str.to_string(),
        preferred_channel_verified: true,
    })
    .into_response()
}

/// Request body for [`resend_verification`].
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    pub did: String,
}

/// Generates a fresh signup verification token for an unverified account and
/// enqueues it on the account's preferred channel.
///
/// Returns `InvalidRequest` when the DID is unknown or the account already
/// has any verified channel. A failure to enqueue the notification is only
/// logged; the response is still `{"success": true}`.
pub async fn resend_verification(
    State(state): State<AppState>,
    Json(input): Json<ResendVerificationInput>,
) -> Response {
    info!("resend_verification called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            id, handle, email,
            preferred_comms_channel as "channel: crate::comms::CommsChannel",
            discord_id, telegram_username, signal_number,
            email_verified, discord_verified, telegram_verified, signal_verified
        FROM users
        WHERE did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::InvalidRequest("User not found".into()).into_response();
        }
        Err(e) => {
            error!("Database error in resend_verification: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    if is_verified {
        return ApiError::InvalidRequest("Account is already verified".into()).into_response();
    }

    let channel_str = comms_channel_name(&row.channel);
    // A missing identifier for the channel becomes an empty string.
    let recipient = match row.channel {
        crate::comms::CommsChannel::Email => row.email.clone().unwrap_or_default(),
        crate::comms::CommsChannel::Discord => row.discord_id.clone().unwrap_or_default(),
        crate::comms::CommsChannel::Telegram => row.telegram_username.clone().unwrap_or_default(),
        crate::comms::CommsChannel::Signal => row.signal_number.clone().unwrap_or_default(),
    };

    let verification_token =
        crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
    let formatted_token =
        crate::auth::verification_token::format_token_for_display(&verification_token);

    if let Err(e) = crate::comms::enqueue_signup_verification(
        &state.db,
        row.id,
        channel_str,
        &recipient,
        &formatted_token,
        None,
    )
    .await
    {
        warn!("Failed to enqueue verification notification: {:?}", e);
    }
    Json(json!({"success": true})).into_response()
}

/// One entry in the list returned by [`list_sessions`].
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// `jwt:{id}` for password sessions, `oauth:{id}` for OAuth tokens.
    pub id: String,
    /// `"legacy"` for password sessions, `"oauth"` for OAuth tokens.
    pub session_type: String,
    pub client_name: Option<String>,
    /// RFC 3339 timestamp.
    pub created_at: String,
    /// RFC 3339 timestamp.
    pub expires_at: String,
    pub is_current: bool,
}

/// Response body for [`list_sessions`].
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    pub sessions: Vec<SessionInfo>,
}

/// Lists the caller's unexpired password (JWT) sessions and OAuth tokens,
/// newest first, flagging the one that matches the presented token.
pub async fn list_sessions(
    State(state): State<AppState>,
    headers: HeaderMap,
    auth: BearerAuth,
) -> Response {
    let current_jti = current_session_jti(&headers);

    // Keep the real creation time next to each entry so the final ordering
    // does not depend on how timestamps are formatted.
    let mut sessions: Vec<(chrono::DateTime<chrono::Utc>, SessionInfo)> = Vec::new();

    let jwt_result = sqlx::query_as::<
        _,
        (
            i32,
            String,
            chrono::DateTime<chrono::Utc>,
            chrono::DateTime<chrono::Utc>,
        ),
    >(
        r#"
        SELECT id, access_jti, created_at, refresh_expires_at
        FROM session_tokens
        WHERE did = $1 AND refresh_expires_at > NOW()
        ORDER BY created_at DESC
        "#,
    )
    .bind(&auth.0.did)
    .fetch_all(&state.db)
    .await;

    match jwt_result {
        Ok(rows) => {
            sessions.extend(rows.into_iter().map(
                |(id, access_jti, created_at, expires_at)| {
                    (
                        created_at,
                        SessionInfo {
                            id: format!("jwt:{}", id),
                            session_type: "legacy".to_string(),
                            client_name: None,
                            created_at: created_at.to_rfc3339(),
                            expires_at: expires_at.to_rfc3339(),
                            is_current: current_jti.as_ref() == Some(&access_jti),
                        },
                    )
                },
            ));
        }
        Err(e) => {
            error!("DB error fetching JWT sessions: {:?}", e);
            return internal_error_json();
        }
    }

    let oauth_result = sqlx::query_as::<
        _,
        (
            i32,
            String,
            chrono::DateTime<chrono::Utc>,
            chrono::DateTime<chrono::Utc>,
            String,
        ),
    >(
        r#"
        SELECT id, token_id, created_at, expires_at, client_id
        FROM oauth_token
        WHERE did = $1 AND expires_at > NOW()
        ORDER BY created_at DESC
        "#,
    )
    .bind(&auth.0.did)
    .fetch_all(&state.db)
    .await;

    match oauth_result {
        Ok(rows) => {
            sessions.extend(rows.into_iter().map(
                |(id, token_id, created_at, expires_at, client_id)| {
                    let is_current_oauth =
                        auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
                    (
                        created_at,
                        SessionInfo {
                            id: format!("oauth:{}", id),
                            session_type: "oauth".to_string(),
                            client_name: Some(extract_client_name(&client_id)),
                            created_at: created_at.to_rfc3339(),
                            expires_at: expires_at.to_rfc3339(),
                            is_current: is_current_oauth,
                        },
                    )
                },
            ));
        }
        Err(e) => {
            error!("DB error fetching OAuth sessions: {:?}", e);
            return internal_error_json();
        }
    }

    // Newest first; the sort is stable, so ties keep their query order.
    sessions.sort_by(|a, b| b.0.cmp(&a.0));
    let sessions = sessions.into_iter().map(|(_, info)| info).collect();

    (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
}

/// Derives a display name from an OAuth client ID.
///
/// Client IDs starting with `http://localhost` or `http://127.0.0.1` become
/// `"Localhost App"`; other parseable URLs yield their host (or
/// `"Unknown App"` when the URL has none); anything else is returned as-is.
fn extract_client_name(client_id: &str) -> String {
    if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
        "Localhost App".to_string()
    } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
        parsed.host_str().unwrap_or("Unknown App").to_string()
    } else {
        client_id.to_string()
    }
}

/// Request body for [`revoke_session`].
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// Session identifier as returned by [`list_sessions`]: `jwt:{id}` or
    /// `oauth:{id}`.
    pub session_id: String,
}

/// Revokes one of the caller's sessions, identified by a `jwt:{id}` or
/// `oauth:{id}` session ID.
///
/// Returns 400 `InvalidRequest` for a malformed ID, 404 `SessionNotFound`
/// when the session does not exist or belongs to another account, and 500
/// on database errors. Revoking a JWT session also drops its cache entry.
pub async fn revoke_session(
    State(state): State<AppState>,
    auth: BearerAuth,
    Json(input): Json<RevokeSessionInput>,
) -> Response {
    if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
        let session_id: i32 = match jwt_id.parse() {
            Ok(id) => id,
            Err(_) => {
                return json_error_response(
                    StatusCode::BAD_REQUEST,
                    "InvalidRequest",
                    "Invalid session ID",
                );
            }
        };
        let session = sqlx::query_as::<_, (String,)>(
            "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
        )
        .bind(session_id)
        .bind(&auth.0.did)
        .fetch_optional(&state.db)
        .await;
        let access_jti = match session {
            Ok(Some((jti,))) => jti,
            Ok(None) => {
                return json_error_response(
                    StatusCode::NOT_FOUND,
                    "SessionNotFound",
                    "Session not found",
                );
            }
            Err(e) => {
                error!("DB error in revoke_session: {:?}", e);
                return internal_error_json();
            }
        };
        // Scope the delete to the caller's DID as well, like the OAuth branch does.
        if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1 AND did = $2")
            .bind(session_id)
            .bind(&auth.0.did)
            .execute(&state.db)
            .await
        {
            error!("DB error deleting session: {:?}", e);
            return internal_error_json();
        }
        invalidate_session_cache(&state, &auth.0.did, &access_jti).await;
        info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
    } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
        let session_id: i32 = match oauth_id.parse() {
            Ok(id) => id,
            Err(_) => {
                return json_error_response(
                    StatusCode::BAD_REQUEST,
                    "InvalidRequest",
                    "Invalid session ID",
                );
            }
        };
        let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
            .bind(session_id)
            .bind(&auth.0.did)
            .execute(&state.db)
            .await;
        match result {
            Ok(r) if r.rows_affected() == 0 => {
                return json_error_response(
                    StatusCode::NOT_FOUND,
                    "SessionNotFound",
                    "Session not found",
                );
            }
            Err(e) => {
                error!("DB error deleting OAuth session: {:?}", e);
                return internal_error_json();
            }
            _ => {}
        }
        info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
    } else {
        return json_error_response(
            StatusCode::BAD_REQUEST,
            "InvalidRequest",
            "Invalid session ID format",
        );
    }
    (StatusCode::OK, Json(json!({}))).into_response()
}

/// Revokes every session of the caller except the one making the request.
///
/// For an OAuth caller all JWT sessions are deleted and all OAuth tokens
/// except the current one; for a JWT caller all JWT sessions except the
/// current one and all OAuth tokens. Cache entries of the deleted JWT
/// sessions are invalidated. Returns 400 `InvalidToken` when the current
/// session cannot be identified from the `Authorization` header.
pub async fn revoke_all_sessions(
    State(state): State<AppState>,
    headers: HeaderMap,
    auth: BearerAuth,
) -> Response {
    let Some(jti) = current_session_jti(&headers) else {
        return json_error_response(
            StatusCode::BAD_REQUEST,
            "InvalidToken",
            "Could not identify current session",
        );
    };

    // RETURNING yields the JTIs of the deleted sessions so their cache
    // entries can be invalidated below.
    let revoked_jwt = if auth.0.is_oauth {
        sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE did = $1 RETURNING access_jti",
        )
        .bind(&auth.0.did)
        .fetch_all(&state.db)
        .await
    } else {
        sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2 RETURNING access_jti",
        )
        .bind(&auth.0.did)
        .bind(&jti)
        .fetch_all(&state.db)
        .await
    };
    let revoked_jwt = match revoked_jwt {
        Ok(rows) => rows,
        Err(e) => {
            error!("DB error revoking JWT sessions: {:?}", e);
            return internal_error_json();
        }
    };
    for (access_jti,) in &revoked_jwt {
        invalidate_session_cache(&state, &auth.0.did, access_jti).await;
    }

    let oauth_result = if auth.0.is_oauth {
        sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
            .bind(&auth.0.did)
            .bind(&jti)
            .execute(&state.db)
            .await
    } else {
        sqlx::query("DELETE FROM oauth_token WHERE did = $1")
            .bind(&auth.0.did)
            .execute(&state.db)
            .await
    };
    if let Err(e) = oauth_result {
        error!("DB error revoking OAuth sessions: {:?}", e);
        return internal_error_json();
    }

    info!(did = %auth.0.did, "All other sessions revoked");
    (StatusCode::OK, Json(json!({"success": true}))).into_response()
}

/// Response body describing the account's legacy-login preference.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LegacyLoginPreferenceOutput {
    pub allow_legacy_login: bool,
    pub has_mfa: bool,
}

pub async fn get_legacy_login_preference(
State(state): State<AppState>, 1194 auth: BearerAuth, 1195) -> Response { 1196 let result = sqlx::query!( 1197 r#"SELECT 1198 u.allow_legacy_login, 1199 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1200 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1201 FROM users u WHERE u.did = $1"#, 1202 auth.0.did 1203 ) 1204 .fetch_optional(&state.db) 1205 .await; 1206 1207 match result { 1208 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1209 allow_legacy_login: row.allow_legacy_login, 1210 has_mfa: row.has_mfa, 1211 }) 1212 .into_response(), 1213 Ok(None) => ( 1214 StatusCode::NOT_FOUND, 1215 Json(json!({"error": "AccountNotFound"})), 1216 ) 1217 .into_response(), 1218 Err(e) => { 1219 error!("DB error: {:?}", e); 1220 ( 1221 StatusCode::INTERNAL_SERVER_ERROR, 1222 Json(json!({"error": "InternalError"})), 1223 ) 1224 .into_response() 1225 } 1226 } 1227} 1228 1229#[derive(Deserialize)] 1230#[serde(rename_all = "camelCase")] 1231pub struct UpdateLegacyLoginInput { 1232 pub allow_legacy_login: bool, 1233} 1234 1235pub async fn update_legacy_login_preference( 1236 State(state): State<AppState>, 1237 auth: BearerAuth, 1238 Json(input): Json<UpdateLegacyLoginInput>, 1239) -> Response { 1240 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1241 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1242 .await; 1243 } 1244 1245 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1246 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1247 } 1248 1249 let result = sqlx::query!( 1250 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1251 input.allow_legacy_login, 1252 auth.0.did 1253 ) 1254 .fetch_optional(&state.db) 1255 .await; 1256 1257 match result { 1258 Ok(Some(_)) => { 1259 info!( 1260 did = %auth.0.did, 1261 allow_legacy_login = 
input.allow_legacy_login, 1262 "Legacy login preference updated" 1263 ); 1264 Json(json!({ 1265 "allowLegacyLogin": input.allow_legacy_login 1266 })) 1267 .into_response() 1268 } 1269 Ok(None) => ( 1270 StatusCode::NOT_FOUND, 1271 Json(json!({"error": "AccountNotFound"})), 1272 ) 1273 .into_response(), 1274 Err(e) => { 1275 error!("DB error: {:?}", e); 1276 ( 1277 StatusCode::INTERNAL_SERVER_ERROR, 1278 Json(json!({"error": "InternalError"})), 1279 ) 1280 .into_response() 1281 } 1282 } 1283} 1284 1285use crate::comms::locale::VALID_LOCALES; 1286 1287#[derive(Deserialize)] 1288#[serde(rename_all = "camelCase")] 1289pub struct UpdateLocaleInput { 1290 pub preferred_locale: String, 1291} 1292 1293pub async fn update_locale( 1294 State(state): State<AppState>, 1295 auth: BearerAuth, 1296 Json(input): Json<UpdateLocaleInput>, 1297) -> Response { 1298 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1299 return ( 1300 StatusCode::BAD_REQUEST, 1301 Json(json!({ 1302 "error": "InvalidRequest", 1303 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1304 })), 1305 ) 1306 .into_response(); 1307 } 1308 1309 let result = sqlx::query!( 1310 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1311 input.preferred_locale, 1312 auth.0.did 1313 ) 1314 .fetch_optional(&state.db) 1315 .await; 1316 1317 match result { 1318 Ok(Some(_)) => { 1319 info!( 1320 did = %auth.0.did, 1321 locale = %input.preferred_locale, 1322 "User locale preference updated" 1323 ); 1324 Json(json!({ 1325 "preferredLocale": input.preferred_locale 1326 })) 1327 .into_response() 1328 } 1329 Ok(None) => ( 1330 StatusCode::NOT_FOUND, 1331 Json(json!({"error": "AccountNotFound"})), 1332 ) 1333 .into_response(), 1334 Err(e) => { 1335 error!("DB error updating locale: {:?}", e); 1336 ( 1337 StatusCode::INTERNAL_SERVER_ERROR, 1338 Json(json!({"error": "InternalError"})), 1339 ) 1340 .into_response() 1341 } 1342 } 1343}