this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") 18 && let Ok(value) = forwarded.to_str() 19 && let Some(first_ip) = value.split(',').next() 20 { 21 return first_ip.trim().to_string(); 22 } 23 if let Some(real_ip) = headers.get("x-real-ip") 24 && let Ok(value) = real_ip.to_str() 25 { 26 return value.trim().to_string(); 27 } 28 "unknown".to_string() 29} 30 31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 32 let identifier = identifier.trim(); 33 if identifier.contains('@') || identifier.starts_with("did:") { 34 identifier.to_string() 35 } else if !identifier.contains('.') { 36 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 37 } else { 38 identifier.to_lowercase() 39 } 40} 41 42fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 43 stored_handle.to_string() 44} 45 46#[derive(Deserialize)] 47pub struct CreateSessionInput { 48 pub identifier: String, 49 pub password: String, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateSessionOutput { 55 pub access_jwt: String, 56 pub refresh_jwt: String, 57 pub handle: String, 58 pub did: String, 59} 60 61pub async fn create_session( 62 State(state): State<AppState>, 63 headers: HeaderMap, 64 Json(input): Json<CreateSessionInput>, 65) -> Response { 66 info!( 67 "create_session called with identifier: {}", 68 input.identifier 69 ); 70 let client_ip = extract_client_ip(&headers); 71 if !state 72 .check_rate_limit(RateLimitKind::Login, &client_ip) 73 .await 74 { 75 warn!(ip = %client_ip, "Login rate limit exceeded"); 76 return ( 77 StatusCode::TOO_MANY_REQUESTS, 78 Json(json!({ 79 "error": "RateLimitExceeded", 80 "message": "Too many login attempts. Please try again later." 81 })), 82 ) 83 .into_response(); 84 } 85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 87 info!( 88 "Normalized identifier: {} -> {}", 89 input.identifier, normalized_identifier 90 ); 91 let row = match sqlx::query!( 92 r#"SELECT 93 u.id, u.did, u.handle, u.password_hash, 94 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 95 k.key_bytes, k.encryption_version 96 FROM users u 97 JOIN user_keys k ON u.id = k.user_id 98 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 99 normalized_identifier 100 ) 101 .fetch_optional(&state.db) 102 .await 103 { 104 Ok(Some(row)) => row, 105 Ok(None) => { 106 let _ = verify( 107 &input.password, 108 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 109 ); 110 warn!("User not found for login attempt"); 111 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 112 .into_response(); 113 } 114 Err(e) => { 115 error!("Database error fetching user: {:?}", e); 116 return ApiError::InternalError.into_response(); 117 } 118 }; 119 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 120 Ok(k) => k, 121 Err(e) => { 122 error!("Failed to decrypt user key: {:?}", e); 123 return ApiError::InternalError.into_response(); 124 } 125 }; 126 let password_valid = if row 127 .password_hash 128 .as_ref() 129 .map(|h| verify(&input.password, h).unwrap_or(false)) 130 .unwrap_or(false) 131 { 132 true 133 } else { 134 let app_passwords = sqlx::query!( 135 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 136 row.id 137 ) 138 .fetch_all(&state.db) 139 .await 140 .unwrap_or_default(); 141 app_passwords 142 .iter() 143 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 144 }; 145 if !password_valid { 146 warn!("Password verification failed for login attempt"); 147 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 148 .into_response(); 149 } 150 let is_verified = 151 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 152 if !is_verified { 153 warn!("Login attempt for unverified account: {}", row.did); 154 return ( 155 StatusCode::FORBIDDEN, 156 Json(json!({ 157 "error": "AccountNotVerified", 158 "message": "Please verify your account before logging in", 159 "did": row.did 160 })), 161 ) 162 .into_response(); 163 } 164 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 165 Ok(m) => m, 166 Err(e) => { 167 error!("Failed to create access token: {:?}", e); 168 return ApiError::InternalError.into_response(); 169 } 170 }; 171 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 172 Ok(m) => m, 173 Err(e) => { 174 error!("Failed to create refresh token: {:?}", e); 175 return ApiError::InternalError.into_response(); 176 } 177 }; 178 if let Err(e) = sqlx::query!( 179 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 180 row.did, 181 access_meta.jti, 182 refresh_meta.jti, 183 access_meta.expires_at, 184 refresh_meta.expires_at 185 ) 186 .execute(&state.db) 187 .await 188 { 189 error!("Failed to insert session: {:?}", e); 190 return ApiError::InternalError.into_response(); 191 } 192 let handle = full_handle(&row.handle, &pds_hostname); 193 Json(CreateSessionOutput { 194 access_jwt: access_meta.token, 195 refresh_jwt: refresh_meta.token, 196 handle, 197 did: row.did, 198 }) 199 .into_response() 200} 201 202pub async fn get_session( 203 State(state): State<AppState>, 204 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 205) -> Response { 206 let permissions = auth_user.permissions(); 207 let can_read_email = permissions.allows_email_read(); 208 209 match sqlx::query!( 210 r#"SELECT 211 handle, email, email_verified, is_admin, deactivated_at, 212 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 213 discord_verified, telegram_verified, signal_verified 214 FROM users WHERE did = $1"#, 215 auth_user.did 216 ) 217 .fetch_optional(&state.db) 218 .await 219 { 220 Ok(Some(row)) => { 221 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 222 crate::comms::CommsChannel::Email => ("email", row.email_verified), 223 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 224 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 225 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 226 }; 227 let pds_hostname = 228 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 229 let handle = full_handle(&row.handle, &pds_hostname); 230 let is_active = row.deactivated_at.is_none(); 231 let email_value = if can_read_email { 232 row.email.clone() 233 } else { 234 None 235 }; 236 let email_verified_value = can_read_email && row.email_verified; 237 Json(json!({ 238 "handle": handle, 239 "did": auth_user.did, 240 "email": email_value, 241 "emailVerified": email_verified_value, 242 "preferredChannel": preferred_channel, 243 "preferredChannelVerified": preferred_channel_verified, 244 "isAdmin": row.is_admin, 245 "active": is_active, 246 "status": if is_active { "active" } else { "deactivated" }, 247 "didDoc": {} 248 })) 249 .into_response() 250 } 251 Ok(None) => ApiError::AuthenticationFailed.into_response(), 252 Err(e) => { 253 error!("Database error in get_session: {:?}", e); 254 ApiError::InternalError.into_response() 255 } 256 } 257} 258 259pub async fn delete_session( 260 State(state): State<AppState>, 261 headers: axum::http::HeaderMap, 262) -> Response { 263 let token = match crate::auth::extract_bearer_token_from_header( 264 headers.get("Authorization").and_then(|h| h.to_str().ok()), 265 ) { 266 Some(t) => t, 267 None => return ApiError::AuthenticationRequired.into_response(), 268 }; 269 let jti = match crate::auth::get_jti_from_token(&token) { 270 Ok(jti) => jti, 271 Err(_) => return ApiError::AuthenticationFailed.into_response(), 272 }; 273 let did = crate::auth::get_did_from_token(&token).ok(); 274 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 275 .execute(&state.db) 276 .await 277 { 278 Ok(res) if res.rows_affected() > 0 => { 279 if let Some(did) = did { 280 let session_cache_key = format!("auth:session:{}:{}", did, jti); 281 let _ = state.cache.delete(&session_cache_key).await; 282 } 283 Json(json!({})).into_response() 284 } 285 Ok(_) => ApiError::AuthenticationFailed.into_response(), 286 Err(e) => { 287 error!("Database error in delete_session: {:?}", e); 288 ApiError::AuthenticationFailed.into_response() 289 } 290 } 291} 292 293pub async fn refresh_session( 294 State(state): State<AppState>, 295 headers: axum::http::HeaderMap, 296) -> Response { 297 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 298 if !state 299 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 300 .await 301 { 302 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 303 return ( 304 axum::http::StatusCode::TOO_MANY_REQUESTS, 305 axum::Json(serde_json::json!({ 306 "error": "RateLimitExceeded", 307 "message": "Too many requests. Please try again later." 308 })), 309 ) 310 .into_response(); 311 } 312 let refresh_token = match crate::auth::extract_bearer_token_from_header( 313 headers.get("Authorization").and_then(|h| h.to_str().ok()), 314 ) { 315 Some(t) => t, 316 None => return ApiError::AuthenticationRequired.into_response(), 317 }; 318 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 319 Ok(jti) => jti, 320 Err(_) => { 321 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 322 .into_response(); 323 } 324 }; 325 let mut tx = match state.db.begin().await { 326 Ok(tx) => tx, 327 Err(e) => { 328 error!("Failed to begin transaction: {:?}", e); 329 return ApiError::InternalError.into_response(); 330 } 331 }; 332 if let Ok(Some(session_id)) = sqlx::query_scalar!( 333 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 334 refresh_jti 335 ) 336 .fetch_optional(&mut *tx) 337 .await 338 { 339 warn!( 340 "Refresh token reuse detected! Revoking token family for session_id: {}", 341 session_id 342 ); 343 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 344 .execute(&mut *tx) 345 .await; 346 let _ = tx.commit().await; 347 return ApiError::ExpiredTokenMsg( 348 "Refresh token has been revoked due to suspected compromise".into(), 349 ) 350 .into_response(); 351 } 352 let session_row = match sqlx::query!( 353 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 354 FROM session_tokens st 355 JOIN users u ON st.did = u.did 356 JOIN user_keys k ON u.id = k.user_id 357 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 358 FOR UPDATE OF st"#, 359 refresh_jti 360 ) 361 .fetch_optional(&mut *tx) 362 .await 363 { 364 Ok(Some(row)) => row, 365 Ok(None) => { 366 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 367 .into_response(); 368 } 369 Err(e) => { 370 error!("Database error fetching session: {:?}", e); 371 return ApiError::InternalError.into_response(); 372 } 373 }; 374 let key_bytes = 375 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 376 Ok(k) => k, 377 Err(e) => { 378 error!("Failed to decrypt user key: {:?}", e); 379 return ApiError::InternalError.into_response(); 380 } 381 }; 382 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 383 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 384 } 385 let new_access_meta = 386 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 387 Ok(m) => m, 388 Err(e) => { 389 error!("Failed to create access token: {:?}", e); 390 return ApiError::InternalError.into_response(); 391 } 392 }; 393 let new_refresh_meta = 394 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 395 Ok(m) => m, 396 Err(e) => { 397 error!("Failed to create refresh token: {:?}", e); 398 return ApiError::InternalError.into_response(); 399 } 400 }; 401 match sqlx::query!( 402 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 403 refresh_jti, 404 session_row.id 405 ) 406 .execute(&mut *tx) 407 .await 408 { 409 Ok(result) if result.rows_affected() == 0 => { 410 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 411 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 412 .execute(&mut *tx) 413 .await; 414 let _ = tx.commit().await; 415 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 416 } 417 Err(e) => { 418 error!("Failed to record used refresh token: {:?}", e); 419 return ApiError::InternalError.into_response(); 420 } 421 Ok(_) => {} 422 } 423 if let Err(e) = sqlx::query!( 424 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 425 new_access_meta.jti, 426 new_refresh_meta.jti, 427 new_access_meta.expires_at, 428 new_refresh_meta.expires_at, 429 session_row.id 430 ) 431 .execute(&mut *tx) 432 .await 433 { 434 error!("Database error updating session: {:?}", e); 435 return ApiError::InternalError.into_response(); 436 } 437 if let Err(e) = tx.commit().await { 438 error!("Failed to commit transaction: {:?}", e); 439 return ApiError::InternalError.into_response(); 440 } 441 match sqlx::query!( 442 r#"SELECT 443 handle, email, email_verified, is_admin, 444 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 445 discord_verified, telegram_verified, signal_verified 446 FROM users WHERE did = $1"#, 447 session_row.did 448 ) 449 .fetch_optional(&state.db) 450 .await 451 { 452 Ok(Some(u)) => { 453 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 454 crate::comms::CommsChannel::Email => ("email", u.email_verified), 455 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 456 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 457 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 458 }; 459 let pds_hostname = 460 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 461 let handle = full_handle(&u.handle, &pds_hostname); 462 Json(json!({ 463 "accessJwt": new_access_meta.token, 464 "refreshJwt": new_refresh_meta.token, 465 "handle": handle, 466 "did": session_row.did, 467 "email": u.email, 468 "emailVerified": u.email_verified, 469 "preferredChannel": preferred_channel, 470 "preferredChannelVerified": preferred_channel_verified, 471 "isAdmin": u.is_admin, 472 "active": true 473 })) 474 .into_response() 475 } 476 Ok(None) => { 477 error!("User not found for existing session: {}", session_row.did); 478 ApiError::InternalError.into_response() 479 } 480 Err(e) => { 481 error!("Database error fetching user: {:?}", e); 482 ApiError::InternalError.into_response() 483 } 484 } 485} 486 487#[derive(Deserialize)] 488#[serde(rename_all = "camelCase")] 489pub struct ConfirmSignupInput { 490 pub did: String, 491 pub verification_code: String, 492} 493 494#[derive(Serialize)] 495#[serde(rename_all = "camelCase")] 496pub struct ConfirmSignupOutput { 497 pub access_jwt: String, 498 pub refresh_jwt: String, 499 pub handle: String, 500 pub did: String, 501 pub email: Option<String>, 502 pub email_verified: bool, 503 pub preferred_channel: String, 504 pub preferred_channel_verified: bool, 505} 506 507pub async fn confirm_signup( 508 State(state): State<AppState>, 509 Json(input): Json<ConfirmSignupInput>, 510) -> Response { 511 info!("confirm_signup called for DID: {}", input.did); 512 let row = match sqlx::query!( 513 r#"SELECT 514 u.id, u.did, u.handle, u.email, 515 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 516 k.key_bytes, k.encryption_version 517 FROM users u 518 JOIN user_keys k ON u.id = k.user_id 519 WHERE u.did = $1"#, 520 input.did 521 ) 522 .fetch_optional(&state.db) 523 .await 524 { 525 Ok(Some(row)) => row, 526 Ok(None) => { 527 warn!("User not found for confirm_signup: {}", input.did); 528 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 529 .into_response(); 530 } 531 Err(e) => { 532 error!("Database error in confirm_signup: {:?}", e); 533 return ApiError::InternalError.into_response(); 534 } 535 }; 536 537 let channel_str = match row.channel { 538 crate::comms::CommsChannel::Email => "email", 539 crate::comms::CommsChannel::Discord => "discord", 540 crate::comms::CommsChannel::Telegram => "telegram", 541 crate::comms::CommsChannel::Signal => "signal", 542 }; 543 let verification = match sqlx::query!( 544 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel", 545 row.id, 546 channel_str as _ 547 ) 548 .fetch_optional(&state.db) 549 .await 550 { 551 Ok(Some(v)) => v, 552 Ok(None) => { 553 warn!("No verification code found for user: {}", input.did); 554 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 555 } 556 Err(e) => { 557 error!("Database error fetching verification: {:?}", e); 558 return ApiError::InternalError.into_response(); 559 } 560 }; 561 562 if verification.code != input.verification_code { 563 warn!("Invalid verification code for user: {}", input.did); 564 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 565 } 566 if verification.expires_at < Utc::now() { 567 warn!("Verification code expired for user: {}", input.did); 568 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 569 } 570 571 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 572 Ok(k) => k, 573 Err(e) => { 574 error!("Failed to decrypt user key: {:?}", e); 575 return ApiError::InternalError.into_response(); 576 } 577 }; 578 let verified_column = match row.channel { 579 crate::comms::CommsChannel::Email => "email_verified", 580 crate::comms::CommsChannel::Discord => "discord_verified", 581 crate::comms::CommsChannel::Telegram => "telegram_verified", 582 crate::comms::CommsChannel::Signal => "signal_verified", 583 }; 584 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 585 if let Err(e) = sqlx::query(&update_query) 586 .bind(&input.did) 587 .execute(&state.db) 588 .await 589 { 590 error!("Failed to update verification status: {:?}", e); 591 return ApiError::InternalError.into_response(); 592 } 593 594 if let Err(e) = sqlx::query!( 595 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel", 596 row.id, 597 channel_str as _ 598 ) 599 .execute(&state.db) 600 .await 601 { 602 error!("Failed to delete verification record: {:?}", e); 603 } 604 605 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 606 Ok(m) => m, 607 Err(e) => { 608 error!("Failed to create access token: {:?}", e); 609 return ApiError::InternalError.into_response(); 610 } 611 }; 612 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 613 Ok(m) => m, 614 Err(e) => { 615 error!("Failed to create refresh token: {:?}", e); 616 return ApiError::InternalError.into_response(); 617 } 618 }; 619 if let Err(e) = sqlx::query!( 620 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 621 row.did, 622 access_meta.jti, 623 refresh_meta.jti, 624 access_meta.expires_at, 625 refresh_meta.expires_at 626 ) 627 .execute(&state.db) 628 .await 629 { 630 error!("Failed to insert session: {:?}", e); 631 return ApiError::InternalError.into_response(); 632 } 633 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 634 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 635 warn!("Failed to enqueue welcome notification: {:?}", e); 636 } 637 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 638 let preferred_channel = match row.channel { 639 crate::comms::CommsChannel::Email => "email", 640 crate::comms::CommsChannel::Discord => "discord", 641 crate::comms::CommsChannel::Telegram => "telegram", 642 crate::comms::CommsChannel::Signal => "signal", 643 }; 644 Json(ConfirmSignupOutput { 645 access_jwt: access_meta.token, 646 refresh_jwt: refresh_meta.token, 647 handle: row.handle, 648 did: row.did, 649 email: row.email, 650 email_verified, 651 preferred_channel: preferred_channel.to_string(), 652 preferred_channel_verified: true, 653 }) 654 .into_response() 655} 656 657#[derive(Deserialize)] 658#[serde(rename_all = "camelCase")] 659pub struct ResendVerificationInput { 660 pub did: String, 661} 662 663pub async fn resend_verification( 664 State(state): State<AppState>, 665 Json(input): Json<ResendVerificationInput>, 666) -> Response { 667 info!("resend_verification called for DID: {}", input.did); 668 let row = match sqlx::query!( 669 r#"SELECT 670 id, handle, email, 671 preferred_comms_channel as "channel: crate::comms::CommsChannel", 672 discord_id, telegram_username, signal_number, 673 email_verified, discord_verified, telegram_verified, signal_verified 674 FROM users 675 WHERE did = $1"#, 676 input.did 677 ) 678 .fetch_optional(&state.db) 679 .await 680 { 681 Ok(Some(row)) => row, 682 Ok(None) => { 683 return ApiError::InvalidRequest("User not found".into()).into_response(); 684 } 685 Err(e) => { 686 error!("Database error in resend_verification: {:?}", e); 687 return ApiError::InternalError.into_response(); 688 } 689 }; 690 let is_verified = 691 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 692 if is_verified { 693 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 694 } 695 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 696 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 697 698 let (channel_str, recipient) = match row.channel { 699 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()), 700 crate::comms::CommsChannel::Discord => { 701 ("discord", row.discord_id.clone().unwrap_or_default()) 702 } 703 crate::comms::CommsChannel::Telegram => ( 704 "telegram", 705 row.telegram_username.clone().unwrap_or_default(), 706 ), 707 crate::comms::CommsChannel::Signal => { 708 ("signal", row.signal_number.clone().unwrap_or_default()) 709 } 710 }; 711 712 if let Err(e) = sqlx::query!( 713 r#" 714 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 715 VALUES ($1, $2::comms_channel, $3, $4, $5) 716 ON CONFLICT (user_id, channel) DO UPDATE 717 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW() 718 "#, 719 row.id, 720 channel_str as _, 721 verification_code, 722 recipient, 723 code_expires_at 724 ) 725 .execute(&state.db) 726 .await 727 { 728 error!("Failed to update verification code: {:?}", e); 729 return ApiError::InternalError.into_response(); 730 } 731 if let Err(e) = crate::comms::enqueue_signup_verification( 732 &state.db, 733 row.id, 734 channel_str, 735 &recipient, 736 &verification_code, 737 ) 738 .await 739 { 740 warn!("Failed to enqueue verification notification: {:?}", e); 741 } 742 Json(json!({"success": true})).into_response() 743} 744 745#[derive(Serialize)] 746#[serde(rename_all = "camelCase")] 747pub struct SessionInfo { 748 pub id: String, 749 pub created_at: String, 750 pub expires_at: String, 751 pub is_current: bool, 752} 753 754#[derive(Serialize)] 755#[serde(rename_all = "camelCase")] 756pub struct ListSessionsOutput { 757 pub sessions: Vec<SessionInfo>, 758} 759 760pub async fn list_sessions( 761 State(state): State<AppState>, 762 headers: HeaderMap, 763 auth: BearerAuth, 764) -> Response { 765 let current_jti = headers 766 .get("authorization") 767 .and_then(|v| v.to_str().ok()) 768 .and_then(|v| v.strip_prefix("Bearer ")) 769 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 770 let result = sqlx::query_as::< 771 _, 772 ( 773 i32, 774 String, 775 chrono::DateTime<chrono::Utc>, 776 chrono::DateTime<chrono::Utc>, 777 ), 778 >( 779 r#" 780 SELECT id, access_jti, created_at, refresh_expires_at 781 FROM session_tokens 782 WHERE did = $1 AND refresh_expires_at > NOW() 783 ORDER BY created_at DESC 784 "#, 785 ) 786 .bind(&auth.0.did) 787 .fetch_all(&state.db) 788 .await; 789 match result { 790 Ok(rows) => { 791 let sessions: Vec<SessionInfo> = rows 792 .into_iter() 793 .map(|(id, access_jti, created_at, expires_at)| SessionInfo { 794 id: id.to_string(), 795 created_at: created_at.to_rfc3339(), 796 expires_at: expires_at.to_rfc3339(), 797 is_current: current_jti.as_ref() == Some(&access_jti), 798 }) 799 .collect(); 800 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 801 } 802 Err(e) => { 803 error!("DB error in list_sessions: {:?}", e); 804 ( 805 StatusCode::INTERNAL_SERVER_ERROR, 806 Json(json!({"error": "InternalError"})), 807 ) 808 .into_response() 809 } 810 } 811} 812 813#[derive(Deserialize)] 814#[serde(rename_all = "camelCase")] 815pub struct RevokeSessionInput { 816 pub session_id: String, 817} 818 819pub async fn revoke_session( 820 State(state): State<AppState>, 821 auth: BearerAuth, 822 Json(input): Json<RevokeSessionInput>, 823) -> Response { 824 let session_id: i32 = match input.session_id.parse() { 825 Ok(id) => id, 826 Err(_) => { 827 return ( 828 StatusCode::BAD_REQUEST, 829 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 830 ) 831 .into_response(); 832 } 833 }; 834 let session = sqlx::query_as::<_, (String,)>( 835 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 836 ) 837 .bind(session_id) 838 .bind(&auth.0.did) 839 .fetch_optional(&state.db) 840 .await; 841 let access_jti = match session { 842 Ok(Some((jti,))) => jti, 843 Ok(None) => { 844 return ( 845 StatusCode::NOT_FOUND, 846 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 847 ) 848 .into_response(); 849 } 850 Err(e) => { 851 error!("DB error in revoke_session: {:?}", e); 852 return ( 853 StatusCode::INTERNAL_SERVER_ERROR, 854 Json(json!({"error": "InternalError"})), 855 ) 856 .into_response(); 857 } 858 }; 859 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 860 .bind(session_id) 861 .execute(&state.db) 862 .await 863 { 864 error!("DB error deleting session: {:?}", e); 865 return ( 866 StatusCode::INTERNAL_SERVER_ERROR, 867 Json(json!({"error": "InternalError"})), 868 ) 869 .into_response(); 870 } 871 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 872 if let Err(e) = state.cache.delete(&cache_key).await { 873 warn!("Failed to invalidate session cache: {:?}", e); 874 } 875 info!(did = %auth.0.did, session_id = %session_id, "Session revoked"); 876 (StatusCode::OK, Json(json!({}))).into_response() 877}