this repo has no description
use crate::api::ApiError;
use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
use crate::state::{AppState, RateLimitKind};
use axum::{
    Json,
    extract::State,
    http::{HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};
use bcrypt::verify;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, info, warn};

/// Derives the client address used as the login rate-limit key from proxy headers.
///
/// Prefers the first entry of `X-Forwarded-For`, then `X-Real-IP`, and finally the
/// literal `"unknown"`. Blank header values are skipped so that an empty header cannot
/// become the rate-limit key `""`.
///
/// NOTE(review): both headers are client-controlled unless a trusted proxy overwrites
/// them; `refresh_session` uses `crate::rate_limit::extract_client_ip` instead — confirm
/// which of the two is the intended source of truth.
fn extract_client_ip(headers: &HeaderMap) -> String {
    if let Some(forwarded) = headers.get("x-forwarded-for")
        && let Ok(value) = forwarded.to_str()
        && let Some(first_ip) = value.split(',').next().map(str::trim)
        && !first_ip.is_empty()
    {
        return first_ip.to_string();
    }
    if let Some(real_ip) = headers.get("x-real-ip")
        && let Ok(value) = real_ip.to_str()
        && !value.trim().is_empty()
    {
        return value.trim().to_string();
    }
    "unknown".to_string()
}

/// Normalizes a login identifier before it is matched against handle, email or DID.
///
/// * Identifiers containing `@` or starting with `did:` are returned trimmed but
///   otherwise untouched.
/// * A bare name without a dot is lowercased and suffixed with `.{pds_hostname}`.
/// * Anything else is treated as a full handle and lowercased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let identifier = identifier.trim();
    if identifier.contains('@') || identifier.starts_with("did:") {
        identifier.to_string()
    } else if !identifier.contains('.') {
        format!("{}.{}", identifier.to_lowercase(), pds_hostname)
    } else {
        identifier.to_lowercase()
    }
}

/// Returns the handle to expose in responses.
///
/// Currently an identity conversion of the stored handle; `_pds_hostname` is unused and
/// kept only so call sites keep their shape.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    stored_handle.to_string()
}

/// Request body accepted by [`create_session`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionInput {
    /// Handle, email address or DID of the account.
    pub identifier: String,
    /// Account password or an app password.
    pub password: String,
    /// When `true`, a taken-down account may still log in; defaults to `false`.
    #[serde(default)]
    pub allow_takendown: bool,
}

/// Response body produced by [`create_session`]; `None` fields are omitted.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    pub access_jwt: String,
    pub refresh_jwt: String,
    pub handle: String,
    pub did: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub did_doc: Option<serde_json::Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email_confirmed: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}

