this repo has no description
at main 45 kB view raw
1use crate::api::error::ApiError; 2use crate::api::{EmptyResponse, SuccessResponse}; 3use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 4use crate::state::{AppState, RateLimitKind}; 5use crate::types::{AccountState, Did, Handle, PlainPassword}; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use bcrypt::verify; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use tracing::{error, info, warn}; 16 17fn extract_client_ip(headers: &HeaderMap) -> String { 18 if let Some(forwarded) = headers.get("x-forwarded-for") 19 && let Ok(value) = forwarded.to_str() 20 && let Some(first_ip) = value.split(',').next() 21 { 22 return first_ip.trim().to_string(); 23 } 24 if let Some(real_ip) = headers.get("x-real-ip") 25 && let Ok(value) = real_ip.to_str() 26 { 27 return value.trim().to_string(); 28 } 29 "unknown".to_string() 30} 31 32fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 33 let identifier = identifier.trim(); 34 if identifier.contains('@') || identifier.starts_with("did:") { 35 identifier.to_string() 36 } else if !identifier.contains('.') { 37 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 38 } else { 39 identifier.to_lowercase() 40 } 41} 42 43fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 44 stored_handle.to_string() 45} 46 47#[derive(Deserialize)] 48#[serde(rename_all = "camelCase")] 49pub struct CreateSessionInput { 50 pub identifier: String, 51 pub password: PlainPassword, 52 #[serde(default)] 53 pub allow_takendown: bool, 54} 55 56#[derive(Serialize)] 57#[serde(rename_all = "camelCase")] 58pub struct CreateSessionOutput { 59 pub access_jwt: String, 60 pub refresh_jwt: String, 61 pub handle: Handle, 62 pub did: Did, 63 #[serde(skip_serializing_if = "Option::is_none")] 64 pub did_doc: Option<serde_json::Value>, 65 #[serde(skip_serializing_if = "Option::is_none")] 66 pub email: Option<String>, 67 #[serde(skip_serializing_if = "Option::is_none")] 68 pub email_confirmed: Option<bool>, 69 #[serde(skip_serializing_if = "Option::is_none")] 70 pub active: Option<bool>, 71 #[serde(skip_serializing_if = "Option::is_none")] 72 pub status: Option<String>, 73} 74 75pub async fn create_session( 76 State(state): State<AppState>, 77 headers: HeaderMap, 78 Json(input): Json<CreateSessionInput>, 79) -> Response { 80 info!( 81 "create_session called with identifier: {}", 82 input.identifier 83 ); 84 let client_ip = extract_client_ip(&headers); 85 if !state 86 .check_rate_limit(RateLimitKind::Login, &client_ip) 87 .await 88 { 89 warn!(ip = %client_ip, "Login rate limit exceeded"); 90 return ApiError::RateLimitExceeded(None).into_response(); 91 } 92 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 93 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 94 info!( 95 "Normalized identifier: {} -> {}", 96 input.identifier, normalized_identifier 97 ); 98 let row = match sqlx::query!( 99 r#"SELECT 100 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref, 101 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 102 u.allow_legacy_login, u.migrated_to_pds, 103 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 104 k.key_bytes, k.encryption_version, 105 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 106 FROM users u 107 JOIN user_keys k ON u.id = k.user_id 108 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 109 normalized_identifier 110 ) 111 .fetch_optional(&state.db) 112 .await 113 { 114 Ok(Some(row)) => row, 115 Ok(None) => { 116 let _ = verify( 117 &input.password, 118 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 119 ); 120 warn!("User not found for login attempt"); 121 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into())) 122 .into_response(); 123 } 124 Err(e) => { 125 error!("Database error fetching user: {:?}", e); 126 return ApiError::InternalError(None).into_response(); 127 } 128 }; 129 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 130 Ok(k) => k, 131 Err(e) => { 132 error!("Failed to decrypt user key: {:?}", e); 133 return ApiError::InternalError(None).into_response(); 134 } 135 }; 136 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row 137 .password_hash 138 .as_ref() 139 .map(|h| verify(&input.password, h).unwrap_or(false)) 140 .unwrap_or(false) 141 { 142 (true, None, None, None) 143 } else { 144 let app_passwords = sqlx::query!( 145 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 146 row.id 147 ) 148 .fetch_all(&state.db) 149 .await 150 .unwrap_or_default(); 151 let matched = app_passwords 152 .iter() 153 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 154 match matched { 155 Some(app) => ( 156 true, 157 Some(app.name.clone()), 158 app.scopes.clone(), 159 app.created_by_controller_did.clone(), 160 ), 161 None => (false, None, None, None), 162 } 163 }; 164 if !password_valid { 165 warn!("Password verification failed for login attempt"); 166 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into())) 167 .into_response(); 168 } 169 let account_state = AccountState::from_db_fields( 170 row.deactivated_at, 171 row.takedown_ref.clone(), 172 row.migrated_to_pds.clone(), 173 None, 174 ); 175 if account_state.is_takendown() && !input.allow_takendown { 176 warn!("Login attempt for takendown account: {}", row.did); 177 return ApiError::AccountTakedown.into_response(); 178 } 179 let is_verified = 180 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 181 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did) 182 .await 183 .unwrap_or(false); 184 if !is_verified && !is_delegated { 185 warn!("Login attempt for unverified account: {}", row.did); 186 return ( 187 StatusCode::FORBIDDEN, 188 Json(json!({ 189 "error": "AccountNotVerified", 190 "message": "Please verify your account before logging in", 191 "did": row.did 192 })), 193 ) 194 .into_response(); 195 } 196 let has_totp = row.totp_enabled.unwrap_or(false); 197 let is_legacy_login = has_totp; 198 if has_totp && !row.allow_legacy_login { 199 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 200 return ( 201 StatusCode::FORBIDDEN, 202 Json(json!({ 203 "error": "MfaRequired", 204 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 205 "did": row.did 206 })), 207 ) 208 .into_response(); 209 } 210 let access_meta = match crate::auth::create_access_token_with_delegation( 211 &row.did, 212 &key_bytes, 213 app_password_scopes.as_deref(), 214 app_password_controller.as_deref(), 215 None, 216 ) { 217 Ok(m) => m, 218 Err(e) => { 219 error!("Failed to create access token: {:?}", e); 220 return ApiError::InternalError(None).into_response(); 221 } 222 }; 223 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 224 Ok(m) => m, 225 Err(e) => { 226 error!("Failed to create refresh token: {:?}", e); 227 return ApiError::InternalError(None).into_response(); 228 } 229 }; 230 let did_for_doc = row.did.clone(); 231 let did_resolver = state.did_resolver.clone(); 232 let (insert_result, did_doc) = tokio::join!( 233 sqlx::query!( 234 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", 235 row.did, 236 access_meta.jti, 237 refresh_meta.jti, 238 access_meta.expires_at, 239 refresh_meta.expires_at, 240 is_legacy_login, 241 false, 242 app_password_scopes, 243 app_password_controller, 244 app_password_name 245 ) 246 .execute(&state.db), 247 did_resolver.resolve_did_document(&did_for_doc) 248 ); 249 if let Err(e) = insert_result { 250 error!("Failed to insert session: {:?}", e); 251 return ApiError::InternalError(None).into_response(); 252 } 253 if is_legacy_login { 254 warn!( 255 did = %row.did, 256 ip = %client_ip, 257 "Legacy login on TOTP-enabled account - sending notification" 258 ); 259 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 260 if let Err(e) = crate::comms::queue_legacy_login_notification( 261 &state.db, 262 row.id, 263 &hostname, 264 &client_ip, 265 row.preferred_comms_channel, 266 ) 267 .await 268 { 269 error!("Failed to queue legacy login notification: {:?}", e); 270 } 271 } 272 let handle = full_handle(&row.handle, &pds_hostname); 273 let is_active = account_state.is_active(); 274 let status = account_state.status_for_session().map(String::from); 275 Json(CreateSessionOutput { 276 access_jwt: access_meta.token, 277 refresh_jwt: refresh_meta.token, 278 handle: handle.into(), 279 did: row.did.into(), 280 did_doc, 281 email: row.email, 282 email_confirmed: Some(row.email_verified), 283 active: Some(is_active), 284 status, 285 }) 286 .into_response() 287} 288 289pub async fn get_session( 290 State(state): State<AppState>, 291 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 292) -> Response { 293 let permissions = auth_user.permissions(); 294 let can_read_email = permissions.allows_email_read(); 295 296 let did_for_doc = auth_user.did.clone(); 297 let did_resolver = state.did_resolver.clone(); 298 let (db_result, did_doc) = tokio::join!( 299 sqlx::query!( 300 r#"SELECT 301 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale, 302 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 303 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at 304 FROM users WHERE did = $1"#, 305 &auth_user.did 306 ) 307 .fetch_optional(&state.db), 308 did_resolver.resolve_did_document(&did_for_doc) 309 ); 310 match db_result { 311 Ok(Some(row)) => { 312 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 313 crate::comms::CommsChannel::Email => ("email", row.email_verified), 314 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 315 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 316 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 317 }; 318 let pds_hostname = 319 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 320 let handle = full_handle(&row.handle, &pds_hostname); 321 let account_state = AccountState::from_db_fields( 322 row.deactivated_at, 323 row.takedown_ref.clone(), 324 row.migrated_to_pds.clone(), 325 row.migrated_at, 326 ); 327 let email_value = if can_read_email { 328 row.email.clone() 329 } else { 330 None 331 }; 332 let email_confirmed_value = can_read_email && row.email_verified; 333 let mut response = json!({ 334 "handle": handle, 335 "did": &auth_user.did, 336 "active": account_state.is_active(), 337 "preferredChannel": preferred_channel, 338 "preferredChannelVerified": preferred_channel_verified, 339 "preferredLocale": row.preferred_locale, 340 "isAdmin": row.is_admin 341 }); 342 if can_read_email { 343 response["email"] = json!(email_value); 344 response["emailConfirmed"] = json!(email_confirmed_value); 345 } 346 if let Some(status) = account_state.status_for_session() { 347 response["status"] = json!(status); 348 } 349 if let AccountState::Migrated { to_pds, at } = &account_state { 350 response["migratedToPds"] = json!(to_pds); 351 response["migratedAt"] = json!(at); 352 } 353 if let Some(doc) = did_doc { 354 response["didDoc"] = doc; 355 } 356 Json(response).into_response() 357 } 358 Ok(None) => ApiError::AuthenticationFailed(None).into_response(), 359 Err(e) => { 360 error!("Database error in get_session: {:?}", e); 361 ApiError::InternalError(None).into_response() 362 } 363 } 364} 365 366pub async fn delete_session( 367 State(state): State<AppState>, 368 headers: axum::http::HeaderMap, 369 _auth: BearerAuth, 370) -> Response { 371 let extracted = match crate::auth::extract_auth_token_from_header( 372 headers.get("Authorization").and_then(|h| h.to_str().ok()), 373 ) { 374 Some(t) => t, 375 None => return ApiError::AuthenticationRequired.into_response(), 376 }; 377 let jti = match crate::auth::get_jti_from_token(&extracted.token) { 378 Ok(jti) => jti, 379 Err(_) => return ApiError::AuthenticationFailed(None).into_response(), 380 }; 381 let did = crate::auth::get_did_from_token(&extracted.token).ok(); 382 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 383 .execute(&state.db) 384 .await 385 { 386 Ok(res) if res.rows_affected() > 0 => { 387 if let Some(did) = did { 388 let session_cache_key = format!("auth:session:{}:{}", did, jti); 389 let _ = state.cache.delete(&session_cache_key).await; 390 } 391 EmptyResponse::ok().into_response() 392 } 393 Ok(_) => ApiError::AuthenticationFailed(None).into_response(), 394 Err(e) => { 395 error!("Database error in delete_session: {:?}", e); 396 ApiError::AuthenticationFailed(None).into_response() 397 } 398 } 399} 400 401pub async fn refresh_session( 402 State(state): State<AppState>, 403 headers: axum::http::HeaderMap, 404) -> Response { 405 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 406 if !state 407 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 408 .await 409 { 410 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 411 return ApiError::RateLimitExceeded(None).into_response(); 412 } 413 let extracted = match crate::auth::extract_auth_token_from_header( 414 headers.get("Authorization").and_then(|h| h.to_str().ok()), 415 ) { 416 Some(t) => t, 417 None => return ApiError::AuthenticationRequired.into_response(), 418 }; 419 let refresh_token = extracted.token; 420 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 421 Ok(jti) => jti, 422 Err(_) => { 423 return ApiError::AuthenticationFailed(Some("Invalid token format".into())) 424 .into_response(); 425 } 426 }; 427 let mut tx = match state.db.begin().await { 428 Ok(tx) => tx, 429 Err(e) => { 430 error!("Failed to begin transaction: {:?}", e); 431 return ApiError::InternalError(None).into_response(); 432 } 433 }; 434 if let Ok(Some(session_id)) = sqlx::query_scalar!( 435 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 436 refresh_jti 437 ) 438 .fetch_optional(&mut *tx) 439 .await 440 { 441 warn!( 442 "Refresh token reuse detected! Revoking token family for session_id: {}", 443 session_id 444 ); 445 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 446 .execute(&mut *tx) 447 .await; 448 let _ = tx.commit().await; 449 return ApiError::AuthenticationFailed(Some( 450 "Refresh token has been revoked due to suspected compromise".into(), 451 )) 452 .into_response(); 453 } 454 let session_row = match sqlx::query!( 455 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version 456 FROM session_tokens st 457 JOIN users u ON st.did = u.did 458 JOIN user_keys k ON u.id = k.user_id 459 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 460 FOR UPDATE OF st"#, 461 refresh_jti 462 ) 463 .fetch_optional(&mut *tx) 464 .await 465 { 466 Ok(Some(row)) => row, 467 Ok(None) => { 468 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into())) 469 .into_response(); 470 } 471 Err(e) => { 472 error!("Database error fetching session: {:?}", e); 473 return ApiError::InternalError(None).into_response(); 474 } 475 }; 476 let key_bytes = 477 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 478 Ok(k) => k, 479 Err(e) => { 480 error!("Failed to decrypt user key: {:?}", e); 481 return ApiError::InternalError(None).into_response(); 482 } 483 }; 484 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 485 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into())) 486 .into_response(); 487 } 488 let new_access_meta = match crate::auth::create_access_token_with_delegation( 489 &session_row.did, 490 &key_bytes, 491 session_row.scope.as_deref(), 492 session_row.controller_did.as_deref(), 493 None, 494 ) { 495 Ok(m) => m, 496 Err(e) => { 497 error!("Failed to create access token: {:?}", e); 498 return ApiError::InternalError(None).into_response(); 499 } 500 }; 501 let new_refresh_meta = 502 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 503 Ok(m) => m, 504 Err(e) => { 505 error!("Failed to create refresh token: {:?}", e); 506 return ApiError::InternalError(None).into_response(); 507 } 508 }; 509 match sqlx::query!( 510 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 511 refresh_jti, 512 session_row.id 513 ) 514 .execute(&mut *tx) 515 .await 516 { 517 Ok(result) if result.rows_affected() == 0 => { 518 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 519 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 520 .execute(&mut *tx) 521 .await; 522 let _ = tx.commit().await; 523 return ApiError::AuthenticationFailed(Some("Refresh token has been revoked due to suspected compromise".into())).into_response(); 524 } 525 Err(e) => { 526 error!("Failed to record used refresh token: {:?}", e); 527 return ApiError::InternalError(None).into_response(); 528 } 529 Ok(_) => {} 530 } 531 if let Err(e) = sqlx::query!( 532 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 533 new_access_meta.jti, 534 new_refresh_meta.jti, 535 new_access_meta.expires_at, 536 new_refresh_meta.expires_at, 537 session_row.id 538 ) 539 .execute(&mut *tx) 540 .await 541 { 542 error!("Database error updating session: {:?}", e); 543 return ApiError::InternalError(None).into_response(); 544 } 545 if let Err(e) = tx.commit().await { 546 error!("Failed to commit transaction: {:?}", e); 547 return ApiError::InternalError(None).into_response(); 548 } 549 let did_for_doc = session_row.did.clone(); 550 let did_resolver = state.did_resolver.clone(); 551 let (db_result, did_doc) = tokio::join!( 552 sqlx::query!( 553 r#"SELECT 554 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref, 555 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 556 discord_verified, telegram_verified, signal_verified 557 FROM users WHERE did = $1"#, 558 session_row.did 559 ) 560 .fetch_optional(&state.db), 561 did_resolver.resolve_did_document(&did_for_doc) 562 ); 563 match db_result { 564 Ok(Some(u)) => { 565 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 566 crate::comms::CommsChannel::Email => ("email", u.email_verified), 567 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 568 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 569 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 570 }; 571 let pds_hostname = 572 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 573 let handle = full_handle(&u.handle, &pds_hostname); 574 let account_state = 575 AccountState::from_db_fields(u.deactivated_at, u.takedown_ref.clone(), None, None); 576 let mut response = json!({ 577 "accessJwt": new_access_meta.token, 578 "refreshJwt": new_refresh_meta.token, 579 "handle": handle, 580 "did": session_row.did, 581 "email": u.email, 582 "emailConfirmed": u.email_verified, 583 "preferredChannel": preferred_channel, 584 "preferredChannelVerified": preferred_channel_verified, 585 "preferredLocale": u.preferred_locale, 586 "isAdmin": u.is_admin, 587 "active": account_state.is_active() 588 }); 589 if let Some(doc) = did_doc { 590 response["didDoc"] = doc; 591 } 592 if let Some(status) = account_state.status_for_session() { 593 response["status"] = json!(status); 594 } 595 Json(response).into_response() 596 } 597 Ok(None) => { 598 error!("User not found for existing session: {}", session_row.did); 599 ApiError::InternalError(None).into_response() 600 } 601 Err(e) => { 602 error!("Database error fetching user: {:?}", e); 603 ApiError::InternalError(None).into_response() 604 } 605 } 606} 607 608#[derive(Deserialize)] 609#[serde(rename_all = "camelCase")] 610pub struct ConfirmSignupInput { 611 pub did: Did, 612 pub verification_code: String, 613} 614 615#[derive(Serialize)] 616#[serde(rename_all = "camelCase")] 617pub struct ConfirmSignupOutput { 618 pub access_jwt: String, 619 pub refresh_jwt: String, 620 pub handle: Handle, 621 pub did: Did, 622 pub email: Option<String>, 623 pub email_verified: bool, 624 pub preferred_channel: String, 625 pub preferred_channel_verified: bool, 626} 627 628pub async fn confirm_signup( 629 State(state): State<AppState>, 630 Json(input): Json<ConfirmSignupInput>, 631) -> Response { 632 info!("confirm_signup called for DID: {}", input.did); 633 let row = match sqlx::query!( 634 r#"SELECT 635 u.id, u.did, u.handle, u.email, 636 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 637 u.discord_id, u.telegram_username, u.signal_number, 638 k.key_bytes, k.encryption_version 639 FROM users u 640 JOIN user_keys k ON u.id = k.user_id 641 WHERE u.did = $1"#, 642 input.did.as_str() 643 ) 644 .fetch_optional(&state.db) 645 .await 646 { 647 Ok(Some(row)) => row, 648 Ok(None) => { 649 warn!("User not found for confirm_signup: {}", input.did); 650 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 651 .into_response(); 652 } 653 Err(e) => { 654 error!("Database error in confirm_signup: {:?}", e); 655 return ApiError::InternalError(None).into_response(); 656 } 657 }; 658 659 let (channel_str, identifier) = match row.channel { 660 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 661 crate::comms::CommsChannel::Discord => { 662 ("discord", row.discord_id.clone().unwrap_or_default()) 663 } 664 crate::comms::CommsChannel::Telegram => ( 665 "telegram", 666 row.telegram_username.clone().unwrap_or_default(), 667 ), 668 crate::comms::CommsChannel::Signal => { 669 ("signal", row.signal_number.clone().unwrap_or_default()) 670 } 671 }; 672 673 let normalized_token = 674 crate::auth::verification_token::normalize_token_input(&input.verification_code); 675 match crate::auth::verification_token::verify_signup_token( 676 &normalized_token, 677 channel_str, 678 &identifier, 679 ) { 680 Ok(token_data) => { 681 if token_data.did != input.did.as_str() { 682 warn!( 683 "Token DID mismatch for confirm_signup: expected {}, got {}", 684 input.did, token_data.did 685 ); 686 return ApiError::InvalidRequest("Invalid verification code".into()) 687 .into_response(); 688 } 689 } 690 Err(crate::auth::verification_token::VerifyError::Expired) => { 691 warn!("Verification code expired for user: {}", input.did); 692 return ApiError::ExpiredToken(Some("Verification code has expired".into())) 693 .into_response(); 694 } 695 Err(e) => { 696 warn!("Invalid verification code for user {}: {:?}", input.did, e); 697 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 698 } 699 } 700 701 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 702 Ok(k) => k, 703 Err(e) => { 704 error!("Failed to decrypt user key: {:?}", e); 705 return ApiError::InternalError(None).into_response(); 706 } 707 }; 708 709 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 710 Ok(m) => m, 711 Err(e) => { 712 error!("Failed to create access token: {:?}", e); 713 return ApiError::InternalError(None).into_response(); 714 } 715 }; 716 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 717 Ok(m) => m, 718 Err(e) => { 719 error!("Failed to create refresh token: {:?}", e); 720 return ApiError::InternalError(None).into_response(); 721 } 722 }; 723 724 let mut tx = match state.db.begin().await { 725 Ok(tx) => tx, 726 Err(e) => { 727 error!("Failed to begin transaction: {:?}", e); 728 return ApiError::InternalError(None).into_response(); 729 } 730 }; 731 732 let verified_column = match row.channel { 733 crate::comms::CommsChannel::Email => "email_verified", 734 crate::comms::CommsChannel::Discord => "discord_verified", 735 crate::comms::CommsChannel::Telegram => "telegram_verified", 736 crate::comms::CommsChannel::Signal => "signal_verified", 737 }; 738 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 739 if let Err(e) = sqlx::query(&update_query) 740 .bind(input.did.as_str()) 741 .execute(&mut *tx) 742 .await 743 { 744 error!("Failed to update verification status: {:?}", e); 745 return ApiError::InternalError(None).into_response(); 746 } 747 748 let no_scope: Option<String> = None; 749 if let Err(e) = sqlx::query!( 750 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 751 row.did, 752 access_meta.jti, 753 refresh_meta.jti, 754 access_meta.expires_at, 755 refresh_meta.expires_at, 756 false, 757 false, 758 no_scope 759 ) 760 .execute(&mut *tx) 761 .await 762 { 763 error!("Failed to insert session: {:?}", e); 764 return ApiError::InternalError(None).into_response(); 765 } 766 767 if let Err(e) = tx.commit().await { 768 error!("Failed to commit transaction: {:?}", e); 769 return ApiError::InternalError(None).into_response(); 770 } 771 772 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 773 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 774 warn!("Failed to enqueue welcome notification: {:?}", e); 775 } 776 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 777 let preferred_channel = match row.channel { 778 crate::comms::CommsChannel::Email => "email", 779 crate::comms::CommsChannel::Discord => "discord", 780 crate::comms::CommsChannel::Telegram => "telegram", 781 crate::comms::CommsChannel::Signal => "signal", 782 }; 783 Json(ConfirmSignupOutput { 784 access_jwt: access_meta.token, 785 refresh_jwt: refresh_meta.token, 786 handle: row.handle.into(), 787 did: row.did.into(), 788 email: row.email, 789 email_verified, 790 preferred_channel: preferred_channel.to_string(), 791 preferred_channel_verified: true, 792 }) 793 .into_response() 794} 795 796#[derive(Deserialize)] 797#[serde(rename_all = "camelCase")] 798pub struct ResendVerificationInput { 799 pub did: Did, 800} 801 802pub async fn resend_verification( 803 State(state): State<AppState>, 804 Json(input): Json<ResendVerificationInput>, 805) -> Response { 806 info!("resend_verification called for DID: {}", input.did); 807 let row = match sqlx::query!( 808 r#"SELECT 809 id, handle, email, 810 preferred_comms_channel as "channel: crate::comms::CommsChannel", 811 discord_id, telegram_username, signal_number, 812 email_verified, discord_verified, telegram_verified, signal_verified 813 FROM users 814 WHERE did = $1"#, 815 input.did.as_str() 816 ) 817 .fetch_optional(&state.db) 818 .await 819 { 820 Ok(Some(row)) => row, 821 Ok(None) => { 822 return ApiError::InvalidRequest("User not found".into()).into_response(); 823 } 824 Err(e) => { 825 error!("Database error in resend_verification: {:?}", e); 826 return ApiError::InternalError(None).into_response(); 827 } 828 }; 829 let is_verified = 830 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 831 if is_verified { 832 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 833 } 834 835 let (channel_str, recipient) = match row.channel { 836 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 837 crate::comms::CommsChannel::Discord => { 838 ("discord", row.discord_id.clone().unwrap_or_default()) 839 } 840 crate::comms::CommsChannel::Telegram => ( 841 "telegram", 842 row.telegram_username.clone().unwrap_or_default(), 843 ), 844 crate::comms::CommsChannel::Signal => { 845 ("signal", row.signal_number.clone().unwrap_or_default()) 846 } 847 }; 848 849 let verification_token = 850 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 851 let formatted_token = 852 crate::auth::verification_token::format_token_for_display(&verification_token); 853 854 if let Err(e) = crate::comms::enqueue_signup_verification( 855 &state.db, 856 row.id, 857 channel_str, 858 &recipient, 859 &formatted_token, 860 None, 861 ) 862 .await 863 { 864 warn!("Failed to enqueue verification notification: {:?}", e); 865 } 866 SuccessResponse::ok().into_response() 867} 868 869#[derive(Serialize)] 870#[serde(rename_all = "camelCase")] 871pub struct SessionInfo { 872 pub id: String, 873 pub session_type: String, 874 pub client_name: Option<String>, 875 pub created_at: String, 876 pub expires_at: String, 877 pub is_current: bool, 878} 879 880#[derive(Serialize)] 881#[serde(rename_all = "camelCase")] 882pub struct ListSessionsOutput { 883 pub sessions: Vec<SessionInfo>, 884} 885 886pub async fn list_sessions( 887 State(state): State<AppState>, 888 headers: HeaderMap, 889 auth: BearerAuth, 890) -> Response { 891 let current_jti = headers 892 .get("authorization") 893 .and_then(|v| v.to_str().ok()) 894 .and_then(|v| v.strip_prefix("Bearer ")) 895 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 896 897 let jwt_rows = match sqlx::query_as::< 898 _, 899 ( 900 i32, 901 String, 902 chrono::DateTime<chrono::Utc>, 903 chrono::DateTime<chrono::Utc>, 904 ), 905 >( 906 r#" 907 SELECT id, access_jti, created_at, refresh_expires_at 908 FROM session_tokens 909 WHERE did = $1 AND refresh_expires_at > NOW() 910 ORDER BY created_at DESC 911 "#, 912 ) 913 .bind(&auth.0.did) 914 .fetch_all(&state.db) 915 .await 916 { 917 Ok(rows) => rows, 918 Err(e) => { 919 error!("DB error fetching JWT sessions: {:?}", e); 920 return ApiError::InternalError(None).into_response(); 921 } 922 }; 923 924 let oauth_rows = match sqlx::query_as::< 925 _, 926 ( 927 i32, 928 String, 929 chrono::DateTime<chrono::Utc>, 930 chrono::DateTime<chrono::Utc>, 931 String, 932 ), 933 >( 934 r#" 935 SELECT id, token_id, created_at, expires_at, client_id 936 FROM oauth_token 937 WHERE did = $1 AND expires_at > NOW() 938 ORDER BY created_at DESC 939 "#, 940 ) 941 .bind(&auth.0.did) 942 .fetch_all(&state.db) 943 .await 944 { 945 Ok(rows) => rows, 946 Err(e) => { 947 error!("DB error fetching OAuth sessions: {:?}", e); 948 return ApiError::InternalError(None).into_response(); 949 } 950 }; 951 952 let jwt_sessions = jwt_rows 953 .into_iter() 954 .map(|(id, access_jti, created_at, expires_at)| SessionInfo { 955 id: format!("jwt:{}", id), 956 session_type: "legacy".to_string(), 957 client_name: None, 958 created_at: created_at.to_rfc3339(), 959 expires_at: expires_at.to_rfc3339(), 960 is_current: current_jti.as_ref() == Some(&access_jti), 961 }); 962 963 let is_oauth = auth.0.is_oauth; 964 let oauth_sessions = 965 oauth_rows 966 .into_iter() 967 .map(|(id, token_id, created_at, expires_at, client_id)| { 968 let client_name = extract_client_name(&client_id); 969 let is_current_oauth = is_oauth && current_jti.as_ref() == Some(&token_id); 970 SessionInfo { 971 id: format!("oauth:{}", id), 972 session_type: "oauth".to_string(), 973 client_name: Some(client_name), 974 created_at: created_at.to_rfc3339(), 975 expires_at: expires_at.to_rfc3339(), 976 is_current: is_current_oauth, 977 } 978 }); 979 980 let mut sessions: Vec<SessionInfo> = jwt_sessions.chain(oauth_sessions).collect(); 981 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 982 983 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 984} 985 986fn extract_client_name(client_id: &str) -> String { 987 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 988 "Localhost App".to_string() 989 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 990 parsed.host_str().unwrap_or("Unknown App").to_string() 991 } else { 992 client_id.to_string() 993 } 994} 995 996#[derive(Deserialize)] 997#[serde(rename_all = "camelCase")] 998pub struct RevokeSessionInput { 999 pub session_id: String, 1000} 1001 1002pub async fn revoke_session( 1003 State(state): State<AppState>, 1004 auth: BearerAuth, 1005 Json(input): Json<RevokeSessionInput>, 1006) -> Response { 1007 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 1008 let Ok(session_id) = jwt_id.parse::<i32>() else { 1009 return ApiError::InvalidRequest("Invalid session ID".into()).into_response(); 1010 }; 1011 let session = sqlx::query_as::<_, (String,)>( 1012 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 1013 ) 1014 .bind(session_id) 1015 .bind(&auth.0.did) 1016 .fetch_optional(&state.db) 1017 .await; 1018 let access_jti = match session { 1019 Ok(Some((jti,))) => jti, 1020 Ok(None) => { 1021 return ApiError::SessionNotFound.into_response(); 1022 } 1023 Err(e) => { 1024 error!("DB error in revoke_session: {:?}", e); 1025 return ApiError::InternalError(None).into_response(); 1026 } 1027 }; 1028 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 1029 .bind(session_id) 1030 .execute(&state.db) 1031 .await 1032 { 1033 error!("DB error deleting session: {:?}", e); 1034 return ApiError::InternalError(None).into_response(); 1035 } 1036 let cache_key = format!("auth:session:{}:{}", &auth.0.did, access_jti); 1037 if let Err(e) = state.cache.delete(&cache_key).await { 1038 warn!("Failed to invalidate session cache: {:?}", e); 1039 } 1040 info!(did = %&auth.0.did, session_id = %session_id, "JWT session revoked"); 1041 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 1042 let Ok(session_id) = oauth_id.parse::<i32>() else { 1043 return ApiError::InvalidRequest("Invalid session ID".into()).into_response(); 1044 }; 1045 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 1046 .bind(session_id) 1047 .bind(&auth.0.did) 1048 .execute(&state.db) 1049 .await; 1050 match result { 1051 Ok(r) if r.rows_affected() == 0 => { 1052 return ApiError::SessionNotFound.into_response(); 1053 } 1054 Err(e) => { 1055 error!("DB error deleting OAuth session: {:?}", e); 1056 return ApiError::InternalError(None).into_response(); 1057 } 1058 _ => {} 1059 } 1060 info!(did = %&auth.0.did, session_id = %session_id, "OAuth session revoked"); 1061 } else { 1062 return ApiError::InvalidRequest("Invalid session ID format".into()).into_response(); 1063 } 1064 EmptyResponse::ok().into_response() 1065} 1066 1067pub async fn revoke_all_sessions( 1068 State(state): State<AppState>, 1069 headers: HeaderMap, 1070 auth: BearerAuth, 1071) -> Response { 1072 let current_jti = crate::auth::extract_auth_token_from_header( 1073 headers.get("authorization").and_then(|v| v.to_str().ok()), 1074 ) 1075 .and_then(|extracted| crate::auth::get_jti_from_token(&extracted.token).ok()); 1076 1077 let Some(ref jti) = current_jti else { 1078 return ApiError::InvalidToken(None).into_response(); 1079 }; 1080 1081 let mut tx = match state.db.begin().await { 1082 Ok(tx) => tx, 1083 Err(e) => { 1084 error!("Failed to begin transaction: {:?}", e); 1085 return ApiError::InternalError(None).into_response(); 1086 } 1087 }; 1088 1089 if auth.0.is_oauth { 1090 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1091 .bind(&auth.0.did) 1092 .execute(&mut *tx) 1093 .await 1094 { 1095 error!("DB error revoking JWT sessions: {:?}", e); 1096 return ApiError::InternalError(None).into_response(); 1097 } 1098 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1099 .bind(&auth.0.did) 1100 .bind(jti) 1101 .execute(&mut *tx) 1102 .await 1103 { 1104 error!("DB error revoking OAuth sessions: {:?}", e); 1105 return ApiError::InternalError(None).into_response(); 1106 } 1107 } else { 1108 if let Err(e) = 1109 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1110 .bind(&auth.0.did) 1111 .bind(jti) 1112 .execute(&mut *tx) 1113 .await 1114 { 1115 error!("DB error revoking JWT sessions: {:?}", e); 1116 return ApiError::InternalError(None).into_response(); 1117 } 1118 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1119 .bind(&auth.0.did) 1120 .execute(&mut *tx) 1121 .await 1122 { 1123 error!("DB error revoking OAuth sessions: {:?}", e); 1124 return ApiError::InternalError(None).into_response(); 1125 } 1126 } 1127 1128 if let Err(e) = tx.commit().await { 1129 error!("Failed to commit transaction: {:?}", e); 1130 return ApiError::InternalError(None).into_response(); 1131 } 1132 1133 info!(did = %&auth.0.did, "All other sessions revoked"); 1134 SuccessResponse::ok().into_response() 1135} 1136 1137#[derive(Serialize)] 1138#[serde(rename_all = "camelCase")] 1139pub struct LegacyLoginPreferenceOutput { 1140 pub allow_legacy_login: bool, 1141 pub has_mfa: bool, 1142} 1143 1144pub async fn get_legacy_login_preference( 1145 State(state): State<AppState>, 1146 auth: BearerAuth, 1147) -> Response { 1148 let result = sqlx::query!( 1149 r#"SELECT 1150 u.allow_legacy_login, 1151 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1152 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1153 FROM users u WHERE u.did = $1"#, 1154 &auth.0.did 1155 ) 1156 .fetch_optional(&state.db) 1157 .await; 1158 1159 match result { 1160 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1161 allow_legacy_login: row.allow_legacy_login, 1162 has_mfa: row.has_mfa, 1163 }) 1164 .into_response(), 1165 Ok(None) => ApiError::AccountNotFound.into_response(), 1166 Err(e) => { 1167 error!("DB error: {:?}", e); 1168 ApiError::InternalError(None).into_response() 1169 } 1170 } 1171} 1172 1173#[derive(Deserialize)] 1174#[serde(rename_all = "camelCase")] 1175pub struct UpdateLegacyLoginInput { 1176 pub allow_legacy_login: bool, 1177} 1178 1179pub async fn update_legacy_login_preference( 1180 State(state): State<AppState>, 1181 auth: BearerAuth, 1182 Json(input): Json<UpdateLegacyLoginInput>, 1183) -> Response { 1184 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1185 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1186 .await; 1187 } 1188 1189 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1190 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1191 } 1192 1193 let result = sqlx::query!( 1194 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1195 input.allow_legacy_login, 1196 &auth.0.did 1197 ) 1198 .fetch_optional(&state.db) 1199 .await; 1200 1201 match result { 1202 Ok(Some(_)) => { 1203 info!( 1204 did = %&auth.0.did, 1205 allow_legacy_login = input.allow_legacy_login, 1206 "Legacy login preference updated" 1207 ); 1208 Json(json!({ 1209 "allowLegacyLogin": input.allow_legacy_login 1210 })) 1211 .into_response() 1212 } 1213 Ok(None) => ApiError::AccountNotFound.into_response(), 1214 Err(e) => { 1215 error!("DB error: {:?}", e); 1216 ApiError::InternalError(None).into_response() 1217 } 1218 } 1219} 1220 1221use crate::comms::VALID_LOCALES; 1222 1223#[derive(Deserialize)] 1224#[serde(rename_all = "camelCase")] 1225pub struct UpdateLocaleInput { 1226 pub preferred_locale: String, 1227} 1228 1229pub async fn update_locale( 1230 State(state): State<AppState>, 1231 auth: BearerAuth, 1232 Json(input): Json<UpdateLocaleInput>, 1233) -> Response { 1234 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1235 return ApiError::InvalidRequest(format!( 1236 "Invalid locale. Valid options: {}", 1237 VALID_LOCALES.join(", ") 1238 )) 1239 .into_response(); 1240 } 1241 1242 let result = sqlx::query!( 1243 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1244 input.preferred_locale, 1245 &auth.0.did 1246 ) 1247 .fetch_optional(&state.db) 1248 .await; 1249 1250 match result { 1251 Ok(Some(_)) => { 1252 info!( 1253 did = %&auth.0.did, 1254 locale = %input.preferred_locale, 1255 "User locale preference updated" 1256 ); 1257 Json(json!({ 1258 "preferredLocale": input.preferred_locale 1259 })) 1260 .into_response() 1261 } 1262 Ok(None) => ApiError::AccountNotFound.into_response(), 1263 Err(e) => { 1264 error!("DB error updating locale: {:?}", e); 1265 ApiError::InternalError(None).into_response() 1266 } 1267 } 1268}