this repo has no description
1use crate::api::error::ApiError; 2use crate::api::{EmptyResponse, SuccessResponse}; 3use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 4use crate::state::{AppState, RateLimitKind}; 5use crate::types::{AccountState, Did, Handle, PlainPassword}; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use bcrypt::verify; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use tracing::{error, info, warn}; 16 17fn extract_client_ip(headers: &HeaderMap) -> String { 18 if let Some(forwarded) = headers.get("x-forwarded-for") 19 && let Ok(value) = forwarded.to_str() 20 && let Some(first_ip) = value.split(',').next() 21 { 22 return first_ip.trim().to_string(); 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") 25 && let Ok(value) = real_ip.to_str() 26 { 27 return value.trim().to_string(); 28 } 29 "unknown".to_string() 30} 31 32fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 33 let identifier = identifier.trim(); 34 if identifier.contains('@') || identifier.starts_with("did:") { 35 identifier.to_string() 36 } else if !identifier.contains('.') { 37 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 38 } else { 39 identifier.to_lowercase() 40 } 41} 42 43fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 44 stored_handle.to_string() 45} 46 47#[derive(Deserialize)] 48#[serde(rename_all = "camelCase")] 49pub struct CreateSessionInput { 50 pub identifier: String, 51 pub password: PlainPassword, 52 #[serde(default)] 53 pub allow_takendown: bool, 54} 55 56#[derive(Serialize)] 57#[serde(rename_all = "camelCase")] 58pub struct CreateSessionOutput { 59 pub access_jwt: String, 60 pub refresh_jwt: String, 61 pub handle: Handle, 62 pub did: Did, 63 #[serde(skip_serializing_if = "Option::is_none")] 64 pub did_doc: Option<serde_json::Value>, 65 #[serde(skip_serializing_if = "Option::is_none")] 66 pub email: Option<String>, 67 #[serde(skip_serializing_if = "Option::is_none")] 68 pub email_confirmed: Option<bool>, 69 #[serde(skip_serializing_if = "Option::is_none")] 70 pub active: Option<bool>, 71 #[serde(skip_serializing_if = "Option::is_none")] 72 pub status: Option<String>, 73} 74 75pub async fn create_session( 76 State(state): State<AppState>, 77 headers: HeaderMap, 78 Json(input): Json<CreateSessionInput>, 79) -> Response { 80 info!( 81 "create_session called with identifier: {}", 82 input.identifier 83 ); 84 let client_ip = extract_client_ip(&headers); 85 if !state 86 .check_rate_limit(RateLimitKind::Login, &client_ip) 87 .await 88 { 89 warn!(ip = %client_ip, "Login rate limit exceeded"); 90 return ApiError::RateLimitExceeded(None).into_response(); 91 } 92 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 93 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 94 info!( 95 "Normalized identifier: {} -> {}", 96 input.identifier, normalized_identifier 97 ); 98 let row = match sqlx::query!( 99 r#"SELECT 100 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref, 101 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 102 u.allow_legacy_login, u.migrated_to_pds, 103 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 104 k.key_bytes, k.encryption_version, 105 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 106 FROM users u 107 JOIN user_keys k ON u.id = k.user_id 108 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 109 normalized_identifier 110 ) 111 .fetch_optional(&state.db) 112 .await 113 { 114 Ok(Some(row)) => row, 115 Ok(None) => { 116 let _ = verify( 117 &input.password, 118 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 119 ); 120 warn!("User not found for login attempt"); 121 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into())) 122 .into_response(); 123 } 124 Err(e) => { 125 error!("Database error fetching user: {:?}", e); 126 return ApiError::InternalError(None).into_response(); 127 } 128 }; 129 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 130 Ok(k) => k, 131 Err(e) => { 132 error!("Failed to decrypt user key: {:?}", e); 133 return ApiError::InternalError(None).into_response(); 134 } 135 }; 136 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row 137 .password_hash 138 .as_ref() 139 .map(|h| verify(&input.password, h).unwrap_or(false)) 140 .unwrap_or(false) 141 { 142 (true, None, None, None) 143 } else { 144 let app_passwords = sqlx::query!( 145 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 146 row.id 147 ) 148 .fetch_all(&state.db) 149 .await 150 .unwrap_or_default(); 151 let matched = app_passwords 152 .iter() 153 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 154 match matched { 155 Some(app) => ( 156 true, 157 Some(app.name.clone()), 158 app.scopes.clone(), 159 app.created_by_controller_did.clone(), 160 ), 161 None => (false, None, None, None), 162 } 163 }; 164 if !password_valid { 165 warn!("Password verification failed for login attempt"); 166 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into())) 167 .into_response(); 168 } 169 let account_state = AccountState::from_db_fields( 170 row.deactivated_at, 171 row.takedown_ref.clone(), 172 row.migrated_to_pds.clone(), 173 None, 174 ); 175 if account_state.is_takendown() && !input.allow_takendown { 176 warn!("Login attempt for takendown account: {}", row.did); 177 return ApiError::AccountTakedown.into_response(); 178 } 179 let is_verified = 180 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 181 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did) 182 .await 183 .unwrap_or(false); 184 if !is_verified && !is_delegated { 185 warn!("Login attempt for unverified account: {}", row.did); 186 return ( 187 StatusCode::FORBIDDEN, 188 Json(json!({ 189 "error": "AccountNotVerified", 190 "message": "Please verify your account before logging in", 191 "did": row.did 192 })), 193 ) 194 .into_response(); 195 } 196 let has_totp = row.totp_enabled.unwrap_or(false); 197 let is_legacy_login = has_totp; 198 if has_totp && !row.allow_legacy_login { 199 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 200 return ( 201 StatusCode::FORBIDDEN, 202 Json(json!({ 203 "error": "MfaRequired", 204 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 205 "did": row.did 206 })), 207 ) 208 .into_response(); 209 } 210 let access_meta = match crate::auth::create_access_token_with_delegation( 211 &row.did, 212 &key_bytes, 213 app_password_scopes.as_deref(), 214 app_password_controller.as_deref(), 215 ) { 216 Ok(m) => m, 217 Err(e) => { 218 error!("Failed to create access token: {:?}", e); 219 return ApiError::InternalError(None).into_response(); 220 } 221 }; 222 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 223 Ok(m) => m, 224 Err(e) => { 225 error!("Failed to create refresh token: {:?}", e); 226 return ApiError::InternalError(None).into_response(); 227 } 228 }; 229 let did_for_doc = row.did.clone(); 230 let did_resolver = state.did_resolver.clone(); 231 let (insert_result, did_doc) = tokio::join!( 232 sqlx::query!( 233 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", 234 row.did, 235 access_meta.jti, 236 refresh_meta.jti, 237 access_meta.expires_at, 238 refresh_meta.expires_at, 239 is_legacy_login, 240 false, 241 app_password_scopes, 242 app_password_controller, 243 app_password_name 244 ) 245 .execute(&state.db), 246 did_resolver.resolve_did_document(&did_for_doc) 247 ); 248 if let Err(e) = insert_result { 249 error!("Failed to insert session: {:?}", e); 250 return ApiError::InternalError(None).into_response(); 251 } 252 if is_legacy_login { 253 warn!( 254 did = %row.did, 255 ip = %client_ip, 256 "Legacy login on TOTP-enabled account - sending notification" 257 ); 258 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 259 if let Err(e) = crate::comms::queue_legacy_login_notification( 260 &state.db, 261 row.id, 262 &hostname, 263 &client_ip, 264 row.preferred_comms_channel, 265 ) 266 .await 267 { 268 error!("Failed to queue legacy login notification: {:?}", e); 269 } 270 } 271 let handle = full_handle(&row.handle, &pds_hostname); 272 let is_active = account_state.is_active(); 273 let status = account_state.status_for_session().map(String::from); 274 Json(CreateSessionOutput { 275 access_jwt: access_meta.token, 276 refresh_jwt: refresh_meta.token, 277 handle: handle.into(), 278 did: row.did.into(), 279 did_doc, 280 email: row.email, 281 email_confirmed: Some(row.email_verified), 282 active: Some(is_active), 283 status, 284 }) 285 .into_response() 286} 287 288pub async fn get_session( 289 State(state): State<AppState>, 290 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 291) -> Response { 292 let permissions = auth_user.permissions(); 293 let can_read_email = permissions.allows_email_read(); 294 295 let did_for_doc = auth_user.did.clone(); 296 let did_resolver = state.did_resolver.clone(); 297 let (db_result, did_doc) = tokio::join!( 298 sqlx::query!( 299 r#"SELECT 300 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale, 301 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 302 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at 303 FROM users WHERE did = $1"#, 304 &auth_user.did 305 ) 306 .fetch_optional(&state.db), 307 did_resolver.resolve_did_document(&did_for_doc) 308 ); 309 match db_result { 310 Ok(Some(row)) => { 311 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 312 crate::comms::CommsChannel::Email => ("email", row.email_verified), 313 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 314 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 315 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 316 }; 317 let pds_hostname = 318 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 319 let handle = full_handle(&row.handle, &pds_hostname); 320 let account_state = AccountState::from_db_fields( 321 row.deactivated_at, 322 row.takedown_ref.clone(), 323 row.migrated_to_pds.clone(), 324 row.migrated_at, 325 ); 326 let email_value = if can_read_email { 327 row.email.clone() 328 } else { 329 None 330 }; 331 let email_confirmed_value = can_read_email && row.email_verified; 332 let mut response = json!({ 333 "handle": handle, 334 "did": &auth_user.did, 335 "active": account_state.is_active(), 336 "preferredChannel": preferred_channel, 337 "preferredChannelVerified": preferred_channel_verified, 338 "preferredLocale": row.preferred_locale, 339 "isAdmin": row.is_admin 340 }); 341 if can_read_email { 342 response["email"] = json!(email_value); 343 response["emailConfirmed"] = json!(email_confirmed_value); 344 } 345 if let Some(status) = account_state.status_for_session() { 346 response["status"] = json!(status); 347 } 348 if let AccountState::Migrated { to_pds, at } = &account_state { 349 response["migratedToPds"] = json!(to_pds); 350 response["migratedAt"] = json!(at); 351 } 352 if let Some(doc) = did_doc { 353 response["didDoc"] = doc; 354 } 355 Json(response).into_response() 356 } 357 Ok(None) => ApiError::AuthenticationFailed(None).into_response(), 358 Err(e) => { 359 error!("Database error in get_session: {:?}", e); 360 ApiError::InternalError(None).into_response() 361 } 362 } 363} 364 365pub async fn delete_session( 366 State(state): State<AppState>, 367 headers: axum::http::HeaderMap, 368 _auth: BearerAuth, 369) -> Response { 370 let extracted = match crate::auth::extract_auth_token_from_header( 371 headers.get("Authorization").and_then(|h| h.to_str().ok()), 372 ) { 373 Some(t) => t, 374 None => return ApiError::AuthenticationRequired.into_response(), 375 }; 376 let jti = match crate::auth::get_jti_from_token(&extracted.token) { 377 Ok(jti) => jti, 378 Err(_) => return ApiError::AuthenticationFailed(None).into_response(), 379 }; 380 let did = crate::auth::get_did_from_token(&extracted.token).ok(); 381 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 382 .execute(&state.db) 383 .await 384 { 385 Ok(res) if res.rows_affected() > 0 => { 386 if let Some(did) = did { 387 let session_cache_key = format!("auth:session:{}:{}", did, jti); 388 let _ = state.cache.delete(&session_cache_key).await; 389 } 390 EmptyResponse::ok().into_response() 391 } 392 Ok(_) => ApiError::AuthenticationFailed(None).into_response(), 393 Err(e) => { 394 error!("Database error in delete_session: {:?}", e); 395 ApiError::AuthenticationFailed(None).into_response() 396 } 397 } 398} 399 400pub async fn refresh_session( 401 State(state): State<AppState>, 402 headers: axum::http::HeaderMap, 403) -> Response { 404 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 405 if !state 406 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 407 .await 408 { 409 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 410 return ApiError::RateLimitExceeded(None).into_response(); 411 } 412 let extracted = match crate::auth::extract_auth_token_from_header( 413 headers.get("Authorization").and_then(|h| h.to_str().ok()), 414 ) { 415 Some(t) => t, 416 None => return ApiError::AuthenticationRequired.into_response(), 417 }; 418 let refresh_token = extracted.token; 419 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 420 Ok(jti) => jti, 421 Err(_) => { 422 return ApiError::AuthenticationFailed(Some("Invalid token format".into())) 423 .into_response(); 424 } 425 }; 426 let mut tx = match state.db.begin().await { 427 Ok(tx) => tx, 428 Err(e) => { 429 error!("Failed to begin transaction: {:?}", e); 430 return ApiError::InternalError(None).into_response(); 431 } 432 }; 433 if let Ok(Some(session_id)) = sqlx::query_scalar!( 434 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 435 refresh_jti 436 ) 437 .fetch_optional(&mut *tx) 438 .await 439 { 440 warn!( 441 "Refresh token reuse detected! Revoking token family for session_id: {}", 442 session_id 443 ); 444 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 445 .execute(&mut *tx) 446 .await; 447 let _ = tx.commit().await; 448 return ApiError::AuthenticationFailed(Some( 449 "Refresh token has been revoked due to suspected compromise".into(), 450 )) 451 .into_response(); 452 } 453 let session_row = match sqlx::query!( 454 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version 455 FROM session_tokens st 456 JOIN users u ON st.did = u.did 457 JOIN user_keys k ON u.id = k.user_id 458 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 459 FOR UPDATE OF st"#, 460 refresh_jti 461 ) 462 .fetch_optional(&mut *tx) 463 .await 464 { 465 Ok(Some(row)) => row, 466 Ok(None) => { 467 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into())) 468 .into_response(); 469 } 470 Err(e) => { 471 error!("Database error fetching session: {:?}", e); 472 return ApiError::InternalError(None).into_response(); 473 } 474 }; 475 let key_bytes = 476 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 477 Ok(k) => k, 478 Err(e) => { 479 error!("Failed to decrypt user key: {:?}", e); 480 return ApiError::InternalError(None).into_response(); 481 } 482 }; 483 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 484 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into())) 485 .into_response(); 486 } 487 let new_access_meta = match crate::auth::create_access_token_with_delegation( 488 &session_row.did, 489 &key_bytes, 490 session_row.scope.as_deref(), 491 session_row.controller_did.as_deref(), 492 ) { 493 Ok(m) => m, 494 Err(e) => { 495 error!("Failed to create access token: {:?}", e); 496 return ApiError::InternalError(None).into_response(); 497 } 498 }; 499 let new_refresh_meta = 500 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 501 Ok(m) => m, 502 Err(e) => { 503 error!("Failed to create refresh token: {:?}", e); 504 return ApiError::InternalError(None).into_response(); 505 } 506 }; 507 match sqlx::query!( 508 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 509 refresh_jti, 510 session_row.id 511 ) 512 .execute(&mut *tx) 513 .await 514 { 515 Ok(result) if result.rows_affected() == 0 => { 516 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 517 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 518 .execute(&mut *tx) 519 .await; 520 let _ = tx.commit().await; 521 return ApiError::AuthenticationFailed(Some("Refresh token has been revoked due to suspected compromise".into())).into_response(); 522 } 523 Err(e) => { 524 error!("Failed to record used refresh token: {:?}", e); 525 return ApiError::InternalError(None).into_response(); 526 } 527 Ok(_) => {} 528 } 529 if let Err(e) = sqlx::query!( 530 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 531 new_access_meta.jti, 532 new_refresh_meta.jti, 533 new_access_meta.expires_at, 534 new_refresh_meta.expires_at, 535 session_row.id 536 ) 537 .execute(&mut *tx) 538 .await 539 { 540 error!("Database error updating session: {:?}", e); 541 return ApiError::InternalError(None).into_response(); 542 } 543 if let Err(e) = tx.commit().await { 544 error!("Failed to commit transaction: {:?}", e); 545 return ApiError::InternalError(None).into_response(); 546 } 547 let did_for_doc = session_row.did.clone(); 548 let did_resolver = state.did_resolver.clone(); 549 let (db_result, did_doc) = tokio::join!( 550 sqlx::query!( 551 r#"SELECT 552 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref, 553 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 554 discord_verified, telegram_verified, signal_verified 555 FROM users WHERE did = $1"#, 556 session_row.did 557 ) 558 .fetch_optional(&state.db), 559 did_resolver.resolve_did_document(&did_for_doc) 560 ); 561 match db_result { 562 Ok(Some(u)) => { 563 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 564 crate::comms::CommsChannel::Email => ("email", u.email_verified), 565 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 566 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 567 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 568 }; 569 let pds_hostname = 570 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 571 let handle = full_handle(&u.handle, &pds_hostname); 572 let account_state = 573 AccountState::from_db_fields(u.deactivated_at, u.takedown_ref.clone(), None, None); 574 let mut response = json!({ 575 "accessJwt": new_access_meta.token, 576 "refreshJwt": new_refresh_meta.token, 577 "handle": handle, 578 "did": session_row.did, 579 "email": u.email, 580 "emailConfirmed": u.email_verified, 581 "preferredChannel": preferred_channel, 582 "preferredChannelVerified": preferred_channel_verified, 583 "preferredLocale": u.preferred_locale, 584 "isAdmin": u.is_admin, 585 "active": account_state.is_active() 586 }); 587 if let Some(doc) = did_doc { 588 response["didDoc"] = doc; 589 } 590 if let Some(status) = account_state.status_for_session() { 591 response["status"] = json!(status); 592 } 593 Json(response).into_response() 594 } 595 Ok(None) => { 596 error!("User not found for existing session: {}", session_row.did); 597 ApiError::InternalError(None).into_response() 598 } 599 Err(e) => { 600 error!("Database error fetching user: {:?}", e); 601 ApiError::InternalError(None).into_response() 602 } 603 } 604} 605 606#[derive(Deserialize)] 607#[serde(rename_all = "camelCase")] 608pub struct ConfirmSignupInput { 609 pub did: Did, 610 pub verification_code: String, 611} 612 613#[derive(Serialize)] 614#[serde(rename_all = "camelCase")] 615pub struct ConfirmSignupOutput { 616 pub access_jwt: String, 617 pub refresh_jwt: String, 618 pub handle: Handle, 619 pub did: Did, 620 pub email: Option<String>, 621 pub email_verified: bool, 622 pub preferred_channel: String, 623 pub preferred_channel_verified: bool, 624} 625 626pub async fn confirm_signup( 627 State(state): State<AppState>, 628 Json(input): Json<ConfirmSignupInput>, 629) -> Response { 630 info!("confirm_signup called for DID: {}", input.did); 631 let row = match sqlx::query!( 632 r#"SELECT 633 u.id, u.did, u.handle, u.email, 634 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 635 u.discord_id, u.telegram_username, u.signal_number, 636 k.key_bytes, k.encryption_version 637 FROM users u 638 JOIN user_keys k ON u.id = k.user_id 639 WHERE u.did = $1"#, 640 input.did.as_str() 641 ) 642 .fetch_optional(&state.db) 643 .await 644 { 645 Ok(Some(row)) => row, 646 Ok(None) => { 647 warn!("User not found for confirm_signup: {}", input.did); 648 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 649 .into_response(); 650 } 651 Err(e) => { 652 error!("Database error in confirm_signup: {:?}", e); 653 return ApiError::InternalError(None).into_response(); 654 } 655 }; 656 657 let (channel_str, identifier) = match row.channel { 658 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 659 crate::comms::CommsChannel::Discord => { 660 ("discord", row.discord_id.clone().unwrap_or_default()) 661 } 662 crate::comms::CommsChannel::Telegram => ( 663 "telegram", 664 row.telegram_username.clone().unwrap_or_default(), 665 ), 666 crate::comms::CommsChannel::Signal => { 667 ("signal", row.signal_number.clone().unwrap_or_default()) 668 } 669 }; 670 671 let normalized_token = 672 crate::auth::verification_token::normalize_token_input(&input.verification_code); 673 match crate::auth::verification_token::verify_signup_token( 674 &normalized_token, 675 channel_str, 676 &identifier, 677 ) { 678 Ok(token_data) => { 679 if token_data.did != input.did.as_str() { 680 warn!( 681 "Token DID mismatch for confirm_signup: expected {}, got {}", 682 input.did, token_data.did 683 ); 684 return ApiError::InvalidRequest("Invalid verification code".into()) 685 .into_response(); 686 } 687 } 688 Err(crate::auth::verification_token::VerifyError::Expired) => { 689 warn!("Verification code expired for user: {}", input.did); 690 return ApiError::ExpiredToken(Some("Verification code has expired".into())) 691 .into_response(); 692 } 693 Err(e) => { 694 warn!("Invalid verification code for user {}: {:?}", input.did, e); 695 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 696 } 697 } 698 699 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 700 Ok(k) => k, 701 Err(e) => { 702 error!("Failed to decrypt user key: {:?}", e); 703 return ApiError::InternalError(None).into_response(); 704 } 705 }; 706 let verified_column = match row.channel { 707 crate::comms::CommsChannel::Email => "email_verified", 708 crate::comms::CommsChannel::Discord => "discord_verified", 709 crate::comms::CommsChannel::Telegram => "telegram_verified", 710 crate::comms::CommsChannel::Signal => "signal_verified", 711 }; 712 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 713 if let Err(e) = sqlx::query(&update_query) 714 .bind(input.did.as_str()) 715 .execute(&state.db) 716 .await 717 { 718 error!("Failed to update verification status: {:?}", e); 719 return ApiError::InternalError(None).into_response(); 720 } 721 722 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 723 Ok(m) => m, 724 Err(e) => { 725 error!("Failed to create access token: {:?}", e); 726 return ApiError::InternalError(None).into_response(); 727 } 728 }; 729 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 730 Ok(m) => m, 731 Err(e) => { 732 error!("Failed to create refresh token: {:?}", e); 733 return ApiError::InternalError(None).into_response(); 734 } 735 }; 736 let no_scope: Option<String> = None; 737 if let Err(e) = sqlx::query!( 738 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 739 row.did, 740 access_meta.jti, 741 refresh_meta.jti, 742 access_meta.expires_at, 743 refresh_meta.expires_at, 744 false, 745 false, 746 no_scope 747 ) 748 .execute(&state.db) 749 .await 750 { 751 error!("Failed to insert session: {:?}", e); 752 return ApiError::InternalError(None).into_response(); 753 } 754 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 755 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 756 warn!("Failed to enqueue welcome notification: {:?}", e); 757 } 758 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 759 let preferred_channel = match row.channel { 760 crate::comms::CommsChannel::Email => "email", 761 crate::comms::CommsChannel::Discord => "discord", 762 crate::comms::CommsChannel::Telegram => "telegram", 763 crate::comms::CommsChannel::Signal => "signal", 764 }; 765 Json(ConfirmSignupOutput { 766 access_jwt: access_meta.token, 767 refresh_jwt: refresh_meta.token, 768 handle: row.handle.into(), 769 did: row.did.into(), 770 email: row.email, 771 email_verified, 772 preferred_channel: preferred_channel.to_string(), 773 preferred_channel_verified: true, 774 }) 775 .into_response() 776} 777 778#[derive(Deserialize)] 779#[serde(rename_all = "camelCase")] 780pub struct ResendVerificationInput { 781 pub did: Did, 782} 783 784pub async fn resend_verification( 785 State(state): State<AppState>, 786 Json(input): Json<ResendVerificationInput>, 787) -> Response { 788 info!("resend_verification called for DID: {}", input.did); 789 let row = match sqlx::query!( 790 r#"SELECT 791 id, handle, email, 792 preferred_comms_channel as "channel: crate::comms::CommsChannel", 793 discord_id, telegram_username, signal_number, 794 email_verified, discord_verified, telegram_verified, signal_verified 795 FROM users 796 WHERE did = $1"#, 797 input.did.as_str() 798 ) 799 .fetch_optional(&state.db) 800 .await 801 { 802 Ok(Some(row)) => row, 803 Ok(None) => { 804 return ApiError::InvalidRequest("User not found".into()).into_response(); 805 } 806 Err(e) => { 807 error!("Database error in resend_verification: {:?}", e); 808 return ApiError::InternalError(None).into_response(); 809 } 810 }; 811 let is_verified = 812 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 813 if is_verified { 814 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 815 } 816 817 let (channel_str, recipient) = match row.channel { 818 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 819 crate::comms::CommsChannel::Discord => { 820 ("discord", row.discord_id.clone().unwrap_or_default()) 821 } 822 crate::comms::CommsChannel::Telegram => ( 823 "telegram", 824 row.telegram_username.clone().unwrap_or_default(), 825 ), 826 crate::comms::CommsChannel::Signal => { 827 ("signal", row.signal_number.clone().unwrap_or_default()) 828 } 829 }; 830 831 let verification_token = 832 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 833 let formatted_token = 834 crate::auth::verification_token::format_token_for_display(&verification_token); 835 836 if let Err(e) = crate::comms::enqueue_signup_verification( 837 &state.db, 838 row.id, 839 channel_str, 840 &recipient, 841 &formatted_token, 842 None, 843 ) 844 .await 845 { 846 warn!("Failed to enqueue verification notification: {:?}", e); 847 } 848 SuccessResponse::ok().into_response() 849} 850 851#[derive(Serialize)] 852#[serde(rename_all = "camelCase")] 853pub struct SessionInfo { 854 pub id: String, 855 pub session_type: String, 856 pub client_name: Option<String>, 857 pub created_at: String, 858 pub expires_at: String, 859 pub is_current: bool, 860} 861 862#[derive(Serialize)] 863#[serde(rename_all = "camelCase")] 864pub struct ListSessionsOutput { 865 pub sessions: Vec<SessionInfo>, 866} 867 868pub async fn list_sessions( 869 State(state): State<AppState>, 870 headers: HeaderMap, 871 auth: BearerAuth, 872) -> Response { 873 let current_jti = headers 874 .get("authorization") 875 .and_then(|v| v.to_str().ok()) 876 .and_then(|v| v.strip_prefix("Bearer ")) 877 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 878 879 let mut sessions: Vec<SessionInfo> = Vec::new(); 880 881 let jwt_result = sqlx::query_as::< 882 _, 883 ( 884 i32, 885 String, 886 chrono::DateTime<chrono::Utc>, 887 chrono::DateTime<chrono::Utc>, 888 ), 889 >( 890 r#" 891 SELECT id, access_jti, created_at, refresh_expires_at 892 FROM session_tokens 893 WHERE did = $1 AND refresh_expires_at > NOW() 894 ORDER BY created_at DESC 895 "#, 896 ) 897 .bind(&auth.0.did) 898 .fetch_all(&state.db) 899 .await; 900 901 match jwt_result { 902 Ok(rows) => { 903 for (id, access_jti, created_at, expires_at) in rows { 904 sessions.push(SessionInfo { 905 id: format!("jwt:{}", id), 906 session_type: "legacy".to_string(), 907 client_name: None, 908 created_at: created_at.to_rfc3339(), 909 expires_at: expires_at.to_rfc3339(), 910 is_current: current_jti.as_ref() == Some(&access_jti), 911 }); 912 } 913 } 914 Err(e) => { 915 error!("DB error fetching JWT sessions: {:?}", e); 916 return ApiError::InternalError(None).into_response(); 917 } 918 } 919 920 let oauth_result = sqlx::query_as::< 921 _, 922 ( 923 i32, 924 String, 925 chrono::DateTime<chrono::Utc>, 926 chrono::DateTime<chrono::Utc>, 927 String, 928 ), 929 >( 930 r#" 931 SELECT id, token_id, created_at, expires_at, client_id 932 FROM oauth_token 933 WHERE did = $1 AND expires_at > NOW() 934 ORDER BY created_at DESC 935 "#, 936 ) 937 .bind(&auth.0.did) 938 .fetch_all(&state.db) 939 .await; 940 941 match oauth_result { 942 Ok(rows) => { 943 for (id, token_id, created_at, expires_at, client_id) in rows { 944 let client_name = extract_client_name(&client_id); 945 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 946 sessions.push(SessionInfo { 947 id: format!("oauth:{}", id), 948 session_type: "oauth".to_string(), 949 client_name: Some(client_name), 950 created_at: created_at.to_rfc3339(), 951 expires_at: expires_at.to_rfc3339(), 952 is_current: is_current_oauth, 953 }); 954 } 955 } 956 Err(e) => { 957 error!("DB error fetching OAuth sessions: {:?}", e); 958 return ApiError::InternalError(None).into_response(); 959 } 960 } 961 962 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 963 964 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 965} 966 967fn extract_client_name(client_id: &str) -> String { 968 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 969 "Localhost App".to_string() 970 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 971 parsed.host_str().unwrap_or("Unknown App").to_string() 972 } else { 973 client_id.to_string() 974 } 975} 976 977#[derive(Deserialize)] 978#[serde(rename_all = "camelCase")] 979pub struct RevokeSessionInput { 980 pub session_id: String, 981} 982 983pub async fn revoke_session( 984 State(state): State<AppState>, 985 auth: BearerAuth, 986 Json(input): Json<RevokeSessionInput>, 987) -> Response { 988 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 989 let Ok(session_id) = jwt_id.parse::<i32>() else { 990 return ApiError::InvalidRequest("Invalid session ID".into()).into_response(); 991 }; 992 let session = sqlx::query_as::<_, (String,)>( 993 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 994 ) 995 .bind(session_id) 996 .bind(&auth.0.did) 997 .fetch_optional(&state.db) 998 .await; 999 let access_jti = match session { 1000 Ok(Some((jti,))) => jti, 1001 Ok(None) => { 1002 return ApiError::SessionNotFound.into_response(); 1003 } 1004 Err(e) => { 1005 error!("DB error in revoke_session: {:?}", e); 1006 return ApiError::InternalError(None).into_response(); 1007 } 1008 }; 1009 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 1010 .bind(session_id) 1011 .execute(&state.db) 1012 .await 1013 { 1014 error!("DB error deleting session: {:?}", e); 1015 return ApiError::InternalError(None).into_response(); 1016 } 1017 let cache_key = format!("auth:session:{}:{}", &auth.0.did, access_jti); 1018 if let Err(e) = state.cache.delete(&cache_key).await { 1019 warn!("Failed to invalidate session cache: {:?}", e); 1020 } 1021 info!(did = %&auth.0.did, session_id = %session_id, "JWT session revoked"); 1022 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 1023 let Ok(session_id) = oauth_id.parse::<i32>() else { 1024 return ApiError::InvalidRequest("Invalid session ID".into()).into_response(); 1025 }; 1026 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 1027 .bind(session_id) 1028 .bind(&auth.0.did) 1029 .execute(&state.db) 1030 .await; 1031 match result { 1032 Ok(r) if r.rows_affected() == 0 => { 1033 return ApiError::SessionNotFound.into_response(); 1034 } 1035 Err(e) => { 1036 error!("DB error deleting OAuth session: {:?}", e); 1037 return ApiError::InternalError(None).into_response(); 1038 } 1039 _ => {} 1040 } 1041 info!(did = %&auth.0.did, session_id = %session_id, "OAuth session revoked"); 1042 } else { 1043 return ApiError::InvalidRequest("Invalid session ID format".into()).into_response(); 1044 } 1045 EmptyResponse::ok().into_response() 1046} 1047 1048pub async fn revoke_all_sessions( 1049 State(state): State<AppState>, 1050 headers: HeaderMap, 1051 auth: BearerAuth, 1052) -> Response { 1053 let current_jti = crate::auth::extract_auth_token_from_header( 1054 headers.get("authorization").and_then(|v| v.to_str().ok()), 1055 ) 1056 .and_then(|extracted| crate::auth::get_jti_from_token(&extracted.token).ok()); 1057 1058 let Some(ref jti) = current_jti else { 1059 return ApiError::InvalidToken(None).into_response(); 1060 }; 1061 1062 if auth.0.is_oauth { 1063 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1064 .bind(&auth.0.did) 1065 .execute(&state.db) 1066 .await 1067 { 1068 error!("DB error revoking JWT sessions: {:?}", e); 1069 return ApiError::InternalError(None).into_response(); 1070 } 1071 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1072 .bind(&auth.0.did) 1073 .bind(jti) 1074 .execute(&state.db) 1075 .await 1076 { 1077 error!("DB error revoking OAuth sessions: {:?}", e); 1078 return ApiError::InternalError(None).into_response(); 1079 } 1080 } else { 1081 if let Err(e) = 1082 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1083 .bind(&auth.0.did) 1084 .bind(jti) 1085 .execute(&state.db) 1086 .await 1087 { 1088 error!("DB error revoking JWT sessions: {:?}", e); 1089 return ApiError::InternalError(None).into_response(); 1090 } 1091 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1092 .bind(&auth.0.did) 1093 .execute(&state.db) 1094 .await 1095 { 1096 error!("DB error revoking OAuth sessions: {:?}", e); 1097 return ApiError::InternalError(None).into_response(); 1098 } 1099 } 1100 1101 info!(did = %&auth.0.did, "All other sessions revoked"); 1102 SuccessResponse::ok().into_response() 1103} 1104 1105#[derive(Serialize)] 1106#[serde(rename_all = "camelCase")] 1107pub struct LegacyLoginPreferenceOutput { 1108 pub allow_legacy_login: bool, 1109 pub has_mfa: bool, 1110} 1111 1112pub async fn get_legacy_login_preference( 1113 State(state): State<AppState>, 1114 auth: BearerAuth, 1115) -> Response { 1116 let result = sqlx::query!( 1117 r#"SELECT 1118 u.allow_legacy_login, 1119 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1120 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1121 FROM users u WHERE u.did = $1"#, 1122 &auth.0.did 1123 ) 1124 .fetch_optional(&state.db) 1125 .await; 1126 1127 match result { 1128 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1129 allow_legacy_login: row.allow_legacy_login, 1130 has_mfa: row.has_mfa, 1131 }) 1132 .into_response(), 1133 Ok(None) => ApiError::AccountNotFound.into_response(), 1134 Err(e) => { 1135 error!("DB error: {:?}", e); 1136 ApiError::InternalError(None).into_response() 1137 } 1138 } 1139} 1140 1141#[derive(Deserialize)] 1142#[serde(rename_all = "camelCase")] 1143pub struct UpdateLegacyLoginInput { 1144 pub allow_legacy_login: bool, 1145} 1146 1147pub async fn update_legacy_login_preference( 1148 State(state): State<AppState>, 1149 auth: BearerAuth, 1150 Json(input): Json<UpdateLegacyLoginInput>, 1151) -> Response { 1152 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1153 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1154 .await; 1155 } 1156 1157 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1158 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1159 } 1160 1161 let result = sqlx::query!( 1162 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1163 input.allow_legacy_login, 1164 &auth.0.did 1165 ) 1166 .fetch_optional(&state.db) 1167 .await; 1168 1169 match result { 1170 Ok(Some(_)) => { 1171 info!( 1172 did = %&auth.0.did, 1173 allow_legacy_login = input.allow_legacy_login, 1174 "Legacy login preference updated" 1175 ); 1176 Json(json!({ 1177 "allowLegacyLogin": input.allow_legacy_login 1178 })) 1179 .into_response() 1180 } 1181 Ok(None) => ApiError::AccountNotFound.into_response(), 1182 Err(e) => { 1183 error!("DB error: {:?}", e); 1184 ApiError::InternalError(None).into_response() 1185 } 1186 } 1187} 1188 1189use crate::comms::locale::VALID_LOCALES; 1190 1191#[derive(Deserialize)] 1192#[serde(rename_all = "camelCase")] 1193pub struct UpdateLocaleInput { 1194 pub preferred_locale: String, 1195} 1196 1197pub async fn update_locale( 1198 State(state): State<AppState>, 1199 auth: BearerAuth, 1200 Json(input): Json<UpdateLocaleInput>, 1201) -> Response { 1202 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1203 return ApiError::InvalidRequest(format!( 1204 "Invalid locale. Valid options: {}", 1205 VALID_LOCALES.join(", ") 1206 )) 1207 .into_response(); 1208 } 1209 1210 let result = sqlx::query!( 1211 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1212 input.preferred_locale, 1213 &auth.0.did 1214 ) 1215 .fetch_optional(&state.db) 1216 .await; 1217 1218 match result { 1219 Ok(Some(_)) => { 1220 info!( 1221 did = %&auth.0.did, 1222 locale = %input.preferred_locale, 1223 "User locale preference updated" 1224 ); 1225 Json(json!({ 1226 "preferredLocale": input.preferred_locale 1227 })) 1228 .into_response() 1229 } 1230 Ok(None) => ApiError::AccountNotFound.into_response(), 1231 Err(e) => { 1232 error!("DB error updating locale: {:?}", e); 1233 ApiError::InternalError(None).into_response() 1234 } 1235 } 1236}