/// Password login: verifies credentials and issues a new access/refresh token pair.
///
/// Flow, in order:
/// 1. rate-limit by client IP (`429 RateLimitExceeded`);
/// 2. look the user up by normalized handle, email or DID;
/// 3. verify the account password, falling back to the 20 most recent app passwords;
/// 4. reject taken-down accounts (unless `allow_takendown`), unverified non-delegated
///    accounts, and TOTP-enabled accounts that have not allowed legacy login;
/// 5. create the tokens, persist the session row and resolve the DID document
///    concurrently;
/// 6. queue a notification when a TOTP-enabled account logged in this way.
///
/// Database, key-decryption and token-creation failures yield `ApiError::InternalError`.
pub async fn create_session(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(input): Json<CreateSessionInput>,
) -> Response {
    info!(
        "create_session called with identifier: {}",
        input.identifier
    );
    let client_ip = extract_client_ip(&headers);
    if !state
        .check_rate_limit(RateLimitKind::Login, &client_ip)
        .await
    {
        warn!(ip = %client_ip, "Login rate limit exceeded");
        return (
            StatusCode::TOO_MANY_REQUESTS,
            Json(json!({
                "error": "RateLimitExceeded",
                "message": "Too many login attempts. Please try again later."
            })),
        )
            .into_response();
    }
    let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
    info!(
        "Normalized identifier: {} -> {}",
        input.identifier, normalized_identifier
    );
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
            u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
            u.allow_legacy_login,
            u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
            k.key_bytes, k.encryption_version,
            (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
        normalized_identifier
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            // Burn one bcrypt verification against a fixed hash so that an unknown
            // identifier costs roughly the same time as a wrong password.
            let _ = verify(
                &input.password,
                "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
            );
            warn!("User not found for login attempt");
            return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // NOTE(review): `bcrypt::verify` is CPU-bound and runs inline on the async executor,
    // up to 21 times per request (main hash + 20 app passwords). Consider moving it to
    // `tokio::task::spawn_blocking`.
    let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
        .password_hash
        .as_ref()
        .map(|h| verify(&input.password, h).unwrap_or(false))
        .unwrap_or(false)
    {
        (true, None, None, None)
    } else {
        // A failed lookup is a server fault, not a bad credential: report it as such
        // instead of silently treating it as "no app passwords".
        let app_passwords = match sqlx::query!(
            "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
            row.id
        )
        .fetch_all(&state.db)
        .await
        {
            Ok(rows) => rows,
            Err(e) => {
                error!("Database error fetching app passwords: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
        let matched = app_passwords
            .iter()
            .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
        match matched {
            Some(app) => (
                true,
                Some(app.name.clone()),
                app.scopes.clone(),
                app.created_by_controller_did.clone(),
            ),
            None => (false, None, None, None),
        }
    };
    if !password_valid {
        warn!("Password verification failed for login attempt");
        return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
            .into_response();
    }
    let is_takendown = row.takedown_ref.is_some();
    if is_takendown && !input.allow_takendown {
        warn!("Login attempt for takendown account: {}", row.did);
        return (
            StatusCode::UNAUTHORIZED,
            Json(json!({
                "error": "AccountTakedown",
                "message": "Account has been taken down"
            })),
        )
            .into_response();
    }
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
        .await
        .unwrap_or(false);
    if !is_verified && !is_delegated {
        warn!("Login attempt for unverified account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "AccountNotVerified",
                "message": "Please verify your account before logging in",
                "did": row.did
            })),
        )
            .into_response();
    }
    let has_totp = row.totp_enabled.unwrap_or(false);
    // Any password login on a TOTP-enabled account counts as a "legacy" login.
    let is_legacy_login = has_totp;
    if has_totp && !row.allow_legacy_login {
        warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "MfaRequired",
                "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
                "did": row.did
            })),
        )
            .into_response();
    }
    let access_meta = match crate::auth::create_access_token_with_delegation(
        &row.did,
        &key_bytes,
        app_password_scopes.as_deref(),
        app_password_controller.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let did_for_doc = row.did.clone();
    let did_resolver = state.did_resolver.clone();
    // Persist the session and resolve the DID document concurrently.
    let (insert_result, did_doc) = tokio::join!(
        sqlx::query!(
            "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
            row.did,
            access_meta.jti,
            refresh_meta.jti,
            access_meta.expires_at,
            refresh_meta.expires_at,
            is_legacy_login,
            false,
            app_password_scopes,
            app_password_controller,
            app_password_name
        )
        .execute(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    if let Err(e) = insert_result {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if is_legacy_login {
        warn!(
            did = %row.did,
            ip = %client_ip,
            "Legacy login on TOTP-enabled account - sending notification"
        );
        // Best effort: a failed notification must not fail the login.
        if let Err(e) = crate::comms::queue_legacy_login_notification(
            &state.db,
            row.id,
            &pds_hostname,
            &client_ip,
            row.preferred_comms_channel,
        )
        .await
        {
            error!("Failed to queue legacy login notification: {:?}", e);
        }
    }
    let handle = full_handle(&row.handle, &pds_hostname);
    let is_active = row.deactivated_at.is_none() && !is_takendown;
    let status = if is_takendown {
        Some("takendown".to_string())
    } else if row.deactivated_at.is_some() {
        Some("deactivated".to_string())
    } else {
        None
    };
    Json(CreateSessionOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle,
        did: row.did,
        did_doc,
        email: row.email,
        email_confirmed: Some(row.email_verified),
        active: Some(is_active),
        status,
    })
    .into_response()
}

/// Returns profile and status information for the authenticated session.
///
/// Deactivated accounts are allowed through (`BearerAuthAllowDeactivated`). `email` and
/// `emailConfirmed` are only included when the token's permissions allow reading the
/// email; `status` is only present for taken-down or deactivated accounts, and `didDoc`
/// only when the DID document could be resolved.
pub async fn get_session(
    State(state): State<AppState>,
    BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
) -> Response {
    let permissions = auth_user.permissions();
    let can_read_email = permissions.allows_email_read();

    let did_for_doc = auth_user.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified
            FROM users WHERE did = $1"#,
            auth_user.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(row)) => {
            let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", row.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&row.handle, &pds_hostname);
            let is_takendown = row.takedown_ref.is_some();
            let is_active = row.deactivated_at.is_none() && !is_takendown;
            let mut response = json!({
                "handle": handle,
                "did": auth_user.did,
                "active": is_active,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": row.preferred_locale,
                "isAdmin": row.is_admin
            });
            if can_read_email {
                response["email"] = json!(row.email);
                response["emailConfirmed"] = json!(row.email_verified);
            }
            if is_takendown {
                response["status"] = json!("takendown");
            } else if row.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            Json(response).into_response()
        }
        Ok(None) => ApiError::AuthenticationFailed.into_response(),
        Err(e) => {
            error!("Database error in get_session: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Deletes the session identified by the bearer access token and drops its cache entry.
///
/// Returns `AuthenticationRequired` when no bearer token is present,
/// `AuthenticationFailed` when the token has no readable `jti` or matches no session,
/// and `InternalError` when the database call itself fails.
pub async fn delete_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let jti = match crate::auth::get_jti_from_token(&token) {
        Ok(jti) => jti,
        Err(_) => return ApiError::AuthenticationFailed.into_response(),
    };
    let did = crate::auth::get_did_from_token(&token).ok();
    match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
        .execute(&state.db)
        .await
    {
        Ok(res) if res.rows_affected() > 0 => {
            if let Some(did) = did {
                let session_cache_key = format!("auth:session:{}:{}", did, jti);
                let _ = state.cache.delete(&session_cache_key).await;
            }
            Json(json!({})).into_response()
        }
        Ok(_) => ApiError::AuthenticationFailed.into_response(),
        Err(e) => {
            // A database failure is a server fault, not a bad credential.
            error!("Database error in delete_session: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Rotates a refresh token: issues a new access/refresh pair for an existing session.
///
/// Runs inside one transaction:
/// 1. if the presented `jti` is already in `used_refresh_tokens`, the whole session is
///    deleted and the request is rejected (reuse detection);
/// 2. the session row is locked, the user key decrypted and the token verified;
/// 3. the old `jti` is recorded as used (a conflicting insert is treated as concurrent
///    reuse and also revokes the session) and the session row receives the new `jti`s.
///
/// After commit the user row and DID document are loaded to build the response.
///
/// NOTE(review): unlike `get_session`, `email`/`emailConfirmed` are returned
/// unconditionally here — confirm whether scoped sessions should see them.
pub async fn refresh_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
    if !state
        .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
        .await
    {
        warn!(ip = %client_ip, "Refresh session rate limit exceeded");
        return (
            StatusCode::TOO_MANY_REQUESTS,
            Json(json!({
                "error": "RateLimitExceeded",
                "message": "Too many requests. Please try again later."
            })),
        )
            .into_response();
    }
    let refresh_token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
        Ok(jti) => jti,
        Err(_) => {
            return ApiError::AuthenticationFailedMsg("Invalid token format".into())
                .into_response();
        }
    };
    let mut tx = match state.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            error!("Failed to begin transaction: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    match sqlx::query_scalar!(
        "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(session_id)) => {
            warn!(
                "Refresh token reuse detected! Revoking token family for session_id: {}",
                session_id
            );
            let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
                .execute(&mut *tx)
                .await;
            let _ = tx.commit().await;
            return ApiError::ExpiredTokenMsg(
                "Refresh token has been revoked due to suspected compromise".into(),
            )
            .into_response();
        }
        Ok(None) => {}
        Err(e) => {
            // Never skip the reuse check silently: if it cannot run, fail the request.
            error!("Database error checking refresh token reuse: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    }
    let session_row = match sqlx::query!(
        r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
           FROM session_tokens st
           JOIN users u ON st.did = u.did
           JOIN user_keys k ON u.id = k.user_id
           WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
           FOR UPDATE OF st"#,
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching session: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes =
        match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
            Ok(k) => k,
            Err(e) => {
                error!("Failed to decrypt user key: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
        return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
    }
    let new_access_meta = match crate::auth::create_access_token_with_delegation(
        &session_row.did,
        &key_bytes,
        session_row.scope.as_deref(),
        session_row.controller_did.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let new_refresh_meta =
        match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
            Ok(m) => m,
            Err(e) => {
                error!("Failed to create refresh token: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    match sqlx::query!(
        "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
        refresh_jti,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        // Zero rows inserted means the jti was already recorded as used.
        Ok(result) if result.rows_affected() == 0 => {
            warn!(
                "Concurrent refresh token reuse detected for session_id: {}",
                session_row.id
            );
            let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
                .execute(&mut *tx)
                .await;
            let _ = tx.commit().await;
            return ApiError::ExpiredTokenMsg(
                "Refresh token has been revoked due to suspected compromise".into(),
            )
            .into_response();
        }
        Err(e) => {
            error!("Failed to record used refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
        Ok(_) => {}
    }
    if let Err(e) = sqlx::query!(
        "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
        new_access_meta.jti,
        new_refresh_meta.jti,
        new_access_meta.expires_at,
        new_refresh_meta.expires_at,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        error!("Database error updating session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if let Err(e) = tx.commit().await {
        error!("Failed to commit transaction: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let did_for_doc = session_row.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified
            FROM users WHERE did = $1"#,
            session_row.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(u)) => {
            let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", u.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&u.handle, &pds_hostname);
            let is_takendown = u.takedown_ref.is_some();
            let is_active = u.deactivated_at.is_none() && !is_takendown;
            let mut response = json!({
                "accessJwt": new_access_meta.token,
                "refreshJwt": new_refresh_meta.token,
                "handle": handle,
                "did": session_row.did,
                "email": u.email,
                "emailConfirmed": u.email_verified,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": u.preferred_locale,
                "isAdmin": u.is_admin,
                "active": is_active
            });
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            if is_takendown {
                response["status"] = json!("takendown");
            } else if u.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            Json(response).into_response()
        }
        Ok(None) => {
            error!("User not found for existing session: {}", session_row.did);
            ApiError::InternalError.into_response()
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Request body accepted by [`confirm_signup`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    pub did: String,
    pub verification_code: String,
}

/// Response body produced by [`confirm_signup`].
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    pub access_jwt: String,
    pub refresh_jwt: String,
    pub handle: String,
    pub did: String,
    pub email: Option<String>,
    pub email_verified: bool,
    pub preferred_channel: String,
    pub preferred_channel_verified: bool,
}

/// Confirms a signup verification code and logs the user in.
///
/// The code is checked against the user's preferred channel and that channel's stored
/// identifier, and must have been issued for the same DID. On success the matching
/// `*_verified` column is set, a fresh session is created and a welcome notification is
/// queued (best effort).
///
/// NOTE(review): the verification update and the session insert are separate statements
/// outside a transaction, and an already-verified account is not rejected — confirm both
/// are intended.
pub async fn confirm_signup(
    State(state): State<AppState>,
    Json(input): Json<ConfirmSignupInput>,
) -> Response {
    info!("confirm_signup called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.email,
            u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
            u.discord_id, u.telegram_username, u.signal_number,
            k.key_bytes, k.encryption_version
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            warn!("User not found for confirm_signup: {}", input.did);
            return ApiError::InvalidRequest("Invalid DID or verification code".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error in confirm_signup: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };

    let (channel_str, identifier) = match row.channel {
        crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
        crate::comms::CommsChannel::Discord => {
            ("discord", row.discord_id.clone().unwrap_or_default())
        }
        crate::comms::CommsChannel::Telegram => (
            "telegram",
            row.telegram_username.clone().unwrap_or_default(),
        ),
        crate::comms::CommsChannel::Signal => {
            ("signal", row.signal_number.clone().unwrap_or_default())
        }
    };

    let normalized_token =
        crate::auth::verification_token::normalize_token_input(&input.verification_code);
    match crate::auth::verification_token::verify_signup_token(
        &normalized_token,
        channel_str,
        &identifier,
    ) {
        Ok(token_data) => {
            if token_data.did != input.did {
                warn!(
                    "Token DID mismatch for confirm_signup: expected {}, got {}",
                    input.did, token_data.did
                );
                return ApiError::InvalidRequest("Invalid verification code".into())
                    .into_response();
            }
        }
        Err(crate::auth::verification_token::VerifyError::Expired) => {
            warn!("Verification code expired for user: {}", input.did);
            return ApiError::ExpiredTokenMsg("Verification code has expired".into())
                .into_response();
        }
        Err(e) => {
            warn!("Invalid verification code for user {}: {:?}", input.did, e);
            return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
        }
    }

    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // The column name comes from a fixed, exhaustive match — never from user input — so
    // interpolating it into the statement is safe; the DID itself is still bound.
    let verified_column = match row.channel {
        crate::comms::CommsChannel::Email => "email_verified",
        crate::comms::CommsChannel::Discord => "discord_verified",
        crate::comms::CommsChannel::Telegram => "telegram_verified",
        crate::comms::CommsChannel::Signal => "signal_verified",
    };
    let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
    if let Err(e) = sqlx::query(&update_query)
        .bind(&input.did)
        .execute(&state.db)
        .await
    {
        error!("Failed to update verification status: {:?}", e);
        return ApiError::InternalError.into_response();
    }

    let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let no_scope: Option<String> = None;
    if let Err(e) = sqlx::query!(
        "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        row.did,
        access_meta.jti,
        refresh_meta.jti,
        access_meta.expires_at,
        refresh_meta.expires_at,
        false,
        false,
        no_scope
    )
    .execute(&state.db)
    .await
    {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    // Best effort: the signup is already confirmed at this point.
    if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
        warn!("Failed to enqueue welcome notification: {:?}", e);
    }
    let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
    let preferred_channel = match row.channel {
        crate::comms::CommsChannel::Email => "email",
        crate::comms::CommsChannel::Discord => "discord",
        crate::comms::CommsChannel::Telegram => "telegram",
        crate::comms::CommsChannel::Signal => "signal",
    };
    Json(ConfirmSignupOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle: row.handle,
        did: row.did,
        email: row.email,
        email_verified,
        preferred_channel: preferred_channel.to_string(),
        preferred_channel_verified: true,
    })
    .into_response()
}

/// Request body accepted by [`resend_verification`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    pub did: String,
}

/// Generates a new signup verification token and queues it on the user's preferred
/// channel.
///
/// Rejects unknown DIDs and accounts that already have any verified channel. A failure
/// to enqueue the notification is only logged; the response is still `{"success": true}`.
pub async fn resend_verification(
    State(state): State<AppState>,
    Json(input): Json<ResendVerificationInput>,
) -> Response {
    info!("resend_verification called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            id, handle, email,
            preferred_comms_channel as "channel: crate::comms::CommsChannel",
            discord_id, telegram_username, signal_number,
            email_verified, discord_verified, telegram_verified, signal_verified
        FROM users
        WHERE did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::InvalidRequest("User not found".into()).into_response();
        }
        Err(e) => {
            error!("Database error in resend_verification: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    if is_verified {
        return ApiError::InvalidRequest("Account is already verified".into()).into_response();
    }

    let (channel_str, recipient) = match row.channel {
        crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
        crate::comms::CommsChannel::Discord => {
            ("discord", row.discord_id.clone().unwrap_or_default())
        }
        crate::comms::CommsChannel::Telegram => (
            "telegram",
            row.telegram_username.clone().unwrap_or_default(),
        ),
        crate::comms::CommsChannel::Signal => {
            ("signal", row.signal_number.clone().unwrap_or_default())
        }
    };

    let verification_token =
        crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
    let formatted_token =
        crate::auth::verification_token::format_token_for_display(&verification_token);

    if let Err(e) = crate::comms::enqueue_signup_verification(
        &state.db,
        row.id,
        channel_str,
        &recipient,
        &formatted_token,
        None,
    )
    .await
    {
        warn!("Failed to enqueue verification notification: {:?}", e);
    }
    Json(json!({"success": true})).into_response()
}

/// One entry of the session list returned by [`list_sessions`].
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// `"jwt:{id}"` or `"oauth:{id}"`; the format [`revoke_session`] accepts.
    pub id: String,
    /// `"legacy"` for password sessions, `"oauth"` for OAuth tokens.
    pub session_type: String,
    pub client_name: Option<String>,
    /// RFC 3339 timestamp.
    pub created_at: String,
    /// RFC 3339 timestamp.
    pub expires_at: String,
    pub is_current: bool,
}

/// Response body produced by [`list_sessions`].
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    pub sessions: Vec<SessionInfo>,
}

/// Returns the `jti` of the token presented in the `Authorization` header, if any.
///
/// Uses the same `crate::auth` helpers as `delete_session` and `refresh_session` so that
/// every handler in this module parses the header the same way.
fn current_session_jti(headers: &HeaderMap) -> Option<String> {
    let token = crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    )?;
    crate::auth::get_jti_from_token(&token).ok()
}

/// Builds the bare `500 {"error": "InternalError"}` response used by the
/// session-management handlers below.
fn session_internal_error() -> Response {
    (
        StatusCode::INTERNAL_SERVER_ERROR,
        Json(json!({"error": "InternalError"})),
    )
        .into_response()
}

/// Lists the caller's unexpired password ("legacy") sessions and OAuth tokens, newest
/// first, flagging the one that belongs to the presented token.
pub async fn list_sessions(
    State(state): State<AppState>,
    headers: HeaderMap,
    auth: BearerAuth,
) -> Response {
    let current_jti = current_session_jti(&headers);

    let mut sessions: Vec<SessionInfo> = Vec::new();

    let jwt_result = sqlx::query_as::<
        _,
        (
            i32,
            String,
            chrono::DateTime<chrono::Utc>,
            chrono::DateTime<chrono::Utc>,
        ),
    >(
        r#"
        SELECT id, access_jti, created_at, refresh_expires_at
        FROM session_tokens
        WHERE did = $1 AND refresh_expires_at > NOW()
        ORDER BY created_at DESC
        "#,
    )
    .bind(&auth.0.did)
    .fetch_all(&state.db)
    .await;

    match jwt_result {
        Ok(rows) => {
            for (id, access_jti, created_at, expires_at) in rows {
                sessions.push(SessionInfo {
                    id: format!("jwt:{}", id),
                    session_type: "legacy".to_string(),
                    client_name: None,
                    created_at: created_at.to_rfc3339(),
                    expires_at: expires_at.to_rfc3339(),
                    is_current: current_jti.as_ref() == Some(&access_jti),
                });
            }
        }
        Err(e) => {
            error!("DB error fetching JWT sessions: {:?}", e);
            return session_internal_error();
        }
    }

    let oauth_result = sqlx::query_as::<
        _,
        (
            i32,
            String,
            chrono::DateTime<chrono::Utc>,
            chrono::DateTime<chrono::Utc>,
            String,
        ),
    >(
        r#"
        SELECT id, token_id, created_at, expires_at, client_id
        FROM oauth_token
        WHERE did = $1 AND expires_at > NOW()
        ORDER BY created_at DESC
        "#,
    )
    .bind(&auth.0.did)
    .fetch_all(&state.db)
    .await;

    match oauth_result {
        Ok(rows) => {
            for (id, token_id, created_at, expires_at, client_id) in rows {
                let client_name = extract_client_name(&client_id);
                let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
                sessions.push(SessionInfo {
                    id: format!("oauth:{}", id),
                    session_type: "oauth".to_string(),
                    client_name: Some(client_name),
                    created_at: created_at.to_rfc3339(),
                    expires_at: expires_at.to_rfc3339(),
                    is_current: is_current_oauth,
                });
            }
        }
        Err(e) => {
            error!("DB error fetching OAuth sessions: {:?}", e);
            return session_internal_error();
        }
    }

    // Merge both lists newest-first; compares the RFC 3339 strings built above.
    sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

    (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
}

/// Derives a display name for an OAuth client from its `client_id`.
///
/// Loopback clients become `"Localhost App"`; a parseable URL yields its host (or
/// `"Unknown App"` when it has none); anything else is returned unchanged.
fn extract_client_name(client_id: &str) -> String {
    if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
        "Localhost App".to_string()
    } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
        parsed.host_str().unwrap_or("Unknown App").to_string()
    } else {
        client_id.to_string()
    }
}

/// Request body accepted by [`revoke_session`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// `"jwt:{id}"` or `"oauth:{id}"`, as produced by [`list_sessions`].
    pub session_id: String,
}

/// Revokes a single session owned by the caller.
///
/// Both branches delete with an ownership (`did`) filter in the statement itself, so a
/// session can only ever be removed by its owner. For JWT sessions the deleted row's
/// `access_jti` is returned by the same statement and used to drop the cache entry.
///
/// Responds `400` for a malformed id, `404` when no owned session matches and `500` on
/// database errors.
pub async fn revoke_session(
    State(state): State<AppState>,
    auth: BearerAuth,
    Json(input): Json<RevokeSessionInput>,
) -> Response {
    if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
        let session_id: i32 = match jwt_id.parse() {
            Ok(id) => id,
            Err(_) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
                )
                    .into_response();
            }
        };
        // One atomic statement: ownership check, delete, and the jti for the cache key.
        let deleted = sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE id = $1 AND did = $2 RETURNING access_jti",
        )
        .bind(session_id)
        .bind(&auth.0.did)
        .fetch_optional(&state.db)
        .await;
        let access_jti = match deleted {
            Ok(Some((jti,))) => jti,
            Ok(None) => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
                )
                    .into_response();
            }
            Err(e) => {
                error!("DB error deleting session: {:?}", e);
                return session_internal_error();
            }
        };
        let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
        if let Err(e) = state.cache.delete(&cache_key).await {
            warn!("Failed to invalidate session cache: {:?}", e);
        }
        info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
    } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
        let session_id: i32 = match oauth_id.parse() {
            Ok(id) => id,
            Err(_) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
                )
                    .into_response();
            }
        };
        let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
            .bind(session_id)
            .bind(&auth.0.did)
            .execute(&state.db)
            .await;
        match result {
            Ok(r) if r.rows_affected() == 0 => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
                )
                    .into_response();
            }
            Err(e) => {
                error!("DB error deleting OAuth session: {:?}", e);
                return session_internal_error();
            }
            _ => {}
        }
        info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
    } else {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
        )
            .into_response();
    }
    (StatusCode::OK, Json(json!({}))).into_response()
}

