this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, info, warn}; 14 15fn extract_client_ip(headers: &HeaderMap) -> String { 16 if let Some(forwarded) = headers.get("x-forwarded-for") 17 && let Ok(value) = forwarded.to_str() 18 && let Some(first_ip) = value.split(',').next() 19 { 20 return first_ip.trim().to_string(); 21 } 22 if let Some(real_ip) = headers.get("x-real-ip") 23 && let Ok(value) = real_ip.to_str() 24 { 25 return value.trim().to_string(); 26 } 27 "unknown".to_string() 28} 29 30fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 31 let identifier = identifier.trim(); 32 if identifier.contains('@') || identifier.starts_with("did:") { 33 identifier.to_string() 34 } else if !identifier.contains('.') { 35 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 36 } else { 37 identifier.to_lowercase() 38 } 39} 40 41fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 42 stored_handle.to_string() 43} 44 45#[derive(Deserialize)] 46#[serde(rename_all = "camelCase")] 47pub struct CreateSessionInput { 48 pub identifier: String, 49 pub password: String, 50 #[serde(default)] 51 pub allow_takendown: bool, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateSessionOutput { 57 pub access_jwt: String, 58 pub refresh_jwt: String, 59 pub handle: String, 60 pub did: String, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub did_doc: Option<serde_json::Value>, 63 #[serde(skip_serializing_if = "Option::is_none")] 64 pub email: Option<String>, 65 #[serde(skip_serializing_if = "Option::is_none")] 66 pub email_confirmed: Option<bool>, 67 #[serde(skip_serializing_if = "Option::is_none")] 68 pub active: Option<bool>, 69 #[serde(skip_serializing_if = "Option::is_none")] 70 pub status: Option<String>, 71} 72 73pub async fn create_session( 74 State(state): State<AppState>, 75 headers: HeaderMap, 76 Json(input): Json<CreateSessionInput>, 77) -> Response { 78 info!( 79 "create_session called with identifier: {}", 80 input.identifier 81 ); 82 let client_ip = extract_client_ip(&headers); 83 if !state 84 .check_rate_limit(RateLimitKind::Login, &client_ip) 85 .await 86 { 87 warn!(ip = %client_ip, "Login rate limit exceeded"); 88 return ( 89 StatusCode::TOO_MANY_REQUESTS, 90 Json(json!({ 91 "error": "RateLimitExceeded", 92 "message": "Too many login attempts. Please try again later." 93 })), 94 ) 95 .into_response(); 96 } 97 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 98 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 99 info!( 100 "Normalized identifier: {} -> {}", 101 input.identifier, normalized_identifier 102 ); 103 let row = match sqlx::query!( 104 r#"SELECT 105 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref, 106 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 107 u.allow_legacy_login, u.migrated_to_pds, 108 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel", 109 k.key_bytes, k.encryption_version, 110 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled 111 FROM users u 112 JOIN user_keys k ON u.id = k.user_id 113 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 114 normalized_identifier 115 ) 116 .fetch_optional(&state.db) 117 .await 118 { 119 Ok(Some(row)) => row, 120 Ok(None) => { 121 let _ = verify( 122 &input.password, 123 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 124 ); 125 warn!("User not found for login attempt"); 126 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 127 .into_response(); 128 } 129 Err(e) => { 130 error!("Database error fetching user: {:?}", e); 131 return ApiError::InternalError.into_response(); 132 } 133 }; 134 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 135 Ok(k) => k, 136 Err(e) => { 137 error!("Failed to decrypt user key: {:?}", e); 138 return ApiError::InternalError.into_response(); 139 } 140 }; 141 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row 142 .password_hash 143 .as_ref() 144 .map(|h| verify(&input.password, h).unwrap_or(false)) 145 .unwrap_or(false) 146 { 147 (true, None, None, None) 148 } else { 149 let app_passwords = sqlx::query!( 150 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 151 row.id 152 ) 153 .fetch_all(&state.db) 154 .await 155 .unwrap_or_default(); 156 let matched = app_passwords 157 .iter() 158 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false)); 159 match matched { 160 Some(app) => ( 161 true, 162 Some(app.name.clone()), 163 app.scopes.clone(), 164 app.created_by_controller_did.clone(), 165 ), 166 None => (false, None, None, None), 167 } 168 }; 169 if !password_valid { 170 warn!("Password verification failed for login attempt"); 171 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 172 .into_response(); 173 } 174 let is_takendown = row.takedown_ref.is_some(); 175 if is_takendown && !input.allow_takendown { 176 warn!("Login attempt for takendown account: {}", row.did); 177 return ( 178 StatusCode::UNAUTHORIZED, 179 Json(json!({ 180 "error": "AccountTakedown", 181 "message": "Account has been taken down" 182 })), 183 ) 184 .into_response(); 185 } 186 let is_verified = 187 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 188 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did) 189 .await 190 .unwrap_or(false); 191 if !is_verified && !is_delegated { 192 warn!("Login attempt for unverified account: {}", row.did); 193 return ( 194 StatusCode::FORBIDDEN, 195 Json(json!({ 196 "error": "AccountNotVerified", 197 "message": "Please verify your account before logging in", 198 "did": row.did 199 })), 200 ) 201 .into_response(); 202 } 203 let has_totp = row.totp_enabled.unwrap_or(false); 204 let is_legacy_login = has_totp; 205 if has_totp && !row.allow_legacy_login { 206 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did); 207 return ( 208 StatusCode::FORBIDDEN, 209 Json(json!({ 210 "error": "MfaRequired", 211 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.", 212 "did": row.did 213 })), 214 ) 215 .into_response(); 216 } 217 let access_meta = match crate::auth::create_access_token_with_delegation( 218 &row.did, 219 &key_bytes, 220 app_password_scopes.as_deref(), 221 app_password_controller.as_deref(), 222 ) { 223 Ok(m) => m, 224 Err(e) => { 225 error!("Failed to create access token: {:?}", e); 226 return ApiError::InternalError.into_response(); 227 } 228 }; 229 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 230 Ok(m) => m, 231 Err(e) => { 232 error!("Failed to create refresh token: {:?}", e); 233 return ApiError::InternalError.into_response(); 234 } 235 }; 236 let did_for_doc = row.did.clone(); 237 let did_resolver = state.did_resolver.clone(); 238 let (insert_result, did_doc) = tokio::join!( 239 sqlx::query!( 240 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", 241 row.did, 242 access_meta.jti, 243 refresh_meta.jti, 244 access_meta.expires_at, 245 refresh_meta.expires_at, 246 is_legacy_login, 247 false, 248 app_password_scopes, 249 app_password_controller, 250 app_password_name 251 ) 252 .execute(&state.db), 253 did_resolver.resolve_did_document(&did_for_doc) 254 ); 255 if let Err(e) = insert_result { 256 error!("Failed to insert session: {:?}", e); 257 return ApiError::InternalError.into_response(); 258 } 259 if is_legacy_login { 260 warn!( 261 did = %row.did, 262 ip = %client_ip, 263 "Legacy login on TOTP-enabled account - sending notification" 264 ); 265 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 266 if let Err(e) = crate::comms::queue_legacy_login_notification( 267 &state.db, 268 row.id, 269 &hostname, 270 &client_ip, 271 row.preferred_comms_channel, 272 ) 273 .await 274 { 275 error!("Failed to queue legacy login notification: {:?}", e); 276 } 277 } 278 let handle = full_handle(&row.handle, &pds_hostname); 279 let is_migrated = row.deactivated_at.is_some() && row.migrated_to_pds.is_some(); 280 let is_active = row.deactivated_at.is_none() && !is_takendown; 281 let status = if is_takendown { 282 Some("takendown".to_string()) 283 } else if is_migrated { 284 Some("migrated".to_string()) 285 } else if row.deactivated_at.is_some() { 286 Some("deactivated".to_string()) 287 } else { 288 None 289 }; 290 Json(CreateSessionOutput { 291 access_jwt: access_meta.token, 292 refresh_jwt: refresh_meta.token, 293 handle, 294 did: row.did, 295 did_doc, 296 email: row.email, 297 email_confirmed: Some(row.email_verified), 298 active: Some(is_active), 299 status, 300 }) 301 .into_response() 302} 303 304pub async fn get_session( 305 State(state): State<AppState>, 306 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 307) -> Response { 308 let permissions = auth_user.permissions(); 309 let can_read_email = permissions.allows_email_read(); 310 311 let did_for_doc = auth_user.did.clone(); 312 let did_resolver = state.did_resolver.clone(); 313 let (db_result, did_doc) = tokio::join!( 314 sqlx::query!( 315 r#"SELECT 316 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale, 317 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 318 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at 319 FROM users WHERE did = $1"#, 320 auth_user.did 321 ) 322 .fetch_optional(&state.db), 323 did_resolver.resolve_did_document(&did_for_doc) 324 ); 325 match db_result { 326 Ok(Some(row)) => { 327 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 328 crate::comms::CommsChannel::Email => ("email", row.email_verified), 329 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 330 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 331 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 332 }; 333 let pds_hostname = 334 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 335 let handle = full_handle(&row.handle, &pds_hostname); 336 let is_takendown = row.takedown_ref.is_some(); 337 let is_migrated = 338 row.deactivated_at.is_some() && row.migrated_to_pds.is_some(); 339 let is_active = row.deactivated_at.is_none() && !is_takendown; 340 let email_value = if can_read_email { 341 row.email.clone() 342 } else { 343 None 344 }; 345 let email_confirmed_value = can_read_email && row.email_verified; 346 let mut response = json!({ 347 "handle": handle, 348 "did": auth_user.did, 349 "active": is_active, 350 "preferredChannel": preferred_channel, 351 "preferredChannelVerified": preferred_channel_verified, 352 "preferredLocale": row.preferred_locale, 353 "isAdmin": row.is_admin 354 }); 355 if can_read_email { 356 response["email"] = json!(email_value); 357 response["emailConfirmed"] = json!(email_confirmed_value); 358 } 359 if is_takendown { 360 response["status"] = json!("takendown"); 361 } else if is_migrated { 362 response["status"] = json!("migrated"); 363 response["migratedToPds"] = json!(row.migrated_to_pds); 364 response["migratedAt"] = json!(row.migrated_at); 365 } else if row.deactivated_at.is_some() { 366 response["status"] = json!("deactivated"); 367 } 368 if let Some(doc) = did_doc { 369 response["didDoc"] = doc; 370 } 371 Json(response) 372 .into_response() 373 } 374 Ok(None) => ApiError::AuthenticationFailed.into_response(), 375 Err(e) => { 376 error!("Database error in get_session: {:?}", e); 377 ApiError::InternalError.into_response() 378 } 379 } 380} 381 382pub async fn delete_session( 383 State(state): State<AppState>, 384 headers: axum::http::HeaderMap, 385) -> Response { 386 let token = match crate::auth::extract_bearer_token_from_header( 387 headers.get("Authorization").and_then(|h| h.to_str().ok()), 388 ) { 389 Some(t) => t, 390 None => return ApiError::AuthenticationRequired.into_response(), 391 }; 392 let jti = match crate::auth::get_jti_from_token(&token) { 393 Ok(jti) => jti, 394 Err(_) => return ApiError::AuthenticationFailed.into_response(), 395 }; 396 let did = crate::auth::get_did_from_token(&token).ok(); 397 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 398 .execute(&state.db) 399 .await 400 { 401 Ok(res) if res.rows_affected() > 0 => { 402 if let Some(did) = did { 403 let session_cache_key = format!("auth:session:{}:{}", did, jti); 404 let _ = state.cache.delete(&session_cache_key).await; 405 } 406 Json(json!({})).into_response() 407 } 408 Ok(_) => ApiError::AuthenticationFailed.into_response(), 409 Err(e) => { 410 error!("Database error in delete_session: {:?}", e); 411 ApiError::AuthenticationFailed.into_response() 412 } 413 } 414} 415 416pub async fn refresh_session( 417 State(state): State<AppState>, 418 headers: axum::http::HeaderMap, 419) -> Response { 420 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 421 if !state 422 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 423 .await 424 { 425 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 426 return ( 427 axum::http::StatusCode::TOO_MANY_REQUESTS, 428 axum::Json(serde_json::json!({ 429 "error": "RateLimitExceeded", 430 "message": "Too many requests. Please try again later." 431 })), 432 ) 433 .into_response(); 434 } 435 let refresh_token = match crate::auth::extract_bearer_token_from_header( 436 headers.get("Authorization").and_then(|h| h.to_str().ok()), 437 ) { 438 Some(t) => t, 439 None => return ApiError::AuthenticationRequired.into_response(), 440 }; 441 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 442 Ok(jti) => jti, 443 Err(_) => { 444 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 445 .into_response(); 446 } 447 }; 448 let mut tx = match state.db.begin().await { 449 Ok(tx) => tx, 450 Err(e) => { 451 error!("Failed to begin transaction: {:?}", e); 452 return ApiError::InternalError.into_response(); 453 } 454 }; 455 if let Ok(Some(session_id)) = sqlx::query_scalar!( 456 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 457 refresh_jti 458 ) 459 .fetch_optional(&mut *tx) 460 .await 461 { 462 warn!( 463 "Refresh token reuse detected! Revoking token family for session_id: {}", 464 session_id 465 ); 466 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 467 .execute(&mut *tx) 468 .await; 469 let _ = tx.commit().await; 470 return ApiError::ExpiredTokenMsg( 471 "Refresh token has been revoked due to suspected compromise".into(), 472 ) 473 .into_response(); 474 } 475 let session_row = match sqlx::query!( 476 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version 477 FROM session_tokens st 478 JOIN users u ON st.did = u.did 479 JOIN user_keys k ON u.id = k.user_id 480 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 481 FOR UPDATE OF st"#, 482 refresh_jti 483 ) 484 .fetch_optional(&mut *tx) 485 .await 486 { 487 Ok(Some(row)) => row, 488 Ok(None) => { 489 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 490 .into_response(); 491 } 492 Err(e) => { 493 error!("Database error fetching session: {:?}", e); 494 return ApiError::InternalError.into_response(); 495 } 496 }; 497 let key_bytes = 498 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 499 Ok(k) => k, 500 Err(e) => { 501 error!("Failed to decrypt user key: {:?}", e); 502 return ApiError::InternalError.into_response(); 503 } 504 }; 505 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 506 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 507 } 508 let new_access_meta = match crate::auth::create_access_token_with_delegation( 509 &session_row.did, 510 &key_bytes, 511 session_row.scope.as_deref(), 512 session_row.controller_did.as_deref(), 513 ) { 514 Ok(m) => m, 515 Err(e) => { 516 error!("Failed to create access token: {:?}", e); 517 return ApiError::InternalError.into_response(); 518 } 519 }; 520 let new_refresh_meta = 521 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 522 Ok(m) => m, 523 Err(e) => { 524 error!("Failed to create refresh token: {:?}", e); 525 return ApiError::InternalError.into_response(); 526 } 527 }; 528 match sqlx::query!( 529 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 530 refresh_jti, 531 session_row.id 532 ) 533 .execute(&mut *tx) 534 .await 535 { 536 Ok(result) if result.rows_affected() == 0 => { 537 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 538 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 539 .execute(&mut *tx) 540 .await; 541 let _ = tx.commit().await; 542 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 543 } 544 Err(e) => { 545 error!("Failed to record used refresh token: {:?}", e); 546 return ApiError::InternalError.into_response(); 547 } 548 Ok(_) => {} 549 } 550 if let Err(e) = sqlx::query!( 551 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 552 new_access_meta.jti, 553 new_refresh_meta.jti, 554 new_access_meta.expires_at, 555 new_refresh_meta.expires_at, 556 session_row.id 557 ) 558 .execute(&mut *tx) 559 .await 560 { 561 error!("Database error updating session: {:?}", e); 562 return ApiError::InternalError.into_response(); 563 } 564 if let Err(e) = tx.commit().await { 565 error!("Failed to commit transaction: {:?}", e); 566 return ApiError::InternalError.into_response(); 567 } 568 let did_for_doc = session_row.did.clone(); 569 let did_resolver = state.did_resolver.clone(); 570 let (db_result, did_doc) = tokio::join!( 571 sqlx::query!( 572 r#"SELECT 573 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref, 574 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 575 discord_verified, telegram_verified, signal_verified 576 FROM users WHERE did = $1"#, 577 session_row.did 578 ) 579 .fetch_optional(&state.db), 580 did_resolver.resolve_did_document(&did_for_doc) 581 ); 582 match db_result { 583 Ok(Some(u)) => { 584 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 585 crate::comms::CommsChannel::Email => ("email", u.email_verified), 586 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 587 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 588 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 589 }; 590 let pds_hostname = 591 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 592 let handle = full_handle(&u.handle, &pds_hostname); 593 let is_takendown = u.takedown_ref.is_some(); 594 let is_active = u.deactivated_at.is_none() && !is_takendown; 595 let mut response = json!({ 596 "accessJwt": new_access_meta.token, 597 "refreshJwt": new_refresh_meta.token, 598 "handle": handle, 599 "did": session_row.did, 600 "email": u.email, 601 "emailConfirmed": u.email_verified, 602 "preferredChannel": preferred_channel, 603 "preferredChannelVerified": preferred_channel_verified, 604 "preferredLocale": u.preferred_locale, 605 "isAdmin": u.is_admin, 606 "active": is_active 607 }); 608 if let Some(doc) = did_doc { 609 response["didDoc"] = doc; 610 } 611 if is_takendown { 612 response["status"] = json!("takendown"); 613 } else if u.deactivated_at.is_some() { 614 response["status"] = json!("deactivated"); 615 } 616 Json(response) 617 .into_response() 618 } 619 Ok(None) => { 620 error!("User not found for existing session: {}", session_row.did); 621 ApiError::InternalError.into_response() 622 } 623 Err(e) => { 624 error!("Database error fetching user: {:?}", e); 625 ApiError::InternalError.into_response() 626 } 627 } 628} 629 630#[derive(Deserialize)] 631#[serde(rename_all = "camelCase")] 632pub struct ConfirmSignupInput { 633 pub did: String, 634 pub verification_code: String, 635} 636 637#[derive(Serialize)] 638#[serde(rename_all = "camelCase")] 639pub struct ConfirmSignupOutput { 640 pub access_jwt: String, 641 pub refresh_jwt: String, 642 pub handle: String, 643 pub did: String, 644 pub email: Option<String>, 645 pub email_verified: bool, 646 pub preferred_channel: String, 647 pub preferred_channel_verified: bool, 648} 649 650pub async fn confirm_signup( 651 State(state): State<AppState>, 652 Json(input): Json<ConfirmSignupInput>, 653) -> Response { 654 info!("confirm_signup called for DID: {}", input.did); 655 let row = match sqlx::query!( 656 r#"SELECT 657 u.id, u.did, u.handle, u.email, 658 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 659 u.discord_id, u.telegram_username, u.signal_number, 660 k.key_bytes, k.encryption_version 661 FROM users u 662 JOIN user_keys k ON u.id = k.user_id 663 WHERE u.did = $1"#, 664 input.did 665 ) 666 .fetch_optional(&state.db) 667 .await 668 { 669 Ok(Some(row)) => row, 670 Ok(None) => { 671 warn!("User not found for confirm_signup: {}", input.did); 672 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 673 .into_response(); 674 } 675 Err(e) => { 676 error!("Database error in confirm_signup: {:?}", e); 677 return ApiError::InternalError.into_response(); 678 } 679 }; 680 681 let (channel_str, identifier) = match row.channel { 682 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 683 crate::comms::CommsChannel::Discord => { 684 ("discord", row.discord_id.clone().unwrap_or_default()) 685 } 686 crate::comms::CommsChannel::Telegram => ( 687 "telegram", 688 row.telegram_username.clone().unwrap_or_default(), 689 ), 690 crate::comms::CommsChannel::Signal => { 691 ("signal", row.signal_number.clone().unwrap_or_default()) 692 } 693 }; 694 695 let normalized_token = 696 crate::auth::verification_token::normalize_token_input(&input.verification_code); 697 match crate::auth::verification_token::verify_signup_token( 698 &normalized_token, 699 channel_str, 700 &identifier, 701 ) { 702 Ok(token_data) => { 703 if token_data.did != input.did { 704 warn!( 705 "Token DID mismatch for confirm_signup: expected {}, got {}", 706 input.did, token_data.did 707 ); 708 return ApiError::InvalidRequest("Invalid verification code".into()) 709 .into_response(); 710 } 711 } 712 Err(crate::auth::verification_token::VerifyError::Expired) => { 713 warn!("Verification code expired for user: {}", input.did); 714 return ApiError::ExpiredTokenMsg("Verification code has expired".into()) 715 .into_response(); 716 } 717 Err(e) => { 718 warn!("Invalid verification code for user {}: {:?}", input.did, e); 719 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 720 } 721 } 722 723 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 724 Ok(k) => k, 725 Err(e) => { 726 error!("Failed to decrypt user key: {:?}", e); 727 return ApiError::InternalError.into_response(); 728 } 729 }; 730 let verified_column = match row.channel { 731 crate::comms::CommsChannel::Email => "email_verified", 732 crate::comms::CommsChannel::Discord => "discord_verified", 733 crate::comms::CommsChannel::Telegram => "telegram_verified", 734 crate::comms::CommsChannel::Signal => "signal_verified", 735 }; 736 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 737 if let Err(e) = sqlx::query(&update_query) 738 .bind(&input.did) 739 .execute(&state.db) 740 .await 741 { 742 error!("Failed to update verification status: {:?}", e); 743 return ApiError::InternalError.into_response(); 744 } 745 746 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 747 Ok(m) => m, 748 Err(e) => { 749 error!("Failed to create access token: {:?}", e); 750 return ApiError::InternalError.into_response(); 751 } 752 }; 753 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 754 Ok(m) => m, 755 Err(e) => { 756 error!("Failed to create refresh token: {:?}", e); 757 return ApiError::InternalError.into_response(); 758 } 759 }; 760 let no_scope: Option<String> = None; 761 if let Err(e) = sqlx::query!( 762 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 763 row.did, 764 access_meta.jti, 765 refresh_meta.jti, 766 access_meta.expires_at, 767 refresh_meta.expires_at, 768 false, 769 false, 770 no_scope 771 ) 772 .execute(&state.db) 773 .await 774 { 775 error!("Failed to insert session: {:?}", e); 776 return ApiError::InternalError.into_response(); 777 } 778 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 779 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 780 warn!("Failed to enqueue welcome notification: {:?}", e); 781 } 782 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 783 let preferred_channel = match row.channel { 784 crate::comms::CommsChannel::Email => "email", 785 crate::comms::CommsChannel::Discord => "discord", 786 crate::comms::CommsChannel::Telegram => "telegram", 787 crate::comms::CommsChannel::Signal => "signal", 788 }; 789 Json(ConfirmSignupOutput { 790 access_jwt: access_meta.token, 791 refresh_jwt: refresh_meta.token, 792 handle: row.handle, 793 did: row.did, 794 email: row.email, 795 email_verified, 796 preferred_channel: preferred_channel.to_string(), 797 preferred_channel_verified: true, 798 }) 799 .into_response() 800} 801 802#[derive(Deserialize)] 803#[serde(rename_all = "camelCase")] 804pub struct ResendVerificationInput { 805 pub did: String, 806} 807 808pub async fn resend_verification( 809 State(state): State<AppState>, 810 Json(input): Json<ResendVerificationInput>, 811) -> Response { 812 info!("resend_verification called for DID: {}", input.did); 813 let row = match sqlx::query!( 814 r#"SELECT 815 id, handle, email, 816 preferred_comms_channel as "channel: crate::comms::CommsChannel", 817 discord_id, telegram_username, signal_number, 818 email_verified, discord_verified, telegram_verified, signal_verified 819 FROM users 820 WHERE did = $1"#, 821 input.did 822 ) 823 .fetch_optional(&state.db) 824 .await 825 { 826 Ok(Some(row)) => row, 827 Ok(None) => { 828 return ApiError::InvalidRequest("User not found".into()).into_response(); 829 } 830 Err(e) => { 831 error!("Database error in resend_verification: {:?}", e); 832 return ApiError::InternalError.into_response(); 833 } 834 }; 835 let is_verified = 836 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 837 if is_verified { 838 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 839 } 840 841 let (channel_str, recipient) = match row.channel { 842 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 843 crate::comms::CommsChannel::Discord => { 844 ("discord", row.discord_id.clone().unwrap_or_default()) 845 } 846 crate::comms::CommsChannel::Telegram => ( 847 "telegram", 848 row.telegram_username.clone().unwrap_or_default(), 849 ), 850 crate::comms::CommsChannel::Signal => { 851 ("signal", row.signal_number.clone().unwrap_or_default()) 852 } 853 }; 854 855 let verification_token = 856 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient); 857 let formatted_token = 858 crate::auth::verification_token::format_token_for_display(&verification_token); 859 860 if let Err(e) = crate::comms::enqueue_signup_verification( 861 &state.db, 862 row.id, 863 channel_str, 864 &recipient, 865 &formatted_token, 866 None, 867 ) 868 .await 869 { 870 warn!("Failed to enqueue verification notification: {:?}", e); 871 } 872 Json(json!({"success": true})).into_response() 873} 874 875#[derive(Serialize)] 876#[serde(rename_all = "camelCase")] 877pub struct SessionInfo { 878 pub id: String, 879 pub session_type: String, 880 pub client_name: Option<String>, 881 pub created_at: String, 882 pub expires_at: String, 883 pub is_current: bool, 884} 885 886#[derive(Serialize)] 887#[serde(rename_all = "camelCase")] 888pub struct ListSessionsOutput { 889 pub sessions: Vec<SessionInfo>, 890} 891 892pub async fn list_sessions( 893 State(state): State<AppState>, 894 headers: HeaderMap, 895 auth: BearerAuth, 896) -> Response { 897 let current_jti = headers 898 .get("authorization") 899 .and_then(|v| v.to_str().ok()) 900 .and_then(|v| v.strip_prefix("Bearer ")) 901 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 902 903 let mut sessions: Vec<SessionInfo> = Vec::new(); 904 905 let jwt_result = sqlx::query_as::< 906 _, 907 ( 908 i32, 909 String, 910 chrono::DateTime<chrono::Utc>, 911 chrono::DateTime<chrono::Utc>, 912 ), 913 >( 914 r#" 915 SELECT id, access_jti, created_at, refresh_expires_at 916 FROM session_tokens 917 WHERE did = $1 AND refresh_expires_at > NOW() 918 ORDER BY created_at DESC 919 "#, 920 ) 921 .bind(&auth.0.did) 922 .fetch_all(&state.db) 923 .await; 924 925 match jwt_result { 926 Ok(rows) => { 927 for (id, access_jti, created_at, expires_at) in rows { 928 sessions.push(SessionInfo { 929 id: format!("jwt:{}", id), 930 session_type: "legacy".to_string(), 931 client_name: None, 932 created_at: created_at.to_rfc3339(), 933 expires_at: expires_at.to_rfc3339(), 934 is_current: current_jti.as_ref() == Some(&access_jti), 935 }); 936 } 937 } 938 Err(e) => { 939 error!("DB error fetching JWT sessions: {:?}", e); 940 return ( 941 StatusCode::INTERNAL_SERVER_ERROR, 942 Json(json!({"error": "InternalError"})), 943 ) 944 .into_response(); 945 } 946 } 947 948 let oauth_result = sqlx::query_as::< 949 _, 950 ( 951 i32, 952 String, 953 chrono::DateTime<chrono::Utc>, 954 chrono::DateTime<chrono::Utc>, 955 String, 956 ), 957 >( 958 r#" 959 SELECT id, token_id, created_at, expires_at, client_id 960 FROM oauth_token 961 WHERE did = $1 AND expires_at > NOW() 962 ORDER BY created_at DESC 963 "#, 964 ) 965 .bind(&auth.0.did) 966 .fetch_all(&state.db) 967 .await; 968 969 match oauth_result { 970 Ok(rows) => { 971 for (id, token_id, created_at, expires_at, client_id) in rows { 972 let client_name = extract_client_name(&client_id); 973 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 974 sessions.push(SessionInfo { 975 id: format!("oauth:{}", id), 976 session_type: "oauth".to_string(), 977 client_name: Some(client_name), 978 created_at: created_at.to_rfc3339(), 979 expires_at: expires_at.to_rfc3339(), 980 is_current: is_current_oauth, 981 }); 982 } 983 } 984 Err(e) => { 985 error!("DB error fetching OAuth sessions: {:?}", e); 986 return ( 987 StatusCode::INTERNAL_SERVER_ERROR, 988 Json(json!({"error": "InternalError"})), 989 ) 990 .into_response(); 991 } 992 } 993 994 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 995 996 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 997} 998 999fn extract_client_name(client_id: &str) -> String { 1000 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") { 1001 "Localhost App".to_string() 1002 } else if let Ok(parsed) = reqwest::Url::parse(client_id) { 1003 parsed.host_str().unwrap_or("Unknown App").to_string() 1004 } else { 1005 client_id.to_string() 1006 } 1007} 1008 1009#[derive(Deserialize)] 1010#[serde(rename_all = "camelCase")] 1011pub struct RevokeSessionInput { 1012 pub session_id: String, 1013} 1014 1015pub async fn revoke_session( 1016 State(state): State<AppState>, 1017 auth: BearerAuth, 1018 Json(input): Json<RevokeSessionInput>, 1019) -> Response { 1020 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") { 1021 let session_id: i32 = match jwt_id.parse() { 1022 Ok(id) => id, 1023 Err(_) => { 1024 return ( 1025 StatusCode::BAD_REQUEST, 1026 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 1027 ) 1028 .into_response(); 1029 } 1030 }; 1031 let session = sqlx::query_as::<_, (String,)>( 1032 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 1033 ) 1034 .bind(session_id) 1035 .bind(&auth.0.did) 1036 .fetch_optional(&state.db) 1037 .await; 1038 let access_jti = match session { 1039 Ok(Some((jti,))) => jti, 1040 Ok(None) => { 1041 return ( 1042 StatusCode::NOT_FOUND, 1043 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 1044 ) 1045 .into_response(); 1046 } 1047 Err(e) => { 1048 error!("DB error in revoke_session: {:?}", e); 1049 return ( 1050 StatusCode::INTERNAL_SERVER_ERROR, 1051 Json(json!({"error": "InternalError"})), 1052 ) 1053 .into_response(); 1054 } 1055 }; 1056 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 1057 .bind(session_id) 1058 .execute(&state.db) 1059 .await 1060 { 1061 error!("DB error deleting session: {:?}", e); 1062 return ( 1063 StatusCode::INTERNAL_SERVER_ERROR, 1064 Json(json!({"error": "InternalError"})), 1065 ) 1066 .into_response(); 1067 } 1068 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 1069 if let Err(e) = state.cache.delete(&cache_key).await { 1070 warn!("Failed to invalidate session cache: {:?}", e); 1071 } 1072 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked"); 1073 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") { 1074 let session_id: i32 = match oauth_id.parse() { 1075 Ok(id) => id, 1076 Err(_) => { 1077 return ( 1078 StatusCode::BAD_REQUEST, 1079 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 1080 ) 1081 .into_response(); 1082 } 1083 }; 1084 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2") 1085 .bind(session_id) 1086 .bind(&auth.0.did) 1087 .execute(&state.db) 1088 .await; 1089 match result { 1090 Ok(r) if r.rows_affected() == 0 => { 1091 return ( 1092 StatusCode::NOT_FOUND, 1093 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 1094 ) 1095 .into_response(); 1096 } 1097 Err(e) => { 1098 error!("DB error deleting OAuth session: {:?}", e); 1099 return ( 1100 StatusCode::INTERNAL_SERVER_ERROR, 1101 Json(json!({"error": "InternalError"})), 1102 ) 1103 .into_response(); 1104 } 1105 _ => {} 1106 } 1107 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked"); 1108 } else { 1109 return ( 1110 StatusCode::BAD_REQUEST, 1111 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})), 1112 ) 1113 .into_response(); 1114 } 1115 (StatusCode::OK, Json(json!({}))).into_response() 1116} 1117 1118pub async fn revoke_all_sessions( 1119 State(state): State<AppState>, 1120 headers: HeaderMap, 1121 auth: BearerAuth, 1122) -> Response { 1123 let current_jti = headers 1124 .get("authorization") 1125 .and_then(|v| v.to_str().ok()) 1126 .and_then(|v| v.strip_prefix("Bearer ")) 1127 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 1128 1129 if let Some(ref jti) = current_jti { 1130 if auth.0.is_oauth { 1131 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1132 .bind(&auth.0.did) 1133 .execute(&state.db) 1134 .await 1135 { 1136 error!("DB error revoking JWT sessions: {:?}", e); 1137 return ( 1138 StatusCode::INTERNAL_SERVER_ERROR, 1139 Json(json!({"error": "InternalError"})), 1140 ) 1141 .into_response(); 1142 } 1143 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1144 .bind(&auth.0.did) 1145 .bind(jti) 1146 .execute(&state.db) 1147 .await 1148 { 1149 error!("DB error revoking OAuth sessions: {:?}", e); 1150 return ( 1151 StatusCode::INTERNAL_SERVER_ERROR, 1152 Json(json!({"error": "InternalError"})), 1153 ) 1154 .into_response(); 1155 } 1156 } else { 1157 if let Err(e) = 1158 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1159 .bind(&auth.0.did) 1160 .bind(jti) 1161 .execute(&state.db) 1162 .await 1163 { 1164 error!("DB error revoking JWT sessions: {:?}", e); 1165 return ( 1166 StatusCode::INTERNAL_SERVER_ERROR, 1167 Json(json!({"error": "InternalError"})), 1168 ) 1169 .into_response(); 1170 } 1171 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1172 .bind(&auth.0.did) 1173 .execute(&state.db) 1174 .await 1175 { 1176 error!("DB error revoking OAuth sessions: {:?}", e); 1177 return ( 1178 StatusCode::INTERNAL_SERVER_ERROR, 1179 Json(json!({"error": "InternalError"})), 1180 ) 1181 .into_response(); 1182 } 1183 } 1184 } else { 1185 return ( 1186 StatusCode::BAD_REQUEST, 1187 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})), 1188 ) 1189 .into_response(); 1190 } 1191 1192 info!(did = %auth.0.did, "All other sessions revoked"); 1193 (StatusCode::OK, Json(json!({"success": true}))).into_response() 1194} 1195 1196#[derive(Serialize)] 1197#[serde(rename_all = "camelCase")] 1198pub struct LegacyLoginPreferenceOutput { 1199 pub allow_legacy_login: bool, 1200 pub has_mfa: bool, 1201} 1202 1203pub async fn get_legacy_login_preference( 1204 State(state): State<AppState>, 1205 auth: BearerAuth, 1206) -> Response { 1207 let result = sqlx::query!( 1208 r#"SELECT 1209 u.allow_legacy_login, 1210 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR 1211 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!" 1212 FROM users u WHERE u.did = $1"#, 1213 auth.0.did 1214 ) 1215 .fetch_optional(&state.db) 1216 .await; 1217 1218 match result { 1219 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput { 1220 allow_legacy_login: row.allow_legacy_login, 1221 has_mfa: row.has_mfa, 1222 }) 1223 .into_response(), 1224 Ok(None) => ( 1225 StatusCode::NOT_FOUND, 1226 Json(json!({"error": "AccountNotFound"})), 1227 ) 1228 .into_response(), 1229 Err(e) => { 1230 error!("DB error: {:?}", e); 1231 ( 1232 StatusCode::INTERNAL_SERVER_ERROR, 1233 Json(json!({"error": "InternalError"})), 1234 ) 1235 .into_response() 1236 } 1237 } 1238} 1239 1240#[derive(Deserialize)] 1241#[serde(rename_all = "camelCase")] 1242pub struct UpdateLegacyLoginInput { 1243 pub allow_legacy_login: bool, 1244} 1245 1246pub async fn update_legacy_login_preference( 1247 State(state): State<AppState>, 1248 auth: BearerAuth, 1249 Json(input): Json<UpdateLegacyLoginInput>, 1250) -> Response { 1251 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await { 1252 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did) 1253 .await; 1254 } 1255 1256 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await { 1257 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await; 1258 } 1259 1260 let result = sqlx::query!( 1261 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did", 1262 input.allow_legacy_login, 1263 auth.0.did 1264 ) 1265 .fetch_optional(&state.db) 1266 .await; 1267 1268 match result { 1269 Ok(Some(_)) => { 1270 info!( 1271 did = %auth.0.did, 1272 allow_legacy_login = input.allow_legacy_login, 1273 "Legacy login preference updated" 1274 ); 1275 Json(json!({ 1276 "allowLegacyLogin": input.allow_legacy_login 1277 })) 1278 .into_response() 1279 } 1280 Ok(None) => ( 1281 StatusCode::NOT_FOUND, 1282 Json(json!({"error": "AccountNotFound"})), 1283 ) 1284 .into_response(), 1285 Err(e) => { 1286 error!("DB error: {:?}", e); 1287 ( 1288 StatusCode::INTERNAL_SERVER_ERROR, 1289 Json(json!({"error": "InternalError"})), 1290 ) 1291 .into_response() 1292 } 1293 } 1294} 1295 1296use crate::comms::locale::VALID_LOCALES; 1297 1298#[derive(Deserialize)] 1299#[serde(rename_all = "camelCase")] 1300pub struct UpdateLocaleInput { 1301 pub preferred_locale: String, 1302} 1303 1304pub async fn update_locale( 1305 State(state): State<AppState>, 1306 auth: BearerAuth, 1307 Json(input): Json<UpdateLocaleInput>, 1308) -> Response { 1309 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) { 1310 return ( 1311 StatusCode::BAD_REQUEST, 1312 Json(json!({ 1313 "error": "InvalidRequest", 1314 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", ")) 1315 })), 1316 ) 1317 .into_response(); 1318 } 1319 1320 let result = sqlx::query!( 1321 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did", 1322 input.preferred_locale, 1323 auth.0.did 1324 ) 1325 .fetch_optional(&state.db) 1326 .await; 1327 1328 match result { 1329 Ok(Some(_)) => { 1330 info!( 1331 did = %auth.0.did, 1332 locale = %input.preferred_locale, 1333 "User locale preference updated" 1334 ); 1335 Json(json!({ 1336 "preferredLocale": input.preferred_locale 1337 })) 1338 .into_response() 1339 } 1340 Ok(None) => ( 1341 StatusCode::NOT_FOUND, 1342 Json(json!({"error": "AccountNotFound"})), 1343 ) 1344 .into_response(), 1345 Err(e) => { 1346 error!("DB error updating locale: {:?}", e); 1347 ( 1348 StatusCode::INTERNAL_SERVER_ERROR, 1349 Json(json!({"error": "InternalError"})), 1350 ) 1351 .into_response() 1352 } 1353 } 1354}