this repo has no description
use crate::api::ApiError;
use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
use crate::state::{AppState, RateLimitKind};
use axum::{
    Json,
    extract::State,
    http::{HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};
use bcrypt::verify;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, info, warn};

/// Derives a client identifier (used as the login rate-limit key) from proxy headers.
///
/// Precedence: the first comma-separated entry of `x-forwarded-for`, then `x-real-ip`,
/// then the literal `"unknown"`. Blank header values are skipped so that an empty header
/// cannot turn the empty string into a rate-limit key.
///
/// NOTE(review): both headers are client-controllable unless a trusted proxy overwrites
/// them — confirm the deployment always sits behind such a proxy.
fn extract_client_ip(headers: &HeaderMap) -> String {
    if let Some(forwarded) = headers.get("x-forwarded-for")
        && let Ok(value) = forwarded.to_str()
        && let Some(first_ip) = value.split(',').next().map(str::trim)
        && !first_ip.is_empty()
    {
        return first_ip.to_string();
    }
    if let Some(real_ip) = headers.get("x-real-ip")
        && let Ok(value) = real_ip.to_str()
        && !value.trim().is_empty()
    {
        return value.trim().to_string();
    }
    "unknown".to_string()
}

/// Normalizes a login identifier before it is looked up in the `users` table.
///
/// * Identifiers containing `@` or starting with `did:` are returned trimmed but otherwise
///   untouched.
/// * Identifiers without a `.` are treated as a bare handle: lowercased and suffixed with
///   `.{pds_hostname}`.
/// * Anything else is treated as a full handle and lowercased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let identifier = identifier.trim();
    if identifier.contains('@') || identifier.starts_with("did:") {
        identifier.to_string()
    } else if !identifier.contains('.') {
        format!("{}.{}", identifier.to_lowercase(), pds_hostname)
    } else {
        identifier.to_lowercase()
    }
}

/// Returns the handle to expose to clients.
///
/// The stored handle is returned as-is; `_pds_hostname` is currently unused.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    stored_handle.to_string()
}

/// Request body for [`create_session`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionInput {
    /// Handle, email address or DID.
    pub identifier: String,
    /// Account password or an app password.
    pub password: String,
    /// When `true`, a taken-down account may still obtain a session. Defaults to `false`.
    #[serde(default)]
    pub allow_takendown: bool,
}

/// Response body for [`create_session`] (camelCase on the wire; `None` fields are omitted).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    pub access_jwt: String,
    pub refresh_jwt: String,
    pub handle: String,
    pub did: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub did_doc: Option<serde_json::Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email_confirmed: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}