/// Revokes every session of the caller except the one making the request.
///
/// * OAuth caller: all JWT sessions are removed, plus every OAuth token other than the
///   current one.
/// * JWT caller: every JWT session other than the current one is removed, plus all OAuth
///   tokens.
///
/// Cache entries of the removed JWT sessions are invalidated as well, so a revoked
/// access token cannot keep being served from the `auth:session:*` cache.
///
/// Responds `400 InvalidToken` when the current token's `jti` cannot be determined.
pub async fn revoke_all_sessions(
    State(state): State<AppState>,
    headers: HeaderMap,
    auth: BearerAuth,
) -> Response {
    let Some(jti) = current_session_jti(&headers) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
        )
            .into_response();
    };

    let jwt_delete = if auth.0.is_oauth {
        sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE did = $1 RETURNING access_jti",
        )
        .bind(&auth.0.did)
        .fetch_all(&state.db)
        .await
    } else {
        sqlx::query_as::<_, (String,)>(
            "DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2 RETURNING access_jti",
        )
        .bind(&auth.0.did)
        .bind(&jti)
        .fetch_all(&state.db)
        .await
    };
    let revoked_jwt = match jwt_delete {
        Ok(rows) => rows,
        Err(e) => {
            error!("DB error revoking JWT sessions: {:?}", e);
            return session_internal_error();
        }
    };
    // Purge the cache before touching OAuth tokens so that the JWT sessions already
    // deleted are fully dead even if the next statement fails.
    for (revoked_jti,) in &revoked_jwt {
        let cache_key = format!("auth:session:{}:{}", auth.0.did, revoked_jti);
        if let Err(e) = state.cache.delete(&cache_key).await {
            warn!("Failed to invalidate session cache: {:?}", e);
        }
    }

    let oauth_delete = if auth.0.is_oauth {
        sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
            .bind(&auth.0.did)
            .bind(&jti)
            .execute(&state.db)
            .await
    } else {
        sqlx::query("DELETE FROM oauth_token WHERE did = $1")
            .bind(&auth.0.did)
            .execute(&state.db)
            .await
    };
    if let Err(e) = oauth_delete {
        error!("DB error revoking OAuth sessions: {:?}", e);
        return session_internal_error();
    }

    info!(did = %auth.0.did, "All other sessions revoked");
    (StatusCode::OK, Json(json!({"success": true}))).into_response()
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LegacyLoginPreferenceOutput {
    pub allow_legacy_login: bool,
    pub has_mfa: bool,
}

