this repo has no description
1use crate::api::error::ApiError; 2use crate::api::{EmptyResponse, SuccessResponse}; 3use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 4use crate::state::{AppState, RateLimitKind}; 5use crate::types::{AccountState, Did, Handle, PlainPassword}; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use bcrypt::verify; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use tracing::{error, info, warn}; 16 17fn extract_client_ip(headers: &HeaderMap) -> String { 18 if let Some(forwarded) = headers.get("x-forwarded-for") 19 && let Ok(value) = forwarded.to_str() 20 && let Some(first_ip) = value.split(',').next() 21 { 22 return first_ip.trim().to_string(); 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") 25 && let Ok(value) = real_ip.to_str() 26 { 27 return value.trim().to_string(); 28 } 29 "unknown".to_string() 30} 31 32fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 33 let identifier = identifier.trim(); 34 if identifier.contains('@') || identifier.starts_with("did:") { 35 identifier.to_string() 36 } else if !identifier.contains('.') { 37 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 38 } else { 39 identifier.to_lowercase() 40 } 41} 42 43fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 44 stored_handle.to_string() 45} 46 47#[derive(Deserialize)] 48#[serde(rename_all = "camelCase")] 49pub struct CreateSessionInput { 50 pub identifier: String, 51 pub password: PlainPassword, 52 #[serde(default)] 53 pub allow_takendown: bool, 54} 55 56#[derive(Serialize)] 57#[serde(rename_all = "camelCase")] 58pub struct CreateSessionOutput { 59 pub access_jwt: String, 60 pub refresh_jwt: String, 61 pub handle: Handle, 62 pub did: Did, 63 #[serde(skip_serializing_if = "Option::is_none")] 64 pub did_doc: Option<serde_json::Value>, 65 #[serde(skip_serializing_if = "Option::is_none")] 66 pub email: Option<String>, 67 #[serde(skip_serializing_if = "Option::is_none")] 68 pub email_confirmed: Option<bool>, 69 #[serde(skip_serializing_if = "Option::is_none")] 70 pub active: Option<bool>, 71 #[serde(skip_serializing_if = "Option::is_none")] 72 pub status: Option<String>, 73} 74 75pub async fn create_session( 76 State(state): State<AppState>, 77 headers: HeaderMap, 78 Json(input): Json<CreateSessionInput>, 79) -> Response { 80 info!( 81 "create_session called with identifier: {}", 82 input.identifier 83 ); 84 let client_ip = extract_client_ip(&headers); 85 if !state 86 .check_rate_limit(RateLimitKind::Login, &client_ip) 87 .await 88 { 89 warn!(ip = %client_ip, "Login rate limit exceeded"); 90 return ApiError::RateLimitExceeded(None).into_response(); 91 } 92 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 93 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 94 info!( 95 "Normalized identifier: {} -> {}", 96 input.identifier, normalized_identifier 97 ); 98 let row = match sqlx::query!( 99 r#"SELECT 100 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref, 101 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 102 u.allow_legacy_login, u.migrated_to_pds, 103 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 104 k.key_bytes, k.encryption_version, 105 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 106 FROM users u 107 JOIN user_keys k ON u.id = k.user_id 108 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 109 normalized_identifier 110 ) 111 .fetch_optional(&state.db) 112 .await 113 { 114 Ok(Some(row)) => row, 115 Ok(None) => { 116 let _ = verify( 117 &input.password, 118 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 119 ); 120 warn!("User not found for login attempt"); 121 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into())) 122 .into_response(); 123 } 124 Err(e) => { 125 error!("Database error fetching user: {:?}", e); 126 return ApiError::InternalError(None).into_response(); 127 } 128 }; 129 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 130 Ok(k) => k, 131 Err(e) => { 132 error!("Failed to decrypt user key: {:?}", e); 133 return ApiError::InternalError(None).into_response(); 134 } 135 }; 136 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row 137 .password_hash 138 .as_ref() 139 .map(|h| verify(&input.password, h).unwrap_or(false)) 140 .unwrap_or(false) 141 { 142 (true, None, None, None) 143 } else { 144 let app_passwords = sqlx::query!( 145 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 146 row.id 147 ) 148 .fetch_all(&state.db) 149 .await 150 .unwrap_or_default(); 151 let matched = app_passwords 152 .iter() 153 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 154 match matched { 155 Some(app) => ( 156 true, 157 Some(app.name.clone()), 158 app.scopes.clone(), 159 app.created_by_controller_did.clone(), 160 ), 161 None => (false, None, None, None), 162 } 163 }; 164 if !password_valid { 165 warn!("Password verification failed for login attempt"); 166 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into())) 167 .into_response(); 168 } 169 let account_state = AccountState::from_db_fields( 170 row.deactivated_at, 171 row.takedown_ref.clone(), 172 row.migrated_to_pds.clone(), 173 None, 174 ); 175 if account_state.is_takendown() && !input.allow_takendown { 176 warn!("Login attempt for takendown account: {}", row.did); 177 return ApiError::AccountTakedown.into_response(); 178 } 179 let is_verified = 180 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 181 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did) 182 .await 183 .unwrap_or(false); 184 if !is_verified && !is_delegated { 185 warn!("Login attempt for unverified account: {}", row.did); 186 return ( 187 StatusCode::FORBIDDEN, 188 Json(json!({ 189 "error": "AccountNotVerified", 190 "message": "Please verify your account before logging in", 191 "did": row.did 192 })), 193 ) 194 .into_response(); 195 } 196 let has_totp = row.totp_enabled.unwrap_or(false); 197 let is_legacy_login = has_totp; 198 if has_totp && !row.allow_legacy_login { 199 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 200 return ( 201 StatusCode::FORBIDDEN, 202 Json(json!({ 203 "error": "MfaRequired", 204 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 205 "did": row.did 206 })), 207 ) 208 .into_response(); 209 } 210 let access_meta = match crate::auth::create_access_token_with_delegation( 211 &row.did, 212 &key_bytes, 213 app_password_scopes.as_deref(), 214 app_password_controller.as_deref(), 215 ) { 216 Ok(m) => m, 217 Err(e) => { 218 error!("Failed to create access token: {:?}", e); 219 return ApiError::InternalError(None).into_response(); 220 } 221 }; 222 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 223 Ok(m) => m, 224 Err(e) => { 225 error!("Failed to create refresh token: {:?}", e); 226 return ApiError::InternalError(None).into_response(); 227 } 228 }; 229 let did_for_doc = row.did.clone(); 230 let did_resolver = state.did_resolver.clone(); 231 let (insert_result, did_doc) = tokio::join!( 232 sqlx::query!( 233 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", 234 row.did, 235 access_meta.jti, 236 refresh_meta.jti, 237 access_meta.expires_at, 238 refresh_meta.expires_at, 239 is_legacy_login, 240 false, 241 app_password_scopes, 242 app_password_controller, 243 app_password_name 244 ) 245 .execute(&state.db), 246 did_resolver.resolve_did_document(&did_for_doc) 247 ); 248 if let Err(e) = insert_result { 249 error!("Failed to insert session: {:?}", e); 250 return ApiError::InternalError(None).into_response(); 251 } 252 if is_legacy_login { 253 warn!( 254 did = %row.did, 255 ip = %client_ip, 256 "Legacy login on TOTP-enabled account - sending notification" 257 ); 258 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 259 if let Err(e) = crate::comms::queue_legacy_login_notification( 260 &state.db, 261 row.id, 262 &hostname, 263 &client_ip, 264 row.preferred_comms_channel, 265 ) 266 .await 267 { 268 error!("Failed to queue legacy login notification: {:?}", e); 269 } 270 } 271 let handle = full_handle(&row.handle, &pds_hostname); 272 let is_active = account_state.is_active(); 273 let status = account_state.status_for_session().map(String::from); 274 Json(CreateSessionOutput { 275 access_jwt: access_meta.token, 276 refresh_jwt: refresh_meta.token, 277 handle: handle.into(), 278 did: row.did.into(), 279 did_doc, 280 email: row.email, 281 email_confirmed: Some(row.email_verified), 282 active: Some(is_active), 283 status, 284 }) 285 .into_response() 286} 287 288pub async fn get_session( 289 State(state): State<AppState>, 290 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 291) -> Response { 292 let permissions = auth_user.permissions(); 293 let can_read_email = permissions.allows_email_read(); 294 295 let did_for_doc = auth_user.did.clone(); 296 let did_resolver = state.did_resolver.clone(); 297 let (db_result, did_doc) = tokio::join!( 298 sqlx::query!( 299 r#"SELECT 300 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale, 301 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 302 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at 303 FROM users WHERE did = $1"#, 304 &auth_user.did 305 ) 306 .fetch_optional(&state.db), 307 did_resolver.resolve_did_document(&did_for_doc) 308 ); 309 match db_result { 310 Ok(Some(row)) => { 311 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 312 crate::comms::CommsChannel::Email => ("email", row.email_verified), 313 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 314 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 315 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 316 }; 317 let pds_hostname = 318 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 319 let handle = full_handle(&row.handle, &pds_hostname); 320 let account_state = AccountState::from_db_fields( 321 row.deactivated_at, 322 row.takedown_ref.clone(), 323 row.migrated_to_pds.clone(), 324 row.migrated_at, 325 ); 326 let email_value = if can_read_email { 327 row.email.clone() 328 } else { 329 None 330 }; 331 let email_confirmed_value = can_read_email && row.email_verified; 332 let mut response = json!({ 333 "handle": handle, 334 "did": &auth_user.did, 335 "active": account_state.is_active(), 336 "preferredChannel": preferred_channel, 337 "preferredChannelVerified": preferred_channel_verified, 338 "preferredLocale": row.preferred_locale, 339 "isAdmin": row.is_admin 340 }); 341 if can_read_email { 342 response["email"] = json!(email_value); 343 response["emailConfirmed"] = json!(email_confirmed_value); 344 } 345 if let Some(status) = account_state.status_for_session() { 346 response["status"] = json!(status); 347 } 348 if let AccountState::Migrated { to_pds, at } = &account_state { 349 response["migratedToPds"] = json!(to_pds); 350 response["migratedAt"] = json!(at); 351 } 352 if let Some(doc) = did_doc { 353 response["didDoc"] = doc; 354 } 355 Json(response).into_response() 356 } 357 Ok(None) => ApiError::AuthenticationFailed(None).into_response(), 358 Err(e) => { 359 error!("Database error in get_session: {:?}", e); 360 ApiError::InternalError(None).into_response() 361 } 362 } 363} 364 365pub async fn delete_session( 366 State(state): State<AppState>, 367 headers: axum::http::HeaderMap, 368) -> Response { 369 let token = match crate::auth::extract_bearer_token_from_header( 370 headers.get("Authorization").and_then(|h| h.to_str().ok()), 371 ) { 372 Some(t) => t, 373 None => return ApiError::AuthenticationRequired.into_response(), 374 }; 375 let jti = match crate::auth::get_jti_from_token(&token) { 376 Ok(jti) => jti, 377 Err(_) => return ApiError::AuthenticationFailed(None).into_response(), 378 }; 379 let did = crate::auth::get_did_from_token(&token).ok(); 380 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 381 .execute(&state.db) 382 .await 383 { 384 Ok(res) if res.rows_affected() > 0 => { 385 if let Some(did) = did { 386 let session_cache_key = format!("auth:session:{}:{}", did, jti); 387 let _ = state.cache.delete(&session_cache_key).await; 388 } 389 EmptyResponse::ok().into_response() 390 } 391 Ok(_) => ApiError::AuthenticationFailed(None).into_response(), 392 Err(e) => { 393 error!("Database error in delete_session: {:?}", e); 394 ApiError::AuthenticationFailed(None).into_response() 395 } 396 } 397} 398 399pub async fn refresh_session( 400 State(state): State<AppState>, 401 headers: axum::http::HeaderMap, 402) -> Response { 403 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 404 if !state 405 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 406 .await 407 { 408 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 409 return ApiError::RateLimitExceeded(None).into_response(); 410 } 411 let refresh_token = match crate::auth::extract_bearer_token_from_header( 412 headers.get("Authorization").and_then(|h| h.to_str().ok()), 413 ) { 414 Some(t) => t, 415 None => return ApiError::AuthenticationRequired.into_response(), 416 }; 417 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 418 Ok(jti) => jti, 419 Err(_) => { 420 return ApiError::AuthenticationFailed(Some("Invalid token format".into())) 421 .into_response(); 422 } 423 }; 424 let mut tx = match state.db.begin().await { 425 Ok(tx) => tx, 426 Err(e) => { 427 error!("Failed to begin transaction: {:?}", e); 428 return ApiError::InternalError(None).into_response(); 429 } 430 }; 431 if let Ok(Some(session_id)) = sqlx::query_scalar!( 432 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 433 refresh_jti 434 ) 435 .fetch_optional(&mut *tx) 436 .await 437 { 438 warn!( 439 "Refresh token reuse detected! Revoking token family for session_id: {}", 440 session_id 441 ); 442 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 443 .execute(&mut *tx) 444 .await; 445 let _ = tx.commit().await; 446 return ApiError::AuthenticationFailed(Some( 447 "Refresh token has been revoked due to suspected compromise".into(), 448 )) 449 .into_response(); 450 } 451 let session_row = match sqlx::query!( 452 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version 453 FROM session_tokens st 454 JOIN users u ON st.did = u.did 455 JOIN user_keys k ON u.id = k.user_id 456 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 457 FOR UPDATE OF st"#, 458 refresh_jti 459 ) 460 .fetch_optional(&mut *tx) 461 .await 462 { 463 Ok(Some(row)) => row, 464 Ok(None) => { 465 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into())) 466 .into_response(); 467 } 468 Err(e) => { 469 error!("Database error fetching session: {:?}", e); 470 return ApiError::InternalError(None).into_response(); 471 } 472 }; 473 let key_bytes = 474 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 475 Ok(k) => k, 476 Err(e) => { 477 error!("Failed to decrypt user key: {:?}", e); 478 return ApiError::InternalError(None).into_response(); 479 } 480 }; 481 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 482 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into())) 483 .into_response(); 484 } 485 let new_access_meta = match crate::auth::create_access_token_with_delegation( 486 &session_row.did, 487 &key_bytes, 488 session_row.scope.as_deref(), 489 session_row.controller_did.as_deref(), 490 ) { 491 Ok(m) => m, 492 Err(e) => { 493 error!("Failed to create access token: {:?}", e); 494 return ApiError::InternalError(None).into_response(); 495 } 496 }; 497 let new_refresh_meta = 498 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 499 Ok(m) => m, 500 Err(e) => { 501 error!("Failed to create refresh token: {:?}", e); 502 return ApiError::InternalError(None).into_response(); 503 } 504 }; 505 match sqlx::query!( 506 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 507 refresh_jti, 508 session_row.id 509 ) 510 .execute(&mut *tx) 511 .await 512 { 513 Ok(result) if result.rows_affected() == 0 => { 514 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 515 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 516 .execute(&mut *tx) 517 .await; 518 let _ = tx.commit().await; 519 return ApiError::AuthenticationFailed(Some("Refresh token has been revoked due to suspected compromise".into())).into_response(); 520 } 521 Err(e) => { 522 error!("Failed to record used refresh token: {:?}", e); 523 return ApiError::InternalError(None).into_response(); 524 } 525 Ok(_) => {} 526 } 527 if let Err(e) = sqlx::query!( 528 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 529 new_access_meta.jti, 530 new_refresh_meta.jti, 531 new_access_meta.expires_at, 532 new_refresh_meta.expires_at, 533 session_row.id 534 ) 535 .execute(&mut *tx) 536 .await 537 { 538 error!("Database error updating session: {:?}", e); 539 return ApiError::InternalError(None).into_response(); 540 } 541 if let Err(e) = tx.commit().await { 542 error!("Failed to commit transaction: {:?}", e); 543 return ApiError::InternalError(None).into_response(); 544 } 545 let did_for_doc = session_row.did.clone(); 546 let did_resolver = state.did_resolver.clone(); 547 let (db_result, did_doc) = tokio::join!( 548 sqlx::query!( 549 r#"SELECT 550 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref, 551 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 552 discord_verified, telegram_verified, signal_verified 553 FROM users WHERE did = $1"#, 554 session_row.did 555 ) 556 .fetch_optional(&state.db), 557 did_resolver.resolve_did_document(&did_for_doc) 558 ); 559 match db_result { 560 Ok(Some(u)) => { 561 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 562 crate::comms::CommsChannel::Email => ("email", u.email_verified), 563 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 564 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 565 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 566 }; 567 let pds_hostname = 568 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 569 let handle = full_handle(&u.handle, &pds_hostname); 570 let account_state = 571 AccountState::from_db_fields(u.deactivated_at, u.takedown_ref.clone(), None, None); 572 let mut response = json!({ 573 "accessJwt": new_access_meta.token, 574 "refreshJwt": new_refresh_meta.token, 575 "handle": handle, 576 "did": session_row.did, 577 "email": u.email, 578 "emailConfirmed": u.email_verified, 579 "preferredChannel": preferred_channel, 580 "preferredChannelVerified": preferred_channel_verified, 581 "preferredLocale": u.preferred_locale, 582 "isAdmin": u.is_admin, 583 "active": account_state.is_active() 584 }); 585 if let Some(doc) = did_doc { 586 response["didDoc"] = doc; 587 } 588 if let Some(status) = account_state.status_for_session() { 589 response["status"] = json!(status); 590 } 591 Json(response).into_response() 592 } 593 Ok(None) => { 594 error!("User not found for existing session: {}", session_row.did); 595 ApiError::InternalError(None).into_response() 596 } 597 Err(e) => { 598 error!("Database error fetching user: {:?}", e); 599 ApiError::InternalError(None).into_response() 600 } 601 } 602} 603 604#[derive(Deserialize)] 605#[serde(rename_all = "camelCase")] 606pub struct ConfirmSignupInput { 607 pub did: Did, 608 pub verification_code: String, 609} 610 611#[derive(Serialize)] 612#[serde(rename_all = "camelCase")] 613pub struct ConfirmSignupOutput { 614 pub access_jwt: String, 615 pub refresh_jwt: String, 616 pub handle: Handle, 617 pub did: Did, 618 pub email: Option<String>, 619 pub email_verified: bool, 620 pub preferred_channel: String, 621 pub preferred_channel_verified: bool, 622} 623 624pub async fn confirm_signup( 625 State(state): State<AppState>, 626 Json(input): Json<ConfirmSignupInput>, 627) -> Response { 628 info!("confirm_signup called for DID: {}", input.did); 629 let row = match sqlx::query!( 630 r#"SELECT 631 u.id, u.did, u.handle, u.email, 632 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 633 u.discord_id, u.telegram_username, u.signal_number, 634 k.key_bytes, k.encryption_version 635 FROM users u 636 JOIN user_keys k ON u.id = k.user_id 637 WHERE u.did = $1"#, 638 input.did.as_str() 639 ) 640 .fetch_optional(&state.db) 641 .await 642 { 643 Ok(Some(row)) => row, 644 Ok(None) => { 645 warn!("User not found for confirm_signup: {}", input.did); 646 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 647 .into_response(); 648 } 649 Err(e) => { 650 error!("Database error in confirm_signup: {:?}", e); 651 return ApiError::InternalError(None).into_response(); 652 } 653 }; 654 655 let (channel_str, identifier) = match row.channel { 656 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 657 crate::comms::CommsChannel::Discord => { 658 ("discord", row.discord_id.clone().unwrap_or_default()) 659 } 660 crate::comms::CommsChannel::Telegram => ( 661 "telegram", 662 row.telegram_username.clone().unwrap_or_default(), 663 ), 664 crate::comms::CommsChannel::Signal => { 665 ("signal", row.signal_number.clone().unwrap_or_default()) 666 } 667 }; 668 669 let normalized_token = 670 crate::auth::verification_token::normalize_token_input(&input.verification_code); 671 match crate::auth::verification_token::verify_signup_token( 672 &normalized_token, 673 channel_str, 674 &identifier, 675 ) { 676 Ok(token_data) => { 677 if token_data.did != input.did.as_str() { 678 warn!( 679 "Token DID mismatch for confirm_signup: expected {}, got {}", 680 input.did, token_data.did 681 ); 682 return ApiError::InvalidRequest("Invalid verification code".into()) 683 .into_response(); 684 } 685 } 686 Err(crate::auth::verification_token::VerifyError::Expired) => { 687 warn!("Verification code expired for user: {}", input.did); 688 return ApiError::ExpiredToken(Some("Verification code has expired".into())) 689 .into_response(); 690 } 691 Err(e) => { 692 warn!("Invalid verification code for user {}: {:?}", input.did, e); 693 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 694 } 695 } 696 697 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 698 Ok(k) => k, 699 Err(e) => { 700 error!("Failed to decrypt user key: {:?}", e); 701 return ApiError::InternalError(None).into_response(); 702 } 703 }; 704 let verified_column = match row.channel { 705 crate::comms::CommsChannel::Email => "email_verified", 706 crate::comms::CommsChannel::Discord => "discord_verified", 707 crate::comms::CommsChannel::Telegram => "telegram_verified", 708 crate::comms::CommsChannel::Signal => "signal_verified", 709 }; 710 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 711 if let Err(e) = sqlx::query(&update_query) 712 .bind(input.did.as_str()) 713 .execute(&state.db) 714 .await 715 { 716 error!("Failed to update verification status: {:?}", e); 717 return ApiError::InternalError(None).into_response(); 718 } 719 720 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 721 Ok(m) => m, 722 Err(e) => { 723 error!("Failed to create access token: {:?}", e); 724 return ApiError::InternalError(None).into_response(); 725 } 726 }; 727 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 728 Ok(m) => m, 729 Err(e) => { 730 error!("Failed to create refresh token: {:?}", e); 731 return ApiError::InternalError(None).into_response(); 732 } 733 }; 734 let no_scope: Option<String> = None; 735 if let Err(e) = sqlx::query!( 736 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 737 row.did, 738 access_meta.jti, 739 refresh_meta.jti, 740 access_meta.expires_at, 741 refresh_meta.expires_at, 742 false, 743 false, 744 no_scope 745 ) 746 .execute(&state.db) 747 .await 748 { 749 error!("Failed to insert session: {:?}", e); 750 return ApiError::InternalError(None).into_response(); 751 } 752 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 753 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 754 warn!("Failed to enqueue welcome notification: {:?}", e); 755 } 756 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 757 let preferred_channel = match row.channel { 758 crate::comms::CommsChannel::Email => "email", 759 crate::comms::CommsChannel::Discord => "discord", 760 crate::comms::CommsChannel::Telegram => "telegram", 761 crate::comms::CommsChannel::Signal => "signal", 762 }; 763 Json(ConfirmSignupOutput { 764 access_jwt: access_meta.token, 765 refresh_jwt: refresh_meta.token, 766 handle: row.handle.into(), 767 did: row.did.into(), 768 email: row.email, 769 email_verified, 770 preferred_channel: preferred_channel.to_string(), 771 preferred_channel_verified: true, 772 }) 773 .into_response() 774} 775 776#[derive(Deserialize)] 777#[serde(rename_all = "camelCase")] 778pub struct ResendVerificationInput { 779 pub did: Did, 780} 781 782pub async fn resend_verification( 783 State(state): State<AppState>, 784 Json(input): Json<ResendVerificationInput>, 785) -> Response { 786 info!("resend_verification called for DID: {}", input.did); 787 let row = match sqlx::query!( 788 r#"SELECT 789 id, handle, email, 790 preferred_comms_channel as "channel: crate::comms::CommsChannel", 791 discord_id, telegram_username, signal_number, 792 email_verified, discord_verified, telegram_verified, signal_verified 793 FROM users 794 WHERE did = $1"#, 795 input.did.as_str() 796 ) 797 .fetch_optional(&state.db) 798 .await 799 { 800 Ok(Some(row)) => row, 801 Ok(None) => { 802 return ApiError::InvalidRequest("User not found".into()).into_response(); 803 } 804 Err(e) => { 805 error!("Database error in resend_verification: {:?}", e); 806 return ApiError::InternalError(None).into_response(); 807 } 808 }; 809 let is_verified = 810 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 811 if is_verified { 812 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 813 } 814 815 let (channel_str, recipient) = match row.channel { 816 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 817 crate::comms::CommsChannel::Discord => { 818 ("discord", row.discord_id.clone().unwrap_or_default()) 819 } 820 crate::comms::CommsChannel::Telegram => ( 821 "telegram", 822 row.telegram_username.clone().unwrap_or_default(), 823 ), 824 crate::comms::CommsChannel::Signal => { 825 ("signal", row.signal_number.clone().unwrap_or_default()) 826 } 827 }; 828 829 let verification_token = 830 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 831 let formatted_token = 832 crate::auth::verification_token::format_token_for_display(&verification_token); 833 834 if let Err(e) = crate::comms::enqueue_signup_verification( 835 &state.db, 836 row.id, 837 channel_str, 838 &recipient, 839 &formatted_token, 840 None, 841 ) 842 .await 843 { 844 warn!("Failed to enqueue verification notification: {:?}", e); 845 } 846 SuccessResponse::ok().into_response() 847} 848 849#[derive(Serialize)] 850#[serde(rename_all = "camelCase")] 851pub struct SessionInfo { 852 pub id: String, 853 pub session_type: String, 854 pub client_name: Option<String>, 855 pub created_at: String, 856 pub expires_at: String, 857 pub is_current: bool, 858} 859 860#[derive(Serialize)] 861#[serde(rename_all = "camelCase")] 862pub struct ListSessionsOutput { 863 pub sessions: Vec<SessionInfo>, 864} 865 866pub async fn list_sessions( 867 State(state): State<AppState>, 868 headers: HeaderMap, 869 auth: BearerAuth, 870) -> Response { 871 let current_jti = headers 872 .get("authorization") 873 .and_then(|v| v.to_str().ok()) 874 .and_then(|v| v.strip_prefix("Bearer ")) 875 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 876 877 let mut sessions: Vec<SessionInfo> = Vec::new(); 878 879 let jwt_result = sqlx::query_as::< 880 _, 881 ( 882 i32, 883 String, 884 chrono::DateTime<chrono::Utc>, 885 chrono::DateTime<chrono::Utc>, 886 ), 887 >( 888 r#" 889 SELECT id, access_jti, created_at, refresh_expires_at 890 FROM session_tokens 891 WHERE did = $1 AND refresh_expires_at > NOW() 892 ORDER BY created_at DESC 893 "#, 894 ) 895 .bind(&auth.0.did) 896 .fetch_all(&state.db) 897 .await; 898 899 match jwt_result { 900 Ok(rows) => { 901 for (id, access_jti, created_at, expires_at) in rows { 902 sessions.push(SessionInfo { 903 id: format!("jwt:{}", id), 904 session_type: "legacy".to_string(), 905 client_name: None, 906 created_at: created_at.to_rfc3339(), 907 expires_at: expires_at.to_rfc3339(), 908 is_current: current_jti.as_ref() == Some(&access_jti), 909 }); 910 } 911 } 912 Err(e) => { 913 error!("DB error fetching JWT sessions: {:?}", e); 914 return ApiError::InternalError(None).into_response(); 915 } 916 } 917 918 let oauth_result = sqlx::query_as::< 919 _, 920 ( 921 i32, 922 String, 923 chrono::DateTime<chrono::Utc>, 924 chrono::DateTime<chrono::Utc>, 925 String, 926 ), 927 >( 928 r#" 929 SELECT id, token_id, created_at, expires_at, client_id 930 FROM oauth_token 931 WHERE did = $1 AND expires_at > NOW() 932 ORDER BY created_at DESC 933 "#, 934 ) 935 .bind(&auth.0.did) 936 .fetch_all(&state.db) 937 .await; 938 939 match oauth_result { 940 Ok(rows) => { 941 for (id, token_id, created_at, expires_at, client_id) in rows { 942 let client_name = extract_client_name(&client_id); 943 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 944 sessions.push(SessionInfo { 945 id: format!("oauth:{}", id), 946 session_type: "oauth".to_string(), 947 client_name: Some(client_name), 948 created_at: created_at.to_rfc3339(), 949 expires_at: expires_at.to_rfc3339(), 950 is_current: is_current_oauth, 951 }); 952 } 953 } 954 Err(e) => { 955 error!("DB error fetching OAuth sessions: {:?}", e); 956 return ApiError::InternalError(None).into_response(); 957 } 958 } 959 960 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 961 962 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 963} 964 965fn extract_client_name(client_id: &str) -> String { 966 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 967 "Localhost App".to_string() 968 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 969 parsed.host_str().unwrap_or("Unknown App").to_string() 970 } else { 971 client_id.to_string() 972 } 973} 974 975#[derive(Deserialize)] 976#[serde(rename_all = "camelCase")] 977pub struct RevokeSessionInput { 978 pub session_id: String, 979} 980 981pub async fn revoke_session( 982 State(state): State<AppState>, 983 auth: BearerAuth, 984 Json(input): Json<RevokeSessionInput>, 985) -> Response { 986 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 987 let Ok(session_id) = jwt_id.parse::<i32>() else { 988 return ApiError::InvalidRequest("Invalid session ID".into()).into_response(); 989 }; 990 let session = sqlx::query_as::<_, (String,)>( 991 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 992 ) 993 .bind(session_id) 994 .bind(&auth.0.did) 995 .fetch_optional(&state.db) 996 .await; 997 let access_jti = match session { 998 Ok(Some((jti,))) => jti, 999 Ok(None) => { 1000 return ApiError::SessionNotFound.into_response(); 1001 } 1002 Err(e) => { 1003 error!("DB error in revoke_session: {:?}", e); 1004 return ApiError::InternalError(None).into_response(); 1005 } 1006 }; 1007 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 1008 .bind(session_id) 1009 .execute(&state.db) 1010 .await 1011 { 1012 error!("DB error deleting session: {:?}", e); 1013 return ApiError::InternalError(None).into_response(); 1014 } 1015 let cache_key = format!("auth:session:{}:{}", &auth.0.did, access_jti); 1016 if let Err(e) = state.cache.delete(&cache_key).await { 1017 warn!("Failed to invalidate session cache: {:?}", e); 1018 } 1019 info!(did = %&auth.0.did, session_id = %session_id, "JWT session revoked"); 1020 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 1021 let Ok(session_id) = oauth_id.parse::<i32>() else { 1022 return ApiError::InvalidRequest("Invalid session ID".into()).into_response(); 1023 }; 1024 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 1025 .bind(session_id) 1026 .bind(&auth.0.did) 1027 .execute(&state.db) 1028 .await; 1029 match result { 1030 Ok(r) if r.rows_affected() == 0 => { 1031 return ApiError::SessionNotFound.into_response(); 1032 } 1033 Err(e) => { 1034 error!("DB error deleting OAuth session: {:?}", e); 1035 return ApiError::InternalError(None).into_response(); 1036 } 1037 _ => {} 1038 } 1039 info!(did = %&auth.0.did, session_id = %session_id, "OAuth session revoked"); 1040 } else { 1041 return ApiError::InvalidRequest("Invalid session ID format".into()).into_response(); 1042 } 1043 EmptyResponse::ok().into_response() 1044} 1045 1046pub async fn revoke_all_sessions( 1047 State(state): State<AppState>, 1048 headers: HeaderMap, 1049 auth: BearerAuth, 1050) -> Response { 1051 let current_jti = headers 1052 .get("authorization") 1053 .and_then(|v| v.to_str().ok()) 1054 .and_then(|v| v.strip_prefix("Bearer ")) 1055 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1056 1057 let Some(ref jti) = current_jti else { 1058 return ApiError::InvalidToken(None).into_response(); 1059 }; 1060 1061 if auth.0.is_oauth { 1062 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1063 .bind(&auth.0.did) 1064 .execute(&state.db) 1065 .await 1066 { 1067 error!("DB error revoking JWT sessions: {:?}", e); 1068 return ApiError::InternalError(None).into_response(); 1069 } 1070 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1071 .bind(&auth.0.did) 1072 .bind(jti) 1073 .execute(&state.db) 1074 .await 1075 { 1076 error!("DB error revoking OAuth sessions: {:?}", e); 1077 return ApiError::InternalError(None).into_response(); 1078 } 1079 } else { 1080 if let Err(e) = 1081 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1082 .bind(&auth.0.did) 1083 .bind(jti) 1084 .execute(&state.db) 1085 .await 1086 { 1087 error!("DB error revoking JWT sessions: {:?}", e); 1088 return ApiError::InternalError(None).into_response(); 1089 } 1090 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1091 .bind(&auth.0.did) 1092 .execute(&state.db) 1093 .await 1094 { 1095 error!("DB error revoking OAuth sessions: {:?}", e); 1096 return ApiError::InternalError(None).into_response(); 1097 } 1098 } 1099 1100 info!(did = %&auth.0.did, "All other sessions revoked"); 1101 SuccessResponse::ok().into_response() 1102} 1103 1104#[derive(Serialize)] 1105#[serde(rename_all = "camelCase")] 1106pub struct LegacyLoginPreferenceOutput { 1107 pub allow_legacy_login: bool, 1108 pub has_mfa: bool, 1109} 1110 1111pub async fn get_legacy_login_preference( 1112 State(state): State<AppState>, 1113 auth: BearerAuth, 1114) -> Response { 1115 let result = sqlx::query!( 1116 r#"SELECT 1117 u.allow_legacy_login, 1118 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1119 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1120 FROM users u WHERE u.did = $1"#, 1121 &auth.0.did 1122 ) 1123 .fetch_optional(&state.db) 1124 .await; 1125 1126 match result { 1127 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1128 allow_legacy_login: row.allow_legacy_login, 1129 has_mfa: row.has_mfa, 1130 }) 1131 .into_response(), 1132 Ok(None) => ApiError::AccountNotFound.into_response(), 1133 Err(e) => { 1134 error!("DB error: {:?}", e); 1135 ApiError::InternalError(None).into_response() 1136 } 1137 } 1138} 1139 1140#[derive(Deserialize)] 1141#[serde(rename_all = "camelCase")] 1142pub struct UpdateLegacyLoginInput { 1143 pub allow_legacy_login: bool, 1144} 1145 1146pub async fn update_legacy_login_preference( 1147 State(state): State<AppState>, 1148 auth: BearerAuth, 1149 Json(input): Json<UpdateLegacyLoginInput>, 1150) -> Response { 1151 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1152 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1153 .await; 1154 } 1155 1156 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1157 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1158 } 1159 1160 let result = sqlx::query!( 1161 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1162 input.allow_legacy_login, 1163 &auth.0.did 1164 ) 1165 .fetch_optional(&state.db) 1166 .await; 1167 1168 match result { 1169 Ok(Some(_)) => { 1170 info!( 1171 did = %&auth.0.did, 1172 allow_legacy_login = input.allow_legacy_login, 1173 "Legacy login preference updated" 1174 ); 1175 Json(json!({ 1176 "allowLegacyLogin": input.allow_legacy_login 1177 })) 1178 .into_response() 1179 } 1180 Ok(None) => ApiError::AccountNotFound.into_response(), 1181 Err(e) => { 1182 error!("DB error: {:?}", e); 1183 ApiError::InternalError(None).into_response() 1184 } 1185 } 1186} 1187 1188use crate::comms::locale::VALID_LOCALES; 1189 1190#[derive(Deserialize)] 1191#[serde(rename_all = "camelCase")] 1192pub struct UpdateLocaleInput { 1193 pub preferred_locale: String, 1194} 1195 1196pub async fn update_locale( 1197 State(state): State<AppState>, 1198 auth: BearerAuth, 1199 Json(input): Json<UpdateLocaleInput>, 1200) -> Response { 1201 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1202 return ApiError::InvalidRequest(format!( 1203 "Invalid locale. Valid options: {}", 1204 VALID_LOCALES.join(", ") 1205 )) 1206 .into_response(); 1207 } 1208 1209 let result = sqlx::query!( 1210 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1211 input.preferred_locale, 1212 &auth.0.did 1213 ) 1214 .fetch_optional(&state.db) 1215 .await; 1216 1217 match result { 1218 Ok(Some(_)) => { 1219 info!( 1220 did = %&auth.0.did, 1221 locale = %input.preferred_locale, 1222 "User locale preference updated" 1223 ); 1224 Json(json!({ 1225 "preferredLocale": input.preferred_locale 1226 })) 1227 .into_response() 1228 } 1229 Ok(None) => ApiError::AccountNotFound.into_response(), 1230 Err(e) => { 1231 error!("DB error updating locale: {:?}", e); 1232 ApiError::InternalError(None).into_response() 1233 } 1234 } 1235}