/// Password login: verifies credentials and issues a new access/refresh token pair.
///
/// Steps, in order:
/// 1. Rate-limit by client IP (`RateLimitKind::Login`).
/// 2. Look the user up by handle, email or DID.
/// 3. Verify the password against the account hash, then against up to 20 of the most
///    recently created app passwords.
/// 4. Reject taken-down accounts (unless `allowTakendown`), unverified non-delegated
///    accounts, and TOTP-enabled accounts that have not opted into legacy login.
/// 5. Create tokens, store the session row and resolve the DID document concurrently.
/// 6. For TOTP-enabled accounts, queue a legacy-login notification (best effort).
pub async fn create_session(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(input): Json<CreateSessionInput>,
) -> Response {
    info!(
        "create_session called with identifier: {}",
        input.identifier
    );
    let client_ip = extract_client_ip(&headers);
    if !state
        .check_rate_limit(RateLimitKind::Login, &client_ip)
        .await
    {
        warn!(ip = %client_ip, "Login rate limit exceeded");
        return (
            StatusCode::TOO_MANY_REQUESTS,
            Json(json!({
                "error": "RateLimitExceeded",
                "message": "Too many login attempts. Please try again later."
            })),
        )
            .into_response();
    }
    let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
    info!(
        "Normalized identifier: {} -> {}",
        input.identifier, normalized_identifier
    );
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
            u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
            u.allow_legacy_login, u.migrated_to_pds,
            u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
            k.key_bytes, k.encryption_version,
            (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
        normalized_identifier
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            // Verify against a fixed hash even though no user matched; presumably this keeps
            // the response time close to the "wrong password" path — confirm intent.
            let _ = verify(
                &input.password,
                "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
            );
            warn!("User not found for login attempt");
            return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Try the account password first; fall back to app passwords only when it does not match.
    let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
        .password_hash
        .as_ref()
        .map(|h| verify(&input.password, h).unwrap_or(false))
        .unwrap_or(false)
    {
        (true, None, None, None)
    } else {
        let app_passwords = sqlx::query!(
            "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
            row.id
        )
        .fetch_all(&state.db)
        .await
        .unwrap_or_else(|e| {
            // Still treated as "no app passwords" (login is rejected), but no longer silent.
            error!("Database error fetching app passwords: {:?}", e);
            Vec::new()
        });
        let matched = app_passwords
            .iter()
            .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
        match matched {
            Some(app) => (
                true,
                Some(app.name.clone()),
                app.scopes.clone(),
                app.created_by_controller_did.clone(),
            ),
            None => (false, None, None, None),
        }
    };
    if !password_valid {
        warn!("Password verification failed for login attempt");
        return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
            .into_response();
    }
    let is_takendown = row.takedown_ref.is_some();
    if is_takendown && !input.allow_takendown {
        warn!("Login attempt for takendown account: {}", row.did);
        return (
            StatusCode::UNAUTHORIZED,
            Json(json!({
                "error": "AccountTakedown",
                "message": "Account has been taken down"
            })),
        )
            .into_response();
    }
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    // A failed delegation lookup is treated as "not delegated".
    let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
        .await
        .unwrap_or(false);
    if !is_verified && !is_delegated {
        warn!("Login attempt for unverified account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "AccountNotVerified",
                "message": "Please verify your account before logging in",
                "did": row.did
            })),
        )
            .into_response();
    }
    let has_totp = row.totp_enabled.unwrap_or(false);
    // A password login on a TOTP-enabled account is recorded as a "legacy" login.
    let is_legacy_login = has_totp;
    if has_totp && !row.allow_legacy_login {
        warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "MfaRequired",
                "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
                "did": row.did
            })),
        )
            .into_response();
    }
    let access_meta = match crate::auth::create_access_token_with_delegation(
        &row.did,
        &key_bytes,
        app_password_scopes.as_deref(),
        app_password_controller.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let did_for_doc = row.did.clone();
    let did_resolver = state.did_resolver.clone();
    // Persist the session and resolve the DID document concurrently.
    let (insert_result, did_doc) = tokio::join!(
        sqlx::query!(
            "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
            row.did,
            access_meta.jti,
            refresh_meta.jti,
            access_meta.expires_at,
            refresh_meta.expires_at,
            is_legacy_login,
            false,
            app_password_scopes,
            app_password_controller,
            app_password_name
        )
        .execute(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    if let Err(e) = insert_result {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if is_legacy_login {
        warn!(
            did = %row.did,
            ip = %client_ip,
            "Legacy login on TOTP-enabled account - sending notification"
        );
        // Best effort: a failed notification must not fail the login itself.
        if let Err(e) = crate::comms::queue_legacy_login_notification(
            &state.db,
            row.id,
            &pds_hostname,
            &client_ip,
            row.preferred_comms_channel,
        )
        .await
        {
            error!("Failed to queue legacy login notification: {:?}", e);
        }
    }
    let handle = full_handle(&row.handle, &pds_hostname);
    let is_migrated = row.deactivated_at.is_some() && row.migrated_to_pds.is_some();
    let is_active = row.deactivated_at.is_none() && !is_takendown;
    // Status precedence: takendown > migrated > deactivated > (none).
    let status = if is_takendown {
        Some("takendown".to_string())
    } else if is_migrated {
        Some("migrated".to_string())
    } else if row.deactivated_at.is_some() {
        Some("deactivated".to_string())
    } else {
        None
    };
    Json(CreateSessionOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle,
        did: row.did,
        did_doc,
        email: row.email,
        email_confirmed: Some(row.email_verified),
        active: Some(is_active),
        status,
    })
    .into_response()
}

/// Returns profile and status information for the authenticated session.
///
/// Deactivated accounts are accepted (see the `BearerAuthAllowDeactivated` extractor).
/// `email` and `emailConfirmed` are included only when the token's permissions allow
/// reading the email. The user row and the DID document are fetched concurrently.
pub async fn get_session(
    State(state): State<AppState>,
    BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
) -> Response {
    let permissions = auth_user.permissions();
    let can_read_email = permissions.allows_email_read();

    let did_for_doc = auth_user.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at
            FROM users WHERE did = $1"#,
            auth_user.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(row)) => {
            let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", row.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&row.handle, &pds_hostname);
            let is_takendown = row.takedown_ref.is_some();
            let is_migrated = row.deactivated_at.is_some() && row.migrated_to_pds.is_some();
            let is_active = row.deactivated_at.is_none() && !is_takendown;
            let email_value = if can_read_email {
                row.email.clone()
            } else {
                None
            };
            let email_confirmed_value = can_read_email && row.email_verified;
            let mut response = json!({
                "handle": handle,
                "did": auth_user.did,
                "active": is_active,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": row.preferred_locale,
                "isAdmin": row.is_admin
            });
            if can_read_email {
                response["email"] = json!(email_value);
                response["emailConfirmed"] = json!(email_confirmed_value);
            }
            // Status precedence: takendown > migrated > deactivated > (none).
            if is_takendown {
                response["status"] = json!("takendown");
            } else if is_migrated {
                response["status"] = json!("migrated");
                response["migratedToPds"] = json!(row.migrated_to_pds);
                response["migratedAt"] = json!(row.migrated_at);
            } else if row.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            Json(response).into_response()
        }
        Ok(None) => ApiError::AuthenticationFailed.into_response(),
        Err(e) => {
            error!("Database error in get_session: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Logs out the session identified by the bearer access token.
///
/// Deletes the `session_tokens` row whose `access_jti` matches the token and, when the DID
/// can be read from the token, removes the cached session entry. Responds with an empty
/// JSON object on success, `AuthenticationFailed` when no session matched, and
/// `InternalError` when the database call fails.
pub async fn delete_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let jti = match crate::auth::get_jti_from_token(&token) {
        Ok(jti) => jti,
        Err(_) => return ApiError::AuthenticationFailed.into_response(),
    };
    let did = crate::auth::get_did_from_token(&token).ok();
    match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
        .execute(&state.db)
        .await
    {
        Ok(res) if res.rows_affected() > 0 => {
            if let Some(did) = did {
                let session_cache_key = format!("auth:session:{}:{}", did, jti);
                // Cache eviction is best effort; the database row is already gone.
                let _ = state.cache.delete(&session_cache_key).await;
            }
            Json(json!({})).into_response()
        }
        Ok(_) => ApiError::AuthenticationFailed.into_response(),
        Err(e) => {
            error!("Database error in delete_session: {:?}", e);
            // A database failure is a server fault, not an authentication failure.
            ApiError::InternalError.into_response()
        }
    }
}

/// Rotates a refresh token: issues a new access/refresh pair for an existing session.
///
/// Runs inside a single database transaction:
/// 1. If the presented refresh JTI is already recorded in `used_refresh_tokens`, the whole
///    session is deleted and the request is rejected (token reuse).
/// 2. Otherwise the session row is locked, the token signature is verified, the old JTI is
///    recorded as used and the session row is updated with the new JTIs and expiries.
///
/// After commit, the user row and DID document are fetched concurrently to build the
/// response.
pub async fn refresh_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
    if !state
        .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
        .await
    {
        tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
        return (
            axum::http::StatusCode::TOO_MANY_REQUESTS,
            axum::Json(serde_json::json!({
                "error": "RateLimitExceeded",
                "message": "Too many requests. Please try again later."
            })),
        )
            .into_response();
    }
    let refresh_token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
        Ok(jti) => jti,
        Err(_) => {
            return ApiError::AuthenticationFailedMsg("Invalid token format".into())
                .into_response();
        }
    };
    let mut tx = match state.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            error!("Failed to begin transaction: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Reuse detection. A database error here must not be mistaken for "token not reused".
    let reused_session_id = match sqlx::query_scalar!(
        "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(found) => found,
        Err(e) => {
            error!("Database error checking refresh token reuse: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    if let Some(session_id) = reused_session_id {
        warn!(
            "Refresh token reuse detected! Revoking token family for session_id: {}",
            session_id
        );
        if let Err(e) = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
            .execute(&mut *tx)
            .await
        {
            error!("Failed to revoke session after refresh token reuse: {:?}", e);
        }
        if let Err(e) = tx.commit().await {
            error!("Failed to commit session revocation: {:?}", e);
        }
        return ApiError::ExpiredTokenMsg(
            "Refresh token has been revoked due to suspected compromise".into(),
        )
        .into_response();
    }
    let session_row = match sqlx::query!(
        r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
        FROM session_tokens st
        JOIN users u ON st.did = u.did
        JOIN user_keys k ON u.id = k.user_id
        WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
        FOR UPDATE OF st"#,
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching session: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes =
        match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
            Ok(k) => k,
            Err(e) => {
                error!("Failed to decrypt user key: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
        return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
    }
    let new_access_meta = match crate::auth::create_access_token_with_delegation(
        &session_row.did,
        &key_bytes,
        session_row.scope.as_deref(),
        session_row.controller_did.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let new_refresh_meta =
        match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
            Ok(m) => m,
            Err(e) => {
                error!("Failed to create refresh token: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // Record the old JTI as used. Zero affected rows means another request recorded it
    // first, which is handled as reuse.
    match sqlx::query!(
        "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
        refresh_jti,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        Ok(result) if result.rows_affected() == 0 => {
            warn!(
                "Concurrent refresh token reuse detected for session_id: {}",
                session_row.id
            );
            if let Err(e) =
                sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
                    .execute(&mut *tx)
                    .await
            {
                error!("Failed to revoke session after refresh token reuse: {:?}", e);
            }
            if let Err(e) = tx.commit().await {
                error!("Failed to commit session revocation: {:?}", e);
            }
            return ApiError::ExpiredTokenMsg(
                "Refresh token has been revoked due to suspected compromise".into(),
            )
            .into_response();
        }
        Err(e) => {
            error!("Failed to record used refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
        Ok(_) => {}
    }
    if let Err(e) = sqlx::query!(
        "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
        new_access_meta.jti,
        new_refresh_meta.jti,
        new_access_meta.expires_at,
        new_refresh_meta.expires_at,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        error!("Database error updating session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if let Err(e) = tx.commit().await {
        error!("Failed to commit transaction: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let did_for_doc = session_row.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified
            FROM users WHERE did = $1"#,
            session_row.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(u)) => {
            let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", u.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&u.handle, &pds_hostname);
            let is_takendown = u.takedown_ref.is_some();
            let is_active = u.deactivated_at.is_none() && !is_takendown;
            let mut response = json!({
                "accessJwt": new_access_meta.token,
                "refreshJwt": new_refresh_meta.token,
                "handle": handle,
                "did": session_row.did,
                "email": u.email,
                "emailConfirmed": u.email_verified,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": u.preferred_locale,
                "isAdmin": u.is_admin,
                "active": is_active
            });
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            if is_takendown {
                response["status"] = json!("takendown");
            } else if u.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            Json(response).into_response()
        }
        Ok(None) => {
            error!("User not found for existing session: {}", session_row.did);
            ApiError::InternalError.into_response()
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}

/// Request body for [`confirm_signup`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    pub did: String,
    pub verification_code: String,
}

/// Response body for [`confirm_signup`] (camelCase on the wire).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    pub access_jwt: String,
    pub refresh_jwt: String,
    pub handle: String,
    pub did: String,
    pub email: Option<String>,
    pub email_verified: bool,
    pub preferred_channel: String,
    pub preferred_channel_verified: bool,
}

/// Confirms a signup with a verification code and opens the first session.
///
/// The code is validated against the user's preferred channel and that channel's stored
/// identifier, and must have been issued for the same DID. On success the matching
/// `*_verified` column is set, a session row is inserted, and a welcome notification is
/// enqueued (best effort).
pub async fn confirm_signup(
    State(state): State<AppState>,
    Json(input): Json<ConfirmSignupInput>,
) -> Response {
    info!("confirm_signup called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.email,
            u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
            u.discord_id, u.telegram_username, u.signal_number,
            k.key_bytes, k.encryption_version
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            warn!("User not found for confirm_signup: {}", input.did);
            return ApiError::InvalidRequest("Invalid DID or verification code".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error in confirm_signup: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };

    // A missing channel identifier becomes the empty string.
    let (channel_str, identifier) = match row.channel {
        crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
        crate::comms::CommsChannel::Discord => {
            ("discord", row.discord_id.clone().unwrap_or_default())
        }
        crate::comms::CommsChannel::Telegram => (
            "telegram",
            row.telegram_username.clone().unwrap_or_default(),
        ),
        crate::comms::CommsChannel::Signal => {
            ("signal", row.signal_number.clone().unwrap_or_default())
        }
    };

    let normalized_token =
        crate::auth::verification_token::normalize_token_input(&input.verification_code);
    match crate::auth::verification_token::verify_signup_token(
        &normalized_token,
        channel_str,
        &identifier,
    ) {
        Ok(token_data) => {
            if token_data.did != input.did {
                warn!(
                    "Token DID mismatch for confirm_signup: expected {}, got {}",
                    input.did, token_data.did
                );
                return ApiError::InvalidRequest("Invalid verification code".into())
                    .into_response();
            }
        }
        Err(crate::auth::verification_token::VerifyError::Expired) => {
            warn!("Verification code expired for user: {}", input.did);
            return ApiError::ExpiredTokenMsg("Verification code has expired".into())
                .into_response();
        }
        Err(e) => {
            warn!("Invalid verification code for user {}: {:?}", input.did, e);
            return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
        }
    }

    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // The column name comes from this fixed match, never from user input, so building the
    // statement with `format!` cannot inject SQL.
    let verified_column = match row.channel {
        crate::comms::CommsChannel::Email => "email_verified",
        crate::comms::CommsChannel::Discord => "discord_verified",
        crate::comms::CommsChannel::Telegram => "telegram_verified",
        crate::comms::CommsChannel::Signal => "signal_verified",
    };
    let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
    if let Err(e) = sqlx::query(&update_query)
        .bind(&input.did)
        .execute(&state.db)
        .await
    {
        error!("Failed to update verification status: {:?}", e);
        return ApiError::InternalError.into_response();
    }

    let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let no_scope: Option<String> = None;
    if let Err(e) = sqlx::query!(
        "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        row.did,
        access_meta.jti,
        refresh_meta.jti,
        access_meta.expires_at,
        refresh_meta.expires_at,
        false,
        false,
        no_scope
    )
    .execute(&state.db)
    .await
    {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    // Best effort: a failed welcome message must not fail the confirmation.
    if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
        warn!("Failed to enqueue welcome notification: {:?}", e);
    }
    // Reported as verified only when the channel that was just confirmed is email.
    let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
    let preferred_channel = match row.channel {
        crate::comms::CommsChannel::Email => "email",
        crate::comms::CommsChannel::Discord => "discord",
        crate::comms::CommsChannel::Telegram => "telegram",
        crate::comms::CommsChannel::Signal => "signal",
    };
    Json(ConfirmSignupOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle: row.handle,
        did: row.did,
        email: row.email,
        email_verified,
        preferred_channel: preferred_channel.to_string(),
        preferred_channel_verified: true,
    })
    .into_response()
}

/// Request body for [`resend_verification`] (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    pub did: String,
}

/// Generates a fresh signup verification token and enqueues it on the user's preferred
/// channel.
///
/// Rejects unknown DIDs and accounts that already have any verified channel. A failure to
/// enqueue the notification is logged but the response is still `{"success": true}`.
pub async fn resend_verification(
    State(state): State<AppState>,
    Json(input): Json<ResendVerificationInput>,
) -> Response {
    info!("resend_verification called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            id, handle, email,
            preferred_comms_channel as "channel: crate::comms::CommsChannel",
            discord_id, telegram_username, signal_number,
            email_verified, discord_verified, telegram_verified, signal_verified
        FROM users
        WHERE did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::InvalidRequest("User not found".into()).into_response();
        }
        Err(e) => {
            error!("Database error in resend_verification: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    if is_verified {
        return ApiError::InvalidRequest("Account is already verified".into()).into_response();
    }

    // A missing channel identifier becomes the empty string.
    let (channel_str, recipient) = match row.channel {
        crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
        crate::comms::CommsChannel::Discord => {
            ("discord", row.discord_id.clone().unwrap_or_default())
        }
        crate::comms::CommsChannel::Telegram => (
            "telegram",
            row.telegram_username.clone().unwrap_or_default(),
        ),
        crate::comms::CommsChannel::Signal => {
            ("signal", row.signal_number.clone().unwrap_or_default())
        }
    };

    let verification_token =
        crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
    let formatted_token =
        crate::auth::verification_token::format_token_for_display(&verification_token);

    if let Err(e) = crate::comms::enqueue_signup_verification(
        &state.db,
        row.id,
        channel_str,
        &recipient,
        &formatted_token,
        None,
    )
    .await
    {
        warn!("Failed to enqueue verification notification: {:?}", e);
    }
    Json(json!({"success": true})).into_response()
}

/// One entry in the [`list_sessions`] response (camelCase on the wire).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// `"jwt:<id>"` or `"oauth:<id>"`; the same format is accepted by `revoke_session`.
    pub id: String,
    /// `"legacy"` for JWT sessions, `"oauth"` for OAuth tokens.
    pub session_type: String,
    pub client_name: Option<String>,
    /// RFC 3339 timestamp.
    pub created_at: String,
    /// RFC 3339 timestamp.
    pub expires_at: String,
    pub is_current: bool,
}

/// Response body for [`list_sessions`] (camelCase on the wire).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    pub sessions: Vec<SessionInfo>,
}

/// Lists the caller's unexpired JWT sessions and OAuth tokens, newest first.
///
/// `is_current` marks the entry whose token id matches the JTI of the bearer token used
/// for this request (for OAuth entries, only when the caller authenticated via OAuth).
pub async fn list_sessions(
    State(state): State<AppState>,
    headers: HeaderMap,
    auth: BearerAuth,
) -> Response {
    let current_jti = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .and_then(|token| crate::auth::get_jti_from_token(token).ok());

    let mut sessions: Vec<SessionInfo> = Vec::new();

    let jwt_result = sqlx::query_as::<
        _,
        (
            i32,
            String,
            chrono::DateTime<chrono::Utc>,
            chrono::DateTime<chrono::Utc>,
        ),
    >(
        r#"
        SELECT id, access_jti, created_at, refresh_expires_at
        FROM session_tokens
        WHERE did = $1 AND refresh_expires_at > NOW()
        ORDER BY created_at DESC
        "#,
    )
    .bind(&auth.0.did)
    .fetch_all(&state.db)
    .await;

    match jwt_result {
        Ok(rows) => {
            for (id, access_jti, created_at, expires_at) in rows {
                sessions.push(SessionInfo {
                    id: format!("jwt:{}", id),
                    session_type: "legacy".to_string(),
                    client_name: None,
                    created_at: created_at.to_rfc3339(),
                    expires_at: expires_at.to_rfc3339(),
                    is_current: current_jti.as_ref() == Some(&access_jti),
                });
            }
        }
        Err(e) => {
            error!("DB error fetching JWT sessions: {:?}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(json!({"error": "InternalError"})),
            )
                .into_response();
        }
    }

    let oauth_result = sqlx::query_as::<
        _,
        (
            i32,
            String,
            chrono::DateTime<chrono::Utc>,
            chrono::DateTime<chrono::Utc>,
            String,
        ),
    >(
        r#"
        SELECT id, token_id, created_at, expires_at, client_id
        FROM oauth_token
        WHERE did = $1 AND expires_at > NOW()
        ORDER BY created_at DESC
        "#,
    )
    .bind(&auth.0.did)
    .fetch_all(&state.db)
    .await;

    match oauth_result {
        Ok(rows) => {
            for (id, token_id, created_at, expires_at, client_id) in rows {
                let client_name = extract_client_name(&client_id);
                let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
                sessions.push(SessionInfo {
                    id: format!("oauth:{}", id),
                    session_type: "oauth".to_string(),
                    client_name: Some(client_name),
                    created_at: created_at.to_rfc3339(),
                    expires_at: expires_at.to_rfc3339(),
                    is_current: is_current_oauth,
                });
            }
        }
        Err(e) => {
            error!("DB error fetching OAuth sessions: {:?}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(json!({"error": "InternalError"})),
            )
                .into_response();
        }
    }

    // Merge both lists newest-first; this compares the RFC 3339 strings lexicographically.
    sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

    (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
}

/// Derives a display name for an OAuth client from its `client_id`.
///
/// Localhost/loopback `http://` ids become `"Localhost App"`; other parseable URLs yield
/// their host (or `"Unknown App"` when the URL has none); anything else is returned as-is.
fn extract_client_name(client_id: &str) -> String {
    if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
        "Localhost App".to_string()
    } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
        parsed.host_str().unwrap_or("Unknown App").to_string()
    } else {
        client_id.to_string()
    }
}

/// Request body for `revoke_session` (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// `"jwt:<id>"` or `"oauth:<id>"`.
    pub session_id: String,
}

pub async fn revoke_session(
    State(state): State<AppState>,
    auth: BearerAuth,
    Json(input): Json<RevokeSessionInput>,
) -> Response {
    if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
        let session_id: i32 = match jwt_id.parse() {
            Ok(id) => id,
            Err(_) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
                )
                    .into_response();
            }
        };
        let session = sqlx::query_as::<_, (String,)>(
            "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
        )
        .bind(session_id)
        .bind(&auth.0.did)
        .fetch_optional(&state.db)
        .await;
        let access_jti = match session {
            Ok(Some((jti,))) => jti,
            Ok(None) => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
                )
                    .into_response();
            }
            Err(e) => {
                error!("DB error in revoke_session: {:?}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(json!({"error": "InternalError"})),
                )
                    .into_response();
            }
        };
        if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
            .bind(session_id)
            .execute(&state.db)
            .await
        {
            error!("DB error deleting session: {:?}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(json!({"error": "InternalError"})),
            )
                .into_response();
        }
        let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
        if let Err(e) = state.cache.delete(&cache_key).await {
            warn!("Failed to invalidate session cache: {:?}", e);
        }
        info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
    } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
        let session_id: i32 = match oauth_id.parse() {
            Ok(id) => id,
            Err(_) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
                )
                    .into_response();
            }
        };
        let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
            .bind(session_id)
            .bind(&auth.0.did)
            .execute(&state.db)
            .await;
        match result {
            Ok(r) if r.rows_affected() == 0 => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
                )
                    .into_response();
            }
            Err(e) => {
1095 error!("DB error deleting OAuth session: {:?}", e); 1096 return ( 1097 StatusCode::INTERNAL_SERVER_ERROR, 1098 Json(json!({"error": "InternalError"})), 1099 ) 1100 .into_response(); 1101 } 1102 _ => {} 1103 } 1104 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked"); 1105 } else { 1106 return ( 1107 StatusCode::BAD_REQUEST, 1108 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})), 1109 ) 1110 .into_response(); 1111 } 1112 (StatusCode::OK, Json(json!({}))).into_response() 1113} 1114 1115pub async fn revoke_all_sessions( 1116 State(state): State<AppState>, 1117 headers: HeaderMap, 1118 auth: BearerAuth, 1119) -> Response { 1120 let current_jti = headers 1121 .get("authorization") 1122 .and_then(|v| v.to_str().ok()) 1123 .and_then(|v| v.strip_prefix("Bearer ")) 1124 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1125 1126 if let Some(ref jti) = current_jti { 1127 if auth.0.is_oauth { 1128 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1129 .bind(&auth.0.did) 1130 .execute(&state.db) 1131 .await 1132 { 1133 error!("DB error revoking JWT sessions: {:?}", e); 1134 return ( 1135 StatusCode::INTERNAL_SERVER_ERROR, 1136 Json(json!({"error": "InternalError"})), 1137 ) 1138 .into_response(); 1139 } 1140 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1141 .bind(&auth.0.did) 1142 .bind(jti) 1143 .execute(&state.db) 1144 .await 1145 { 1146 error!("DB error revoking OAuth sessions: {:?}", e); 1147 return ( 1148 StatusCode::INTERNAL_SERVER_ERROR, 1149 Json(json!({"error": "InternalError"})), 1150 ) 1151 .into_response(); 1152 } 1153 } else { 1154 if let Err(e) = 1155 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1156 .bind(&auth.0.did) 1157 .bind(jti) 1158 .execute(&state.db) 1159 .await 1160 { 1161 error!("DB error revoking JWT sessions: {:?}", e); 1162 return ( 1163 StatusCode::INTERNAL_SERVER_ERROR, 1164 
Json(json!({"error": "InternalError"})), 1165 ) 1166 .into_response(); 1167 } 1168 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1169 .bind(&auth.0.did) 1170 .execute(&state.db) 1171 .await 1172 { 1173 error!("DB error revoking OAuth sessions: {:?}", e); 1174 return ( 1175 StatusCode::INTERNAL_SERVER_ERROR, 1176 Json(json!({"error": "InternalError"})), 1177 ) 1178 .into_response(); 1179 } 1180 } 1181 } else { 1182 return ( 1183 StatusCode::BAD_REQUEST, 1184 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})), 1185 ) 1186 .into_response(); 1187 } 1188 1189 info!(did = %auth.0.did, "All other sessions revoked"); 1190 (StatusCode::OK, Json(json!({"success": true}))).into_response() 1191} 1192 1193#[derive(Serialize)] 1194#[serde(rename_all = "camelCase")] 1195pub struct LegacyLoginPreferenceOutput { 1196 pub allow_legacy_login: bool, 1197 pub has_mfa: bool, 1198} 1199 1200pub async fn get_legacy_login_preference( 1201 State(state): State<AppState>, 1202 auth: BearerAuth, 1203) -> Response { 1204 let result = sqlx::query!( 1205 r#"SELECT 1206 u.allow_legacy_login, 1207 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1208 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 
1209 FROM users u WHERE u.did = $1"#, 1210 auth.0.did 1211 ) 1212 .fetch_optional(&state.db) 1213 .await; 1214 1215 match result { 1216 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1217 allow_legacy_login: row.allow_legacy_login, 1218 has_mfa: row.has_mfa, 1219 }) 1220 .into_response(), 1221 Ok(None) => ( 1222 StatusCode::NOT_FOUND, 1223 Json(json!({"error": "AccountNotFound"})), 1224 ) 1225 .into_response(), 1226 Err(e) => { 1227 error!("DB error: {:?}", e); 1228 ( 1229 StatusCode::INTERNAL_SERVER_ERROR, 1230 Json(json!({"error": "InternalError"})), 1231 ) 1232 .into_response() 1233 } 1234 } 1235} 1236 1237#[derive(Deserialize)] 1238#[serde(rename_all = "camelCase")] 1239pub struct UpdateLegacyLoginInput { 1240 pub allow_legacy_login: bool, 1241} 1242 1243pub async fn update_legacy_login_preference( 1244 State(state): State<AppState>, 1245 auth: BearerAuth, 1246 Json(input): Json<UpdateLegacyLoginInput>, 1247) -> Response { 1248 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1249 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1250 .await; 1251 } 1252 1253 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1254 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1255 } 1256 1257 let result = sqlx::query!( 1258 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1259 input.allow_legacy_login, 1260 auth.0.did 1261 ) 1262 .fetch_optional(&state.db) 1263 .await; 1264 1265 match result { 1266 Ok(Some(_)) => { 1267 info!( 1268 did = %auth.0.did, 1269 allow_legacy_login = input.allow_legacy_login, 1270 "Legacy login preference updated" 1271 ); 1272 Json(json!({ 1273 "allowLegacyLogin": input.allow_legacy_login 1274 })) 1275 .into_response() 1276 } 1277 Ok(None) => ( 1278 StatusCode::NOT_FOUND, 1279 Json(json!({"error": "AccountNotFound"})), 1280 ) 1281 .into_response(), 1282 Err(e) => { 1283 
error!("DB error: {:?}", e); 1284 ( 1285 StatusCode::INTERNAL_SERVER_ERROR, 1286 Json(json!({"error": "InternalError"})), 1287 ) 1288 .into_response() 1289 } 1290 } 1291} 1292 1293use crate::comms::locale::VALID_LOCALES; 1294 1295#[derive(Deserialize)] 1296#[serde(rename_all = "camelCase")] 1297pub struct UpdateLocaleInput { 1298 pub preferred_locale: String, 1299} 1300 1301pub async fn update_locale( 1302 State(state): State<AppState>, 1303 auth: BearerAuth, 1304 Json(input): Json<UpdateLocaleInput>, 1305) -> Response { 1306 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1307 return ( 1308 StatusCode::BAD_REQUEST, 1309 Json(json!({ 1310 "error": "InvalidRequest", 1311 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1312 })), 1313 ) 1314 .into_response(); 1315 } 1316 1317 let result = sqlx::query!( 1318 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1319 input.preferred_locale, 1320 auth.0.did 1321 ) 1322 .fetch_optional(&state.db) 1323 .await; 1324 1325 match result { 1326 Ok(Some(_)) => { 1327 info!( 1328 did = %auth.0.did, 1329 locale = %input.preferred_locale, 1330 "User locale preference updated" 1331 ); 1332 Json(json!({ 1333 "preferredLocale": input.preferred_locale 1334 })) 1335 .into_response() 1336 } 1337 Ok(None) => ( 1338 StatusCode::NOT_FOUND, 1339 Json(json!({"error": "AccountNotFound"})), 1340 ) 1341 .into_response(), 1342 Err(e) => { 1343 error!("DB error updating locale: {:?}", e); 1344 ( 1345 StatusCode::INTERNAL_SERVER_ERROR, 1346 Json(json!({"error": "InternalError"})), 1347 ) 1348 .into_response() 1349 } 1350 } 1351}