pub async fn get_legacy_login_preference(
State(state): State<AppState>, 1196 auth: BearerAuth, 1197) -> Response { 1198 let result = sqlx::query!( 1199 r#"SELECT 1200 u.allow_legacy_login, 1201 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1202 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1203 FROM users u WHERE u.did = $1"#, 1204 auth.0.did 1205 ) 1206 .fetch_optional(&state.db) 1207 .await; 1208 1209 match result { 1210 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1211 allow_legacy_login: row.allow_legacy_login, 1212 has_mfa: row.has_mfa, 1213 }) 1214 .into_response(), 1215 Ok(None) => ( 1216 StatusCode::NOT_FOUND, 1217 Json(json!({"error": "AccountNotFound"})), 1218 ) 1219 .into_response(), 1220 Err(e) => { 1221 error!("DB error: {:?}", e); 1222 ( 1223 StatusCode::INTERNAL_SERVER_ERROR, 1224 Json(json!({"error": "InternalError"})), 1225 ) 1226 .into_response() 1227 } 1228 } 1229} 1230 1231#[derive(Deserialize)] 1232#[serde(rename_all = "camelCase")] 1233pub struct UpdateLegacyLoginInput { 1234 pub allow_legacy_login: bool, 1235} 1236 1237pub async fn update_legacy_login_preference( 1238 State(state): State<AppState>, 1239 auth: BearerAuth, 1240 Json(input): Json<UpdateLegacyLoginInput>, 1241) -> Response { 1242 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1243 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1244 .await; 1245 } 1246 1247 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1248 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1249 } 1250 1251 let result = sqlx::query!( 1252 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1253 input.allow_legacy_login, 1254 auth.0.did 1255 ) 1256 .fetch_optional(&state.db) 1257 .await; 1258 1259 match result { 1260 Ok(Some(_)) => { 1261 info!( 1262 did = %auth.0.did, 1263 allow_legacy_login = 
input.allow_legacy_login, 1264 "Legacy login preference updated" 1265 ); 1266 Json(json!({ 1267 "allowLegacyLogin": input.allow_legacy_login 1268 })) 1269 .into_response() 1270 } 1271 Ok(None) => ( 1272 StatusCode::NOT_FOUND, 1273 Json(json!({"error": "AccountNotFound"})), 1274 ) 1275 .into_response(), 1276 Err(e) => { 1277 error!("DB error: {:?}", e); 1278 ( 1279 StatusCode::INTERNAL_SERVER_ERROR, 1280 Json(json!({"error": "InternalError"})), 1281 ) 1282 .into_response() 1283 } 1284 } 1285} 1286 1287use crate::comms::locale::VALID_LOCALES; 1288 1289#[derive(Deserialize)] 1290#[serde(rename_all = "camelCase")] 1291pub struct UpdateLocaleInput { 1292 pub preferred_locale: String, 1293} 1294 1295pub async fn update_locale( 1296 State(state): State<AppState>, 1297 auth: BearerAuth, 1298 Json(input): Json<UpdateLocaleInput>, 1299) -> Response { 1300 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1301 return ( 1302 StatusCode::BAD_REQUEST, 1303 Json(json!({ 1304 "error": "InvalidRequest", 1305 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1306 })), 1307 ) 1308 .into_response(); 1309 } 1310 1311 let result = sqlx::query!( 1312 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1313 input.preferred_locale, 1314 auth.0.did 1315 ) 1316 .fetch_optional(&state.db) 1317 .await; 1318 1319 match result { 1320 Ok(Some(_)) => { 1321 info!( 1322 did = %auth.0.did, 1323 locale = %input.preferred_locale, 1324 "User locale preference updated" 1325 ); 1326 Json(json!({ 1327 "preferredLocale": input.preferred_locale 1328 })) 1329 .into_response() 1330 } 1331 Ok(None) => ( 1332 StatusCode::NOT_FOUND, 1333 Json(json!({"error": "AccountNotFound"})), 1334 ) 1335 .into_response(), 1336 Err(e) => { 1337 error!("DB error updating locale: {:?}", e); 1338 ( 1339 StatusCode::INTERNAL_SERVER_ERROR, 1340 Json(json!({"error": "InternalError"})), 1341 ) 1342 .into_response() 1343 } 1344 } 